massmailer
massmailer
sends a similar (multipart mime) e-mail to a bunch of people. The list of recipients is not disclosed: each e-mail is addressed individually. massmailer
has been developed and successfully used for the organization of a scientific conference.
#!/usr/bin/env python
# Licensed under the Python License (see http://www.python.org/psf/license/)
# Copyright (C) 2008 Pierre Duquesne <stackp@online.fr>
"""massmailer: send a similar multipart MIME e-mail to a list of people.

Every recipient receives an individually addressed copy, so the list of
recipients is never disclosed.  The script runs on Python 2.6+ and
Python 3; nothing is executed at import time.
"""

import getopt
import mimetypes
import smtplib
import sys
from email import encoders
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from os.path import basename

try:
    read_line = raw_input  # Python 2
except NameError:
    read_line = input  # Python 3

USAGE = '''\
usage: massmailer --server <hostname> --addrlist <addr.txt> --from <addr>
                  [--subject <subject>] [--log <logfile>] [--text <textfile>]
                  [--img <image>] [--audio <sndfile>] [--bin <file>]

massmailer sends a similar (multipart mime) mail to a bunch of people.

  --server <hostname>
        the name of the smtp server

  --addrlist <addr.txt>
        is a text file containing one address per line. Each address can
        be a raw email address, or in the format:
        Name of the person <person@site.org>

  --from <addr>
        is the sender address either formatted as jdoe@site.com or
        "John Doe <jdoe@site.com>"

  --subject <subject>
        the subject of the mail

  --log <logfile>
        a file name for the program to write success and failures in a
        human-readable text file.

  --text <txtfile>
  --img <image>
  --audio <sndfile>
  --bin <file>
        attach a text file, a picture, an audio file, or any type of file
        to the mail. Files specified with --text don't have their
        filename specified in the mime headers (the text only is
        inserted). Use --bin if you want to attach the filename
        information.

Example:

  massmailer --server smtp.site.com --addrlist addr.txt --log log \\
             --from spammer@site.com --subject "Cheap Rolex" \\
             --text buy.txt --img rolex.png\
'''

LONG_OPTIONS = ['server=', 'log=', 'addrlist=', 'from=', 'subject=',
                'text=', 'img=', 'audio=', 'bin=', 'help']


def read_addresses(path):
    """Return the non-blank, stripped lines of the address file *path*."""
    with open(path) as fd:
        return [line.strip() for line in fd if line.strip()]


def _media_part(factory, maintype, content, path):
    """Build an image/audio MIME part from *content*.

    *factory* (MIMEImage or MIMEAudio) first tries to recognise the
    subtype from the data itself.  When it cannot (it raises TypeError,
    e.g. for mp3 data), the subtype is guessed from the name of *path*;
    if that guess is not a *maintype* type either, the TypeError is
    re-raised.
    """
    try:
        return factory(content)
    except TypeError:
        guessed = mimetypes.guess_type(path)[0] or ''
        prefix = maintype + '/'
        if not guessed.startswith(prefix):
            raise
        return factory(content, _subtype=guessed[len(prefix):])


def build_message(subject, fromaddr, attachments):
    """Return the multipart message shared by all recipients.

    *attachments* is a list of ``(kind, path)`` pairs where *kind* is one
    of ``'text'``, ``'img'``, ``'audio'`` or ``'bin'``.  The ``To`` header
    is created empty so that it can be replaced for each recipient.
    """
    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = fromaddr
    msg['To'] = ''
    for kind, path in attachments:
        # Only --text files are text; everything else must be read as raw
        # bytes so the payload is not altered by newline translation.
        mode = 'r' if kind == 'text' else 'rb'
        with open(path, mode) as fd:
            content = fd.read()
        if kind == 'text':
            part = MIMEText(content)
        elif kind == 'img':
            part = _media_part(MIMEImage, 'image', content, path)
            part.add_header('Content-Disposition', 'inline',
                            filename=basename(path))
        elif kind == 'audio':
            part = _media_part(MIMEAudio, 'audio', content, path)
        elif kind == 'bin':
            part = MIMEBase('application', 'octet-stream')
            part.add_header('Content-Disposition', 'attachment',
                            filename=basename(path))
            part.set_payload(content)
            encoders.encode_base64(part)
        else:
            continue
        msg.attach(part)
    return msg


def send_all(server, fromaddr, addrlist, msg, log):
    """Send *msg* to every address, one mail per recipient.

    A line is written to *log* for each success or failure; a failure for
    one recipient does not stop the remaining ones.
    """
    total = len(addrlist)
    for idx, addr in enumerate(addrlist):
        try:
            msg.replace_header('To', addr)
            server.sendmail(fromaddr, addr, msg.as_string())
            log.write('Success:  %s\n' % addr)
            print('Progress: %d/%d' % (idx + 1, total))
        except Exception as e:
            log.write('Error: %s %s %s\n' % (addr, e.__class__, e))


def main(argv=None):
    """Run massmailer with *argv* (default ``sys.argv``); return the exit code."""
    if argv is None:
        argv = sys.argv

    # -- Parse command line arguments
    try:
        opts, args = getopt.getopt(argv[1:], 'h', LONG_OPTIONS)
    except getopt.GetoptError as err:
        print('Error: %s' % err)
        print("Try '%s --help' for valid options." % argv[0])
        return 1
    if args:
        print("Wrong command line arguments: '" + ' '.join(args) + "'")
        print("Try '%s --help' for valid options." % argv[0])
        return 1

    servername = None
    logfile = None
    addrfile = None
    fromaddr = None
    subject = ''
    attch = []
    for o, a in opts:
        if o in ('--text', '--img', '--audio', '--bin'):
            attch.append((o[2:], a))
        elif o == '--server':
            servername = a
        elif o == '--log':
            logfile = a
        elif o == '--addrlist':
            addrfile = a
        elif o == '--from':
            fromaddr = a
        elif o == '--subject':
            subject = a
        elif o in ('-h', '--help'):
            print(USAGE)
            return 0

    # -- Print error messages
    if not (servername and addrfile and fromaddr):
        if not servername:
            print('Error: No server specified (--server option)')
        if not addrfile:
            print('Error: No address list specified (--addrlist option)')
        if not fromaddr:
            print('Error: No sender address specified (--from option)')
        return 1

    # -- Print warning messages
    warnings = []
    if not subject:
        warnings.append('Warning: No subject specified (--subject option)\n')
    if not logfile:
        warnings.append('Warning: No log file specified (--log option)\n')
    if not attch:
        warnings.append('Warning: No content specified '
                        '(--text, --img, --audio, --bin options)\n')
    if warnings:
        print('\n' + ''.join(warnings))
        answer = read_line('Are you sure you want to continue [Y/n] ?')
        if answer not in ('Y', 'y', ''):
            print('Aborted, bye!')
            return 0

    # -- Store the destination addresses in a list
    addrlist = read_addresses(addrfile)

    log = sys.stdout
    if logfile:
        print('writing log to %s' % logfile)
        log = open(logfile, 'w')
    try:
        # -- Build multi-part mime message
        msg = build_message(subject, fromaddr, attch)

        # -- Send the message
        try:
            server = smtplib.SMTP(servername)
        except Exception as e:
            log.write('Could not connect to %s %s %s\n'
                      % (servername, e.__class__, e))
            return 2
        try:
            send_all(server, fromaddr, addrlist, msg, log)
        finally:
            server.quit()
    finally:
        # Never close sys.stdout, only a log file opened here.
        if log is not sys.stdout:
            log.close()
    return 0


if __name__ == '__main__':
    sys.exit(main())
imapsave
imapsave
simply copies the content of an imap account into mbox text files (one file per folder). It does not do incremental backups.
I wrote imapsave
when I realized imapbackup could not use SSL encryption.
#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
# Licensed under the Python License (see http://www.python.org/psf/license/)
# Copyright (C) 2008 Pierre Duquesne <stackp@online.fr>
#
# Changelog:
# 20080722 * disable spinning when output is redirected to a file
# 20080718 * Use BODY.PEEK[] in the fetch command as specified in RFC 3501
#            (fix by Stefan)
# 20080313 * Initial release
"""imapsave: copy the content of an IMAP account into mbox text files.

One file is written per folder, in the current directory.  No incremental
backup is attempted: every run fetches every message again.  The script
runs on Python 2.6+ and Python 3.
"""

import email
import getopt
import getpass
import imaplib
import re
import sys
import textwrap
from os.path import basename

try:
    read_line = raw_input  # Python 2
except NameError:
    read_line = input  # Python 3

USAGE = """\
usage: %s [--ssl] [--port num] [--password pass] [server] [user]

  server            The hostname of your imap server
  user              Your login
  --ssl             Use an encrypted connection
  -p num
  --port num        Use port number num instead of default port
                    (Defaults are 143 for unencrypted connection and
                    993 for ssl)
  -P pass
  --password pass   Password for the server
""" % basename(sys.argv[0])

# Matches the "(flags) "delimiter" " prefix of a LIST response line and
# captures an unquoted folder name, if any.
FOLDER_PATTERN = re.compile(r'\([^\)]*\) "[^"]*" ([^"]*)')


class Spinner(object):
    """A class to show some activity on a character terminal."""

    def __init__(self, message=''):
        self.message = message
        self.symbols = list('-\\|/')
        self.nsym = len(self.symbols)
        self.n = 0

    def spin(self):
        """Redraw the message followed by the next animation symbol."""
        sys.stdout.write('\r' + self.message + self.symbols[self.n])
        sys.stdout.flush()
        self.n = (self.n + 1) % self.nsym

    def stop(self, stop_message=''):
        """Redraw the message followed by *stop_message* and end the line."""
        sys.stdout.write('\r' + self.message + stop_message + '\n')

    def set_message(self, message):
        """Change the text shown in front of the animation symbol."""
        self.message = message


class DummySpinner(object):
    """A class that mocks Spinner and does not spin.

    Used when output is not a character terminal (e.g. a log file).
    """

    def __init__(self, message=''):
        self.message = message

    def spin(self):
        """Do nothing."""
        pass

    def stop(self, stop_message=''):
        """Print the message followed by *stop_message* on its own line."""
        sys.stdout.write(self.message + stop_message + '\n')

    def set_message(self, message):
        """Change the text printed by stop()."""
        self.message = message


def _to_text(value):
    """Return *value* as a native str (imaplib yields bytes on Python 3)."""
    if isinstance(value, str):
        return value
    return value.decode('ascii', 'replace')


def folder_from_list_entry(fold_desc):
    """Return the folder name found in one line of a LIST response.

    The flags/delimiter prefix is removed.  A name that the server sent
    between double quotes keeps its quotes, so that it can be passed
    unchanged to SELECT.
    """
    return FOLDER_PATTERN.sub(r'\1', _to_text(fold_desc))


def foldname_to_filename(folder):
    """Deal with weird characters in folder names."""
    filename = re.sub('"(.*)"', r'\1', folder)
    return filename.replace('/', '.')


def mbox_entry(raw):
    """Return the fetched message *raw* as bytes ready to append to an mbox.

    The message is parsed and serialised again with a leading ``From ``
    line.  A final newline is added when missing so that the next entry
    always starts on its own line.
    """
    if hasattr(email, 'message_from_bytes'):
        # Python 3: imaplib hands back bytes.
        entry = email.message_from_bytes(raw).as_bytes(unixfrom=True)
    else:
        # Python 2: imaplib hands back a byte string.
        entry = email.message_from_string(raw).as_string(unixfrom=True)
    if not entry.endswith(b'\n'):
        entry += b'\n'
    return entry


def main(argv=None):
    """Run imapsave with *argv* (default ``sys.argv``); return the exit code."""
    if argv is None:
        argv = sys.argv

    # Parse command line arguments ('p:' so that -p takes the port number)
    try:
        opts, args = getopt.gnu_getopt(argv[1:], 'p:P:h',
                                       ['port=', 'password=', 'ssl', 'help'])
    except getopt.GetoptError as e:
        print('Error: %s' % e)
        return 1

    use_ssl = False
    port = None
    password = None
    for o, a in opts:
        if o == '--ssl':
            use_ssl = True
        elif o in ('-p', '--port'):
            try:
                port = int(a)
            except ValueError:
                print('Error: invalid port number: %s' % a)
                return 1
        elif o in ('-P', '--password'):
            password = a
        elif o in ('-h', '--help'):
            print(USAGE)
            return 0

    if len(args) > 0:
        servername = args[0]
    else:
        servername = read_line("Enter the server hostname: ")
    if len(args) > 1:
        username = args[1]
    else:
        username = read_line("Enter your username: ")

    # Connect
    print('Connecting to %s as user %s ...' % (servername, username))
    imap_class = imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4
    try:
        if port:
            server = imap_class(servername, port)
        else:
            server = imap_class(servername)
        if not password:
            password = getpass.getpass()
        server.login(username, password)
    except Exception as e:
        print('Error: %s' % e)
        return 1

    # Retrieve folder list
    folders = [folder_from_list_entry(desc)
               for desc in server.list()[1] if desc is not None]

    # Save messages in a separate file for each folder
    if sys.stdout.isatty():
        spinner = Spinner()
    else:
        spinner = DummySpinner()

    failed = []
    for folder in folders:
        spinner.set_message('%s ... ' % folder)
        try:
            resp, info = server.select(folder)
            if resp != 'OK':
                raise imaplib.IMAP4.error(
                    ' - '.join(_to_text(item) for item in info))
            fp = open(foldname_to_filename(folder), 'wb')
            try:
                resp, items = server.search(None, "ALL")
                for num in items[0].split():
                    resp, data = server.fetch(num, "(BODY.PEEK[])")
                    fp.write(mbox_entry(data[0][1]))
                    spinner.spin()
            finally:
                # Close the mbox file even when the server fails mid-folder.
                fp.close()
            spinner.stop('Done.')
        except imaplib.IMAP4.error as e:
            failed.append(folder)
            spinner.stop('Error! (' + str(e) + ')')
    server.logout()

    # Print the folders which failed, if any
    if failed:
        wrapper = textwrap.TextWrapper(initial_indent=' ',
                                       subsequent_indent=' ')
        print('')
        print('WARNING - The following folders could not be saved:')
        print(wrapper.fill(', '.join(failed)))
        return 1
    return 0


if __name__ == '__main__':
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        print('')
        print('^C received, stopping.')
Spinner: display activity
This was inspired by some code found in imapbackup. Sometimes you need to display activity on the text console to inform the user that the program is actually doing something.
That is the purpose of this small python class. The test()
function shows the usage. Here’s what test()
displays:
# Licensed under the Python License (see http://www.python.org/psf/license/)
# Copyright (C) 2008 Pierre Duquesne <stackp@online.fr>
import sys


class Spinner(object):
    """A class to show a spinning ascii animation on a character terminal.

    It informs the user that some processing is being done.  Output is
    written to ``sys.stdout``; the code runs on Python 2 and Python 3.
    """

    def __init__(self, message=''):
        self.message = message
        self.symbols = list('-\\|/')
        self.nsym = len(self.symbols)
        self.n = 0

    def spin(self):
        """Redraw the message followed by the next animation symbol."""
        # '\r' moves back to the start of the line so that each call
        # overwrites the previous frame instead of scrolling.
        sys.stdout.write('\r' + self.message + self.symbols[self.n])
        sys.stdout.flush()
        self.n = (self.n + 1) % self.nsym

    def stop(self, stop_message=''):
        """Redraw the message followed by *stop_message* and end the line."""
        sys.stdout.write('\r' + self.message + stop_message + '\n')

    def set_message(self, message):
        """Change the text shown in front of the animation symbol."""
        self.message = message


class DummySpinner(object):
    """A class that mocks Spinner and does not spin.

    To be used when output is not a character terminal (e.g. a log
    file).  For example::

        if sys.stdout.isatty():
            s = Spinner()
        else:
            s = DummySpinner()
    """

    def __init__(self, message=''):
        self.message = message

    def spin(self):
        """Do nothing."""
        pass

    def stop(self, stop_message=''):
        """Print the message followed by *stop_message* on its own line."""
        sys.stdout.write(self.message + stop_message + '\n')

    def set_message(self, message):
        """Change the text printed by stop()."""
        self.message = message


if __name__ == "__main__":
    import time

    def test():
        """Show three short animations to demonstrate the API."""
        if sys.stdout.isatty():
            s = Spinner()
        else:
            s = DummySpinner()

        # simple usage
        for i in range(10):
            s.spin()
            time.sleep(0.1)

        s.set_message('Initialization ... ')
        for i in range(10):
            s.spin()
            time.sleep(0.1)
        s.stop('Done')

        s.set_message('Loading ... ')
        for i in range(10):
            s.spin()
            time.sleep(0.1)
        s.stop('Done')

    test()
A better minimal web server in python
The simple web server from the python standard library is easy to improve to:
- answer several requests at the same time, and
- cancel a connection when the client stops responding.
#!/usr/bin/env python
"""A minimal threaded web server with a per-connection socket timeout.

Runs on Python 2 and Python 3.
"""

import socket

try:  # Python 3
    import http.server as BaseHTTPServer
    import http.server as SimpleHTTPServer
    import socketserver as SocketServer
except ImportError:  # Python 2
    import BaseHTTPServer
    import SimpleHTTPServer
    import SocketServer


class ThreadedHTTPServer(SocketServer.ThreadingMixIn,
                         BaseHTTPServer.HTTPServer):
    """
    New features w/r to BaseHTTPServer.HTTPServer:
    - serves multiple requests simultaneously
    - catches socket.timeout and socket.error exceptions (raised from
      RequestHandler)
    """

    def __init__(self, *args, **kwargs):
        BaseHTTPServer.HTTPServer.__init__(self, *args, **kwargs)

    def process_request_thread(self, request, client_address):
        """
        Overrides SocketServer.ThreadingMixIn.process_request_thread
        in order to catch socket.timeout.

        Timeouts and socket errors are reported on stdout; any other
        exception goes to handle_error().  The request is released in
        every case, so that a timed-out connection does not stay open.
        """
        try:
            self.finish_request(request, client_address)
        except socket.timeout:
            print('Timeout during processing of request from %s'
                  % (client_address,))
        except socket.error as e:
            print('%s during processing of request from %s'
                  % (e, client_address))
        except Exception:
            self.handle_error(request, client_address)
        finally:
            # shutdown_request() is missing from older SocketServer
            # versions; fall back to close_request() there.
            release = getattr(self, 'shutdown_request', None)
            if release is None:
                release = self.close_request
            release(request)


class TimeoutHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
    """
    Abandon request handling when client has not responded for a
    certain time. This raises a socket.timeout exception.
    """

    # Class-wide value for socket timeout (seconds)
    timeout = 3 * 60

    def setup(self):
        "Sets a timeout on the socket"
        self.request.settimeout(self.timeout)
        SimpleHTTPServer.SimpleHTTPRequestHandler.setup(self)


def main():
    """Serve the current directory until interrupted with ^C."""
    try:
        BaseHTTPServer.test(TimeoutHTTPRequestHandler, ThreadedHTTPServer)
    except KeyboardInterrupt:
        print('^C received, shutting down server')


if __name__ == '__main__':
    main()
Fetch album covers from Amazon
Edit: This does not seem to work anymore.
This is a stripped-down snippet from sonata to fetch music album covers from the amazon website.
#!/usr/bin/env python
# Copyright 2006-2007 Scott Horowitz
# Licensed under the GPL
"""Fetch a music album cover through the Amazon web service.

Runs on Python 2 and Python 3.
"""

from contextlib import closing

try:  # Python 3
    from urllib.parse import quote
    from urllib.request import urlopen, urlretrieve
except ImportError:  # Python 2
    from urllib import quote, urlretrieve
    from urllib2 import urlopen

# NOTE(review): access key hard-coded in the source; consider moving it to
# configuration.
AMAZON_KEY = "12DR2PGAQT303YTEWP02"

# Number of leading characters of the response that are never searched
# ("Skip header.." in the original code).
HEADER_SKIP = 310


def build_search_url(artist, album):
    """Return the ItemSearch URL for *artist* and *album*."""
    # Amazon currently doesn't support utf8 and suggests latin1 encoding
    # instead; fall back to quoting the raw values when that is impossible.
    try:
        artist_q = quote(artist.encode('latin1'))
        album_q = quote(album.encode('latin1'))
    except (UnicodeError, AttributeError):
        artist_q = quote(artist)
        album_q = quote(album)
    return ("http://webservices.amazon.com/onca/xml?"
            + "Service=AWSECommerceService&AWSAccessKeyId="
            + AMAZON_KEY
            + "&Operation=ItemSearch&SearchIndex=Music&Artist="
            + artist_q
            + "&ResponseGroup=Images&Keywords="
            + album_q)


def extract_large_image_url(xml, start=HEADER_SKIP):
    """Return the URL of the first ``<LargeImage>`` found in *xml*.

    The search begins at offset *start*.  ``None`` is returned when the
    response holds no large image or its URL element is incomplete.
    """
    marker = xml.find("<LargeImage>", start)
    if marker < 0:
        return None
    url_tag = xml.find("<URL>http://", marker)
    if url_tag < 0:
        return None
    url_start = url_tag + len("<URL>")
    url_end = xml.find("</URL>", url_start)
    if url_end < 0:
        return None
    return xml[url_start:url_end]


def download_image_to_filename(artist, album, dest_filename):
    """Download the cover of *album* by *artist* into *dest_filename*.

    Returns True when an image was downloaded and False if no images
    were found.  Network errors raised by the urllib functions are not
    caught here.
    """
    with closing(urlopen(build_search_url(artist, album))) as response:
        body = response.read()
    if not isinstance(body, str):
        # Python 3 returns bytes; the markers searched for are plain ASCII.
        body = body.decode('utf-8', 'replace')
    img_url = extract_large_image_url(body)
    if not img_url:
        return False
    urlretrieve(img_url, dest_filename)
    return True


if __name__ == "__main__":
    import sys
    try:
        artist, album, outfile = sys.argv[1:4]
    except ValueError:
        print('Usage: %s artist album outfile' % sys.argv[0])
        sys.exit(1)
    res = download_image_to_filename(artist, album, outfile)
    print(res)
Example use from the command-line:
./amazon.py "Boards of Canada" "The Campfire Headphase" boc.jpg