#!/usr/bin/python # file: mail-twisted.py # Sending, retrieving, deleting emails, and doing Telnet with the Twisted framework. import sys, string, os import email from twisted.internet import reactor from twisted.internet.defer import DeferredList host = "localhost" user = "kendrew" passwd = "kendrew" addrs = ( "aa@localhost", "bb@localhost", "cc@localhost" ) def errorHandler(error): reactor.stop() raise error def main(): if len(sys.argv) < 2 or "h" == sys.argv[1]: usage() elif "s" == sys.argv[1]: print len(addrs), "messages to be sent." dlist = [] for addr in addrs: toaddr = user + "@" + host text = "Test mail: " + addr + " to " + toaddr dlist.append( sendMail(host, addr, toaddr, text, text) ) DeferredList(dlist).addBoth(lambda _: reactor.stop()) reactor.run() elif "v" == sys.argv[1]: display(host, user, passwd) elif "d" == sys.argv[1]: display(host, user, passwd, 1) elif "1" == sys.argv[1]: start() elif "0" == sys.argv[1]: stop(host) else: print "Invalid command: " + sys.argv[1] def usage(): print "Usage: mail.py [(0)stopserver|(1)startserver|(s)end|(v)iew|(d)delete]" sys.exit() def display(host, user, passwd, deletion = 0): from twisted.internet.protocol import ClientCreator from twisted.mail.pop3client import POP3Client class MyPOP3Client(POP3Client): def __init__(self): self.myuser = user self.mypass = passwd self.deletion = deletion self.allowInsecureLogin = True def serverGreeting(self, msg): POP3Client.serverGreeting(self, msg) self.login(self.myuser, self.mypass).addCallbacks( self.do_stat, errorHandler) def do_stat(self, result): self.stat().addCallbacks(self.do_retrieve, errorHandler) def do_retrieve(self, stats): self.format = "%-3s %-15s %s" self.num_messages = stats[0] self.cur_message = 0 print self.myuser, "has", self.num_messages, "messages" if self.num_messages > 0: if deletion: print "Deleting", self.num_messages, "messages", self.delete(0).addCallbacks(self.do_delete_msg, errorHandler) else: print self.format % ("Num", "From", "Subject") self.retrieve(0).addCallbacks(self.do_retrieve_msg, errorHandler) else: reactor.stop() def do_retrieve_msg(self, lines): msg = email.message_from_string("\r\n".join(lines)) print self.format % (self.cur_message, msg["From"], msg["Subject"]) self.cur_message += 1 if (self.cur_message < self.num_messages): self.retrieve(self.cur_message).addCallbacks( self.do_retrieve_msg, errorHandler) else: reactor.stop() def do_delete_msg(self, str): print ".", self.cur_message += 1 if (self.cur_message < self.num_messages): self.delete(self.cur_message).addCallbacks( self.do_delete_msg, errorHandler) else: print " done." q = self.quit() q.addCallbacks(lambda _: reactor.stop(), errorHandler) pop3 = ClientCreator(reactor, MyPOP3Client) d = pop3.connectTCP(host, 110) reactor.run() def sendMail(host, addr, to, subject, content): from twisted.mail.smtp import sendmail from email.MIMEText import MIMEText print "Sending mail from", addr, "to", to, "..." msg = MIMEText(content) msg["Subject"] = subject msg["From"] = addr msg["To"] = to return sendmail(host, addr, [to], msg.as_string()) def start(): print "Starting server...", os.system("c:/apps/bin/james.bat") print "done." def stop(host): from twisted.internet.protocol import ClientCreator from twisted.conch.telnet import Telnet class MyTelnet(Telnet): def dataReceived(self, data): if "Login id:" in data: self._write("root\n") elif "Password:" in data: self._write("root\n") elif "Welcome" in data: d = self._write("shutdown\n") def connectionLost(self, reason): reactor.stop() print "done." mytelnet = ClientCreator(reactor, MyTelnet) d = mytelnet.connectTCP(host, 4555) reactor.run() main()