massmailer

massmailer sends a similar (multipart MIME) e-mail to a bunch of people. The list of recipients is not disclosed: each e-mail is addressed individually. massmailer has been developed and successfully used for the organization of a scientific conference.

[download massmailer]

#!/usr/bin/env python
 
# Licensed under the Python License (see http://www.python.org/psf/license/)
# Copyright (C) 2008 Pierre Duquesne <stackp@online.fr>
 
USAGE='''\
usage:
massmailer --server <hostname> --addrlist <addr.txt> --from <addr>
           [--subject <subject>] [--log <logfile>]
           [--text <textfile>] [--img <image>] [--audio <sndfile>]
           [--bin <file>]
 
    massmailer sends a similar (multipart mime) mail to a bunch of people.
    --server <hostname>
          the name of the smtp server
    --addrlist <addr.txt>
          is a text file containing one address per line. Each
          address can be a raw email address, or in the format:
          Name of the person <person@site.org>
    --from <addr>
          is the sender address either formatted as jdoe@site.com or
          "John Doe <jdoe@site.com>"
    --subject <subject>
          the subject of the mail
    --log <logfile>
          a file name for the program to write success and failures
          in a human-readable text file.
    --text <txtfile>
    --img  <image>
    --audio <sndfile>
    --bin <file>
          attach a text file, a picture, an audio file, or any type of file to
          the mail. Files specified with --text don\'t have their filename
          specified in the mime headers (the text only is inserted).
          Use --bin if you want to attach the filename information.
 
Example:
    massmailer --server smtp.site.com --addrlist addr.txt --log log \\
               --from spammer@site.com --subject "Cheap Rolex" \\
               --text buy.txt --img rolex.png\
'''
 
import smtplib, sys, getopt
from email.MIMEBase import MIMEBase
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
from email.MIMEImage import MIMEImage
from email.MIMEAudio import MIMEAudio
import email, mimetypes
from os.path import basename
 
# -- Parse command line arguments
 
opts, args = getopt.getopt(sys.argv[1:], ':h',\
                              ['server=','log=','addrlist=','from=','subject=',
                               'text=', 'img=', 'audio=', 'bin=', 'help'])
if args != []:
    print "Wrong command line arguments:", "'"+' '.join(args)+"'"
    print "Try '%s --help' for valid options." % sys.argv[0]
    sys.exit(1)
 
servername=None
logfile=None
addrfile=None
fromaddr=None
subject=''
attch=[]
 
for o,a in opts:
    if o=='--text' or o=='--img' or o=='--audio' or o=='--bin':
        attch.append((o[2:],a))
    elif o=='--server': servername=a
    elif o=='--log': logfile=a
    elif o=='--addrlist': addrfile=a
    elif o=='--from': fromaddr=a
    elif o=='--subject': subject=a
    elif o in ['-h', '--help']:
        print USAGE
        sys.exit(0)
 
# -- Print error messages
 
if not (servername and addrfile and fromaddr):
    if not servername:
        print 'Error: No server specified (--server option)'
    if not addrfile:
        print 'Error: No address list specified (--addrlist option)'
    if not fromaddr:
        print 'Error: No sender address specified (--from option)'
    sys.exit(1)
 
# -- Print warning messages
 
warning=''
if not subject:
    warning = warning + 'Warning: No subject specified (--subject option)\n'
if not logfile:
    warning = warning +'Warning: No log file specified (--logfile option)\n'
if attch == []: warning = warning + \
   'Warning: No content specified (--text, --img, --audio, --bin options)\n'
if warning != '':
    print '\n', warning
    inp = raw_input('Are you sure you want to continue [Y/n] ?')
    if inp not in ['Y','y', '']:
        print 'Aborted, bye!'
        sys.exit(0)
 
# -- Store the destination addresses in a list
 
addrlist = [l.strip() for l in open(addrfile).readlines() if l.strip() !='']
if logfile:
    print 'writing log to',logfile
    log = open(logfile, 'w')
else:
    log = sys.stdout
 
# -- Build multi-part mime message
 
# The same message object is reused for every recipient; only its 'To'
# header is replaced before each send.
msg = MIMEMultipart()
msg['Subject'] = subject
msg['From'] = fromaddr
msg['To'] = ''

for (ftype, f) in attch:
    # Images, sounds and arbitrary files must be read in binary mode, or
    # their content is altered on platforms that translate line endings.
    if ftype == 'text':
        mode = 'r'
    else:
        mode = 'rb'
    fd = open(f, mode)
    try:
        content = fd.read()
    finally:
        # Close the file even if reading fails.
        fd.close()

    if ftype == 'text':
        # Text is inserted as-is, without any filename information.
        msg.attach(MIMEText(content))
    elif ftype == 'img':
        m = MIMEImage(content)
        m.add_header('Content-Disposition', 'inline', filename=basename(f))
        msg.attach(m)
    elif ftype == 'audio':
        # MIMEAudio inspects the data to find the subtype and raises
        # TypeError when it cannot (e.g. mp3). In that case fall back to the
        # type suggested by the file name.
        guessed_type, guessed_encoding = mimetypes.guess_type(f)
        try:
            m = MIMEAudio(content)
        except TypeError:
            if not (guessed_type and guessed_type.startswith('audio/')):
                raise
            m = MIMEAudio(content, guessed_type.split('/', 1)[1])
        msg.attach(m)
    elif ftype == 'bin':
        m = MIMEBase('application', 'octet-stream')
        m.add_header('Content-Disposition', 'attachment', filename=basename(f))
        m.set_payload(content)
        email.Encoders.encode_base64(m)
        msg.attach(m)
 
# -- Send the message
 
try:
    server = smtplib.SMTP(servername)
except Exception, e:
    print >>log, 'Could not connect to', servername, e.__class__, e
    sys.exit(2)
 
total = len(addrlist)
for (idx, addr) in enumerate(addrlist):
    try:
        msg.replace_header('To', addr)
        server.sendmail(fromaddr, addr, msg.as_string())
        print >>log, 'Success: ', addr
        print 'Progress: %d/%d' % (idx+1, total)
    except Exception, e:
        print >>log, 'Error:',addr, e.__class__, e
 
server.quit()

imapsave

imapsave simply copies the content of an imap account into mbox text files (one file per folder). It does not do incremental backups.

I wrote imapsave when I realized imapbackup could not use SSL encryption.

[download imapsave]

#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
 
# Licensed under the Python License (see http://www.python.org/psf/license/)
# Copyright (C) 2008 Pierre Duquesne <stackp@online.fr>
#
# Changelog:
#   20080722 * disable spinning when output is redirected to a file
#   20080718 * Use BODY.PEEK[] in the fetch command as specified in RFC 3501
#              (fix by Stefan)
#   20080313 * Initial release
 
import getpass
import imaplib
import email
import email.Parser
import re
import sys
import getopt
from os.path import basename
 
USAGE = """\
usage: %s [--ssl] [--port num] [--password pass] [server] [user]
 
    server
        The hostname of your imap server
 
    user
        Your login
 
    --ssl
        Use an encrypted connection
 
    -p num
    --port num
        Use port number num instead of default port
        (Defaults are 143 for unencrypted connection and 993 for ssl)
 
    -P pass
    --password pass
        Password for the server
 
""" % basename(sys.argv[0])
 
class Spinner:
    """A class to show some activity on a character terminal."""
 
    def __init__(self, message=''):
        self.message = message
        self.symbols = list('-\|/')
        self.nsym = len(self.symbols)
        self.n = 0
 
    def spin(self):
        print '\r', self.message + self.symbols[self.n],
        sys.stdout.flush()
        self.n = (self.n + 1) % self.nsym
 
    def stop(self, stop_message=''):
        print '\r', self.message + stop_message
 
    def set_message(self, message):
        self.message = message
 
class DummySpinner:
    """A class that mock Spinner and do not spin.
 
    Used when output is not a character terminal (e.g. a log file).
 
    """   
    def __init__(self, message=''):
        self.message = message
 
    def spin(self):
        pass
 
    def stop(self, stop_message=''):
        print self.message + stop_message
 
    def set_message(self, message):
        self.message = message
 
 
def foldname_to_filename(folder):
    """Turn an IMAP folder name into a file name.

    Double quotes surrounding the name are removed and every '/' is
    replaced by '.', so that a nested folder does not map to a path in a
    sub-directory.
    """
    # Plain raw strings: the replacement only refers to a group, so the
    # former unicode literal added nothing (and `ur` literals do not exist
    # in later Python versions).
    filename = re.sub(r'"(.*)"', r'\1', folder)
    return filename.replace('/', '.')
 
try:
    # Parse command line arguments
    try:
        opts, args = getopt.gnu_getopt(sys.argv[1:], 'pP:h',
                                       ['port=', 'password=', 'ssl', 'help'])
    except getopt.GetoptError, e:
        print 'Error:', e
        sys.exit(1)
 
    ssl = False
    port = None
    password = None
    for o, a in opts:
        if o == '--ssl':
            ssl = True
        if o == '-p' or o == '--port':
            port = int(a)
        if o == '-P' or o == '--password':
            password = a
        elif o in ['-h', '--help']:
            print USAGE
            sys.exit(0)
 
    try:
        servername = args[0]
    except IndexError:
        servername = raw_input("Enter the server hostname: ")
    try:
        username = args[1]
    except IndexError:
        username = raw_input("Enter your username: ")
 
    # Connect
    print 'Connecting to %s as user %s ...' % (servername, username)
    if ssl:
        IMAP = imaplib.IMAP4_SSL
    else:
        IMAP = imaplib.IMAP4
    try:
        if port:
            server = IMAP(servername, port)
        else:
            server = IMAP(servername)
        if not password:
            password = getpass.getpass()
        server.login(username, password)
    except Exception, e:
        print 'Error:', e
        sys.exit(1)
 
    # Retrieve folder list
    folders = []
    foldptn = re.compile('\([^\)]*\) "[^"]*" ([^"]*)')
    for fold_desc in server.list()[1]:
        folder = foldptn.sub(ur'\1', fold_desc)
        folders.append(folder)
 
    # Save messages in a separate file for each folder
    if sys.stdout.isatty():
        spinner = Spinner()
    else:
        spinner = DummySpinner()
    parser = email.Parser.Parser()
    succeed = []
    failed = []
    for folder in folders:
        msg = '%s ... ' % folder
        spinner.set_message(msg)
        try:
            resp, info = server.select(folder)
            if resp != 'OK':
                raise imaplib.IMAP4.error(' - '.join(info))
            filename = foldname_to_filename(folder)
            fp = open(filename, 'w')
            resp, items = server.search(None, "ALL")
            numbers = items[0].split()
            for num in numbers:
                resp, data = server.fetch(num, "(BODY.PEEK[])")
                text = data[0][1]
                mess = parser.parsestr(text)
                fp.write(mess.as_string(unixfrom=True))
                spinner.spin()
            succeed.append(folder)
            spinner.stop('Done.')
            fp.close()
        except imaplib.IMAP4.error, e:
            failed.append(folder)
            spinner.stop('Error! (' + str(e) + ')')
    server.logout()
 
    # Print the folders which failed, if any
    if failed != []:
        import textwrap
        wrapper = textwrap.TextWrapper(initial_indent='    ',
                                       subsequent_indent='    ')
        print
        print 'WARNING - The following folders could not be saved:'
        print wrapper.fill(', '.join(failed))
        sys.exit(1)
 
except KeyboardInterrupt:
    print ''
    print '^C received, stopping.'

Spinner: display activity

This was inspired by some code found in imapbackup. Sometimes you need to display activity on the text console to inform the user that the program is actually doing something.

That is the purpose of this small python class. The test() function shows the usage. Here’s what test() displays:
Spinner screencast

[download spinner.py]

# Licensed under the Python License (see http://www.python.org/psf/license/)
# Copyright (C) 2008 Pierre Duquesne <stackp@online.fr>
 
import sys
 
class Spinner:
    """A class to show a spinning ascii animation on a character terminal.
 
    It informs the user that some processing is being done.
    """
 
    def __init__(self, message=''):
        self.message = message
        self.symbols = list('-\|/')
        self.nsym = len(self.symbols)
        self.n = 0
 
    def spin(self):
        print '\r', self.message + self.symbols[self.n],
        sys.stdout.flush()
        self.n = (self.n + 1) % self.nsym
 
    def stop(self, stop_message=''):
        print '\r', self.message + stop_message
 
    def set_message(self, message):
        self.message = message
 
 
class DummySpinner:
    """A class that mock Spinner and do not spin.
 
    To be used when output is not a character terminal (e.g. a log file). 
    For example::
 
        if sys.stdout.isatty():
            s = Spinner()
        else:
            s = DummySpinner()
 
    """   
    def __init__(self, message=''):
        self.message = message
 
    def spin(self):
        pass
 
    def stop(self, stop_message=''):
        print self.message + stop_message
 
    def set_message(self, message):
        self.message = message
 
 
if __name__ == "__main__":

    import time

    def test():
        """Demonstrate the spinner: one bare run, then two labelled runs."""

        def animate(spinner):
            # Ten frames, a tenth of a second apart.
            for _ in range(10):
                spinner.spin()
                time.sleep(0.1)

        # Animate only on a real terminal.
        spinner_class = DummySpinner
        if sys.stdout.isatty():
            spinner_class = Spinner
        s = spinner_class()

        # simple usage
        animate(s)

        # usage with a message and a final status
        for label in ('Initialization ... ', 'Loading ... '):
            s.set_message(label)
            animate(s)
            s.stop('Done')

    test()

A better minimal web server in python

The simple web server from the python standard library is easy to improve to:

  • answer several requests at the same time, and
  • cancel a connection when the client stops responding.

[download webserver.py]

#!/usr/bin/env python
 
import SimpleHTTPServer, BaseHTTPServer, SocketServer, socket
 
class ThreadedHTTPServer(SocketServer.ThreadingMixIn,
                         BaseHTTPServer.HTTPServer) :
    """
    New features w/r to BaseHTTPServer.HTTPServer:
    - serves multiple requests simultaneously
    - catches socket.timeout and socket.error exceptions (raised from
      RequestHandler)
    """
 
    def __init__(self, *args):
        BaseHTTPServer.HTTPServer.__init__(self,*args)
 
    def process_request_thread(self, request, client_address):
        """
        Overrides SocketServer.ThreadingMixIn.process_request_thread
        in order to catch socket.timeout
        """
        try:
            self.finish_request(request, client_address)
            self.close_request(request)
        except socket.timeout:
            print 'Timeout during processing of request from',
            print client_address
        except socket.error, e:
            print e, 'during processing of request from',
            print client_address
        except:
            self.handle_error(request, client_address)
            self.close_request(request)
 
 
class TimeoutHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
    """
    Abandon request handling when client has not responded for a
    certain time. This raises a socket.timeout exception.
    """
 
    # Class-wide value for socket timeout
    timeout = 3 * 60 
 
    def setup(self):
        "Sets a timeout on the socket"
        self.request.settimeout(self.timeout)
        SimpleHTTPServer.SimpleHTTPRequestHandler.setup(self)
 
 
def main():
    try:
        BaseHTTPServer.test(TimeoutHTTPRequestHandler, ThreadedHTTPServer)
    except KeyboardInterrupt:
        print '^C received, shutting down server'
 
if __name__ == '__main__':
    main()

Fetch album covers from Amazon

Edit: This does not seem to work anymore.

This is a stripped-down snippet from sonata to fetch music album covers from the amazon website.

[download amazon.py]

#!/usr/bin/env python
 
# Copyright 2006-2007 Scott Horowitz
# Licensed under the GPL
 
import urllib2, urllib
 
def _quote_for_amazon(text):
    """Return text percent-encoded for use in the search URL."""
    if isinstance(text, unicode):
        # Amazon doesn't support utf8 and suggests latin1 encoding instead;
        # utf8 is only used for text that latin1 cannot represent.
        try:
            text = text.encode('latin1')
        except UnicodeEncodeError:
            text = text.encode('utf8')
    return urllib.quote(text)


def download_image_to_filename(artist, album, dest_filename):
    """Download the large cover image of an album from Amazon.

    artist and album are the search terms (byte or unicode strings);
    dest_filename is the path the image is saved to.

    Returns True when an image was found and saved, False when the search
    result contains no large image. Errors raised by urllib/urllib2 while
    talking to the server are not caught.
    """
    # NOTE(review): access key embedded in the source.
    amazon_key = "12DR2PGAQT303YTEWP02"
    search_url = "http://webservices.amazon.com/onca/xml?" \
                 + "Service=AWSECommerceService&AWSAccessKeyId=" \
                 + amazon_key \
                 + "&Operation=ItemSearch&SearchIndex=Music&Artist="\
                 + _quote_for_amazon(artist) \
                 + "&ResponseGroup=Images&Keywords=" \
                 + _quote_for_amazon(album)
    request = urllib2.Request(search_url)
    response = urllib2.build_opener().open(request)
    try:
        f = response.read()
    finally:
        response.close()

    # Skip the header of the response before looking for the image entry.
    curr_pos = f.find("<LargeImage>", 310)
    if curr_pos == -1:
        return False
    url_start = f.find("<URL>http://", curr_pos)
    if url_start == -1:
        return False
    url_start += len("<URL>")
    url_end = f.find("</URL>", url_start)
    if url_end == -1:
        return False

    img_url = f[url_start:url_end]
    urllib.urlretrieve(img_url, dest_filename)
    return True
 
if __name__ == "__main__":
    import sys
    try:
        artist = sys.argv[1]
        album = sys.argv[2]
        outfile = sys.argv[3]
    except:
        print 'Usage: %s artist album outfile' % sys.argv[0]
        sys.exit(1)
    res = download_image_to_filename(artist, album, outfile)
    print res

Example use from the command-line:

./amazon.py "Boards of Canada" "The Campfire Headphase" boc.jpg