Mbox to maildir with Python

Revision as of 19:51, 25 March 2015 by Jorge de la Cruz (talk | contribs)

(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)

A Python solution that I found easier to install (it required no extra installation on CentOS 4.4) is the mbox2imap script (http://people.cs.uchicago.edu/~brendan/scripts/mbox2imap). Here is a modified version that adds the ability to pass the IMAP password on the command line:

#!/usr/bin/env python

# Upload mbox format email to an IMAP server

########################################################################
# Libraries
########################################################################

import email, email.Errors, mailbox, imaplib, getpass, sys, getopt
import os.path, StringIO, re

########################################################################
# Configuration defaults
########################################################################

# All runtime settings live in a single module-level dictionary. The
# values below are the defaults; process_options() overrides them from
# the command line.
#
config = {
    'self': os.path.basename(sys.argv[0]),
    'verbose': 5, # "notice"; Syslog-style priority level
    'user': getpass.getuser(),
    'imapmailbox': 'INBOX',
    'imapserver': 'laime.cs.uchicago.edu',
    'recursive_mode': False,
    'passwd': '',
}

########################################################################
# Functions
########################################################################

def main():
    """Parse the command line, then run the recursive or single upload."""
    output('info', 'main(): starting')

    process_options()

    # Pick the upload routine first, then announce and run it
    if config['recursive_mode']:
        announcement = 'main(): about to run recursive_upload()'
        runner = recursive_upload
    else:
        announcement = 'main(): about to run single_upload()'
        runner = single_upload

    output('debug', announcement)
    runner()

    output('info', 'main(): completed')

########################################################################

# Log in to the IMAP server and return an object representing the
# authenticated session
#
def login():
    """Open an SSL connection to config['imapserver'] and authenticate.

    The password comes from config['passwd']; when that is empty the
    user is prompted for it with getpass. Returns the IMAP4_SSL object.
    """
    output('info', 'login(): starting')

    print("%s: Authenticating to IMAP server" % config['self'])
    session = imaplib.IMAP4_SSL(config['imapserver'])

    # Prompt interactively only when no password was supplied
    password = config['passwd']
    if not password:
        password = getpass.getpass()

    check_response(session.login(config['user'], password))
    return session

########################################################################

# TODO test mbox files with . in the name
# TODO test both absolute and relative paths in upload
# Given an mbox file path, translate it to an IMAP-style mailbox path
def source2target(source):
    """Translate an mbox file path into an IMAP-style mailbox path.

    "." characters become "_", "/" separators become ".", and the
    leading component (everything up to and including the first ".")
    is removed. Returns the resulting mailbox name.
    """
    output('info', 'source2target(%s): starting' % source)

    if '.' in source:
        output('warning', 'mbox file "%s" contains "." character' % source)
        output('warning', 'Replacing "." with "_"')

    # "." is reserved as the IMAP path separator, so neutralise literal
    # dots first and only then convert "/" to "."
    target = source.replace('.', '_').replace('/', '.')

    # Strip off the containing directory: drop everything through the
    # first "." provided at least one character precedes it
    first_separator = target.find('.')
    if first_separator > 0:
        target = target[first_separator + 1:]

    output('info', 'source2target(%s): returning %s' % (source, target) )

    return target

########################################################################

# Upload all files in a hierarchy to an IMAP server
def recursive_upload():
    """Upload every mbox file below config['source'] to the IMAP server.

    config['source'] must be a directory; otherwise the problem is
    reported and the program exits with status 1. Each mbox file found
    by build_file_list() is uploaded to a mailbox whose name is derived
    from the file's path relative to the source directory (see
    source2target()). Missing mailboxes are created first.
    """
    output('info', 'recursive_upload(): starting')

    if not os.path.isdir(config['source']):
        output('crit', 'Argument must be a directory when using -r')
        output('crit', 'Given "%s"' % config['source'])
        sys.exit(1)

    # Ignore trailing slashes so that "mail/" and "mail" yield the same
    # mailbox names (a trailing slash used to produce "mail//file" paths
    # and mailbox names starting with ".")
    root = config['source'].rstrip('/') or config['source']

    source_list = build_file_list(root)
    output('debug', 'recursive_upload(): source_list = %s' % source_list)

    # If the source directory was specified as a path with more than
    # one component, strip everything before the last component (the
    # containing directory), since the hierarchy on the IMAP server is
    # created relative to it.
    #
    parent = os.path.dirname(root)
    if parent and not parent.endswith('/'):
        base = parent + '/'
    else:
        # '' (single-component path) or '/' (directory directly below /)
        base = parent
    output('debug', 'recursive_upload(): base = %s' % base)

    # Compute each target exactly once. The prefix is removed with plain
    # string operations so that regex metacharacters in the path (".",
    # "+", ...) cannot affect the result.
    target_list = []
    for source in source_list:
        relative = source
        if base and relative.startswith(base):
            relative = relative[len(base):]
        target_list.append(source2target(relative))
    output('debug', 'recursive_upload(): target_list = %s' % target_list)

    # Want to be able to explain what is going to happen to the user
    # before they have to type in their password. That is why login()
    # is called here.
    #
    server = login()

    create_imap_mailboxes(target_list, server)

    for source, target in zip(source_list, target_list):
        output('debug', 'recursive_upload(): source = %s' % source)
        output('debug', 'recursive_upload(): target = %s' % target)

        output('notice', 'Starting upload of %s to %s' % (source, target) )
        upload(source, target, server)
        output('notice', 'Finished upload of %s to %s' % (source, target) )

    output('info', 'recursive_upload(): completed')

########################################################################

def single_upload():
    """Upload the single mbox file config['source'] to config['imapmailbox']."""
    output('info', 'single_upload(): starting')

    # (source file, destination mailbox), reused for both log messages
    transfer = (config['source'], config['imapmailbox'])

    # Log in only now, so that everything printed so far has explained
    # what is about to happen before the user is asked for a password.
    #
    session = login()

    output('notice', 'Starting upload of %s to %s' % transfer)
    upload(transfer[0], transfer[1], session)
    output('notice', 'Finished upload of %s to %s' % transfer)

    output('info', 'single_upload(): completed')

########################################################################

# Process command line options
def process_options():
    """Parse sys.argv into the global config dictionary.

    Prints the usage text and exits when the options cannot be parsed
    (status 1) or when no file argument is given. Exits with status 1
    when more than one file argument is given. On success the single
    file argument is stored in config['source'].
    """
    output('info', 'process_options(): starting')

    try:
        opts, args = getopt.getopt(sys.argv[1:], "i:rs:u:v:p:")
    except getopt.GetoptError:
        usage()
        sys.exit(1)

    # Options whose argument is stored verbatim under a config key
    value_options = {
        '-i': 'imapmailbox',
        '-s': 'imapserver',
        '-u': 'user',
        '-v': 'verbose',
        '-p': 'passwd',
    }

    for flag, value in opts:
        if flag == '-r':
            # -r takes no argument; it only switches the mode
            config['recursive_mode'] = True
        elif flag in value_options:
            config[value_options[flag]] = value

    # Make sure desired log level is stored as an integer
    config['verbose'] = numeric_log_level(config['verbose'])

    output('debug', 'process_options(): opts = %s' % opts)
    output('debug', 'process_options(): args = %s' % args)

    # Summarize config (the password is deliberately not included)
    summary = (
        ("process_options(): config['imapmailbox'] = %s", 'imapmailbox'),
        ("process_options(): config['imapserver'] = %s", 'imapserver'),
        ("process_options(): config['user'] = %s", 'user'),
        ("process_options(): config['verbose'] = %s", 'verbose'),
        ("process_options(): config['recursive_mode'] = %s", 'recursive_mode'),
    )
    for template, key in summary:
        output('debug', template % config[key])

    if not args:
        usage()
        sys.exit()

    if len(args) > 1:
        output('crit', 'Too many file arguments: %s' % ' '.join(args))
        output('crit', 'Expecting only one; aborting')
        sys.exit(1)

    config['source'] = args[0]

    output('info', 'process_options(): completed')

########################################################################

# Return true if file is in mbox format
def is_mbox_file(file):
    """Return True if the first line of the file at path `file` starts
    with 'From ', i.e. the file looks like an mbox.

    Errors from opening or reading the file are not caught here.
    """
    # NOTE: the parameter name shadows the Python 2 builtin `file`; it is
    # kept unchanged so the function's interface stays the same.
    output('info', 'is_mbox_file(%s): starting' % file)

    handle = open(file)
    try:
        return handle.readline().startswith('From ')
    finally:
        # Close explicitly: this runs once per file during a recursive
        # scan, so leaked handles would accumulate.
        handle.close()

########################################################################

# Given a directory, return a list of contained mbox files
def build_file_list(node):
    """Return the paths of all mbox files at or below directory `node`.

    Files directly inside `node` come first, followed by the results
    for each subdirectory. Every returned path is prefixed with `node`.
    """
    output('info', 'build_file_list(%s): starting' % node)

    mbox_paths = []
    subdirectories = []

    # First pass: classify the direct children of this directory
    for name in os.listdir(node):
        path = node + '/' + name
        if os.path.isfile(path):
            if is_mbox_file(path):
                mbox_paths.append(path)
        elif os.path.isdir(path):
            subdirectories.append(path)

    # Second pass: descend into the subdirectories collected above
    for subdirectory in subdirectories:
        mbox_paths.extend(build_file_list(subdirectory))

    return mbox_paths

########################################################################

def create_imap_mailboxes(imap_mailboxes, server):
    """Create each mailbox in `imap_mailboxes` that is not on the server yet.

    `server` is the authenticated IMAP session returned by login().
    The result of every CREATE command is passed to check_response().
    """
    output('info', 'create_imap_mailboxes(%s): starting' % imap_mailboxes)

    # Attempting to create a mailbox that already exists produces an
    # IMAP protocol error, so we only want to attempt to create a
    # mailbox that does not exist. To do this, we need a list of the
    # current mailboxes. The list() method returns one string per
    # mailbox, formatted like this:
    #
    #    (\Noinferiors) "." "INBOX"
    #
    # The folder name is the last field. It may or may not be quoted.
    # As in the rest of this script, "." is assumed to be the server's
    # hierarchy delimiter.
    #
    folder_pattern = re.compile(r'^.*"\." "?(.*?)"?$')

    current_mailboxes = []
    for entry in server.list()[1]:
        # The pattern must be applied to each entry separately; passing
        # the whole list to the re module raises TypeError. Entries that
        # are not plain strings (e.g. None) carry no folder name.
        if not isinstance(entry, str):
            continue
        found = folder_pattern.match(entry)
        if found:
            current_mailboxes.append(found.group(1))

    for name in imap_mailboxes:
        if name not in current_mailboxes:
            output('notice', 'Creating mailbox: ' + name)
            check_response(server.create(name))
            # Remember it so a duplicate target is not created twice
            current_mailboxes.append(name)

########################################################################

# Take an integer or string log level and return an integer log level
#
def numeric_log_level(level):
    """Return the integer syslog-style priority (0-7) for `level`.

    `level` may be an integer 0-7, a priority name ('emerg' ... 'debug')
    or a string holding a number 0-7, which is what the command line
    delivers for "-v 7". Anything else produces a warning and is
    treated as 0 ("emerg").
    """
    # crit, alert, emerg: critical error, immediate termination
    # err: non-fatal problem
    # warning: possibly negative informational message
    # notice: neutral informational... TODO
    # info: function calls, arguments
    # debug: protocol, data details
    names = ['emerg', 'alert', 'crit', 'err', 'warning',
             'notice', 'info', 'debug']

    # If level is an integer between 0 and 7, pass it back
    if level in range(8):
        return level

    # A priority name maps to its position in the list
    if level in names:
        return names.index(level)

    # Option arguments always arrive as strings, so accept "0".."7" too
    try:
        number = int(str(level).strip())
    except ValueError:
        number = None
    if number in range(8):
        return number

    output('warning', 'Unknown log level "%s", assuming "emerg"' % level)
    return 0

########################################################################

# Take an integer or string log level and return a string log level
#
def string_log_level(level):
    """Return the priority name for `level`.

    `level` may already be a priority name, or an integer 0-7. Anything
    else produces a warning and is treated as 'emerg'.
    """
    names = ('emerg', 'alert', 'crit', 'err',
             'warning', 'notice', 'info', 'debug')

    # A valid name is returned unchanged
    if level in names:
        return level

    # An integer between 0 and 7 is the index of its name
    if level in range(len(names)):
        return names[level]

    output('warning', 'Unknown log level "%s", assuming "emerg"' % level)
    return 'emerg'
    
########################################################################

def output(level, message):
    """Print `message` if `level` is at or below config['verbose'].

    `level` is an integer priority or a priority name. The printed line
    is prefixed with the program name and the priority name.
    """
    priority = numeric_log_level(level)
    if not priority <= config['verbose']:
        # Less important than the configured verbosity: stay silent
        return

    label = string_log_level(level)
    print("%s: (%s) %s" % (config['self'], label, message))

########################################################################

# TODO
def usage():
    """Print the command line help text, filled in with the program
    name and the current mailbox and server settings from config."""
    substitutions = (config['self'],
                     config['imapmailbox'],
                     config['imapserver'])

    help_text = '''Usage: %s [OPTION]... FILE
Upload contents of mbox FILE to an SSL IMAP server.

  -i MAILBOX     when not using -r, upload to MAILBOX (default: %s)
  -r             recursively upload mbox files (FILE must be a directory)
  -s SERVER      connect to SERVER (default: %s)
  -u USER        authenticate as USER
  -v LEVEL       set verbosity to LEVEL (syslog priority style)
  -p PASSWORD    password for USER

Note: "." characters are not allowed in IMAP mailbox names or directory
names. Such characters will be converted to "_" on the server.

When using -r, IMAP mailbox names will be derived from mbox file
hierarchy structure.

Warning: Please do not delete source mail until you have verified that
it has been uploaded successfully. This tool has been written with
safety in mind, but there are no guarantees.
'''

    print(help_text % substitutions)

########################################################################

def msgfactory(fp):
    """Parse one message from the file-like object `fp`.

    Returns the parsed message object, or '' if the email package
    raises MessageParseError.
    """
    try:
        message = email.message_from_file(fp)
    except email.Errors.MessageParseError:
        # Hand back '' rather than None: None would stop the mailbox
        # iterator.
        message = ''
    return message

########################################################################

def check_response(response):
    """Log an IMAP command result and report it if the status is not 'OK'.

    `response` is indexed as (status, data). A non-OK status is printed
    at priority 1 ("alert"); nothing is raised and nothing is returned.
    """
    # NOTE: this function is defined a second time further down in this
    # file; that later definition is the one in effect at run time.
    output('info', 'check_response(): starting')

    status, payload = response[0], response[1]

    output('debug', 'IMAP protocol response: %s' % status)
    output('debug', 'IMAP protocol data: %s' % payload)

    if status == 'OK':
        return

    # NOTE(review): the failure is only reported; execution continues.
    output(1, "IMAP protocol error")
    output(1, "Protocol response: " + str(status))
    output(1, "Diagnostic message: " + str(payload))

########################################################################

# Extract the subject from a string representing an email message
def get_subject(msg_txt):
    """Return the first "Subject:" header line of the message text.

    Only the header section (everything before the first blank line) is
    searched. The header name is matched case-insensitively and the line
    is returned with trailing whitespace removed. If there is no subject,
    a warning is logged and '' is returned.
    """
    # NOTE: this function is defined a second time further down in this
    # file; that later definition is the one in effect at run time.
    output('info', 'get_subject(): starting')

    for line in msg_txt.split('\n'):
        if line.rstrip('\r') == '':
            # End of headers (LF or CRLF line endings)
            break
        if line[:8].lower() == 'subject:':
            return line.rstrip()

    # Reached for both "blank line found" and "text ended without one",
    # so the caller always receives a string.
    output('warning', 'Message does not have a subject')
    return ''

########################################################################
########################################################################

def check_response(response):
    """Log an IMAP command result and report it if the status is not 'OK'.

    `response` is indexed as (status, data). A non-OK status is printed
    at priority 1 ("alert"); nothing is raised and nothing is returned.
    """
    # NOTE: this repeats a definition of the same name earlier in this
    # file; being the later one, it is the definition in effect.
    output('info', 'check_response(): starting')

    status, payload = response[0], response[1]

    output('debug', 'IMAP protocol response: %s' % status)
    output('debug', 'IMAP protocol data: %s' % payload)

    if status == 'OK':
        return

    # NOTE(review): the failure is only reported; execution continues.
    output(1, "IMAP protocol error")
    output(1, "Protocol response: " + str(status))
    output(1, "Diagnostic message: " + str(payload))

########################################################################

# Extract the subject from a string representing an email message
def get_subject(msg_txt):
    """Return the first "Subject:" header line of the message text.

    Only the header section (everything before the first blank line) is
    searched. The header name is matched case-insensitively and the line
    is returned with trailing whitespace removed. If there is no subject,
    a warning is logged and '' is returned.
    """
    # NOTE: this repeats a definition of the same name earlier in this
    # file; being the later one, it is the definition in effect.
    output('info', 'get_subject(): starting')

    for line in msg_txt.split('\n'):
        if line.rstrip('\r') == '':
            # End of headers (LF or CRLF line endings)
            break
        if line[:8].lower() == 'subject:':
            return line.rstrip()

    # Reached for both "blank line found" and "text ended without one",
    # so the caller always receives a string.
    output('warning', 'Message does not have a subject')
    return ''

########################################################################

def upload(from_file, to_mailbox, server):
    """Append every message in the mbox file `from_file` to `to_mailbox`.

    `server` is the authenticated IMAP session returned by login().
    Messages that msgfactory() could not parse are skipped with a
    warning. The result of each APPEND is passed to check_response().
    """
    output('info', 'upload(%s, %s): starting' % (from_file, to_mailbox) )

    fp = open(from_file, 'r')
    try:
        for msg_obj in mailbox.UnixMailbox(fp, msgfactory):
            # msgfactory() returns '' for a message it could not parse;
            # a plain string has no as_string(), so skip it rather than
            # abort the whole upload with an AttributeError.
            if not hasattr(msg_obj, 'as_string'):
                output('warning',
                       'Skipping unparseable message in %s' % from_file)
                continue

            msg_txt = msg_obj.as_string(unixfrom=False)

            subject = get_subject(msg_txt)
            output('notice', 'Uploading message: %s' % subject)

            # Regarding third argument to append,
            # see RFC 3501 sections 2.3.3, 6.3.11
            check_response(server.append(to_mailbox, "", "", msg_txt))
    finally:
        # Release the mbox file even if an upload fails part-way
        fp.close()

########################################################################

# Run the tool only when this file is executed as a script, so that the
# module can also be imported without side effects.
if __name__ == '__main__':
    main()

Someone wrote a script that iterated over the mbox names, read each password from a CSV file, and uploaded the emails automatically using this tool. (location?)

Jump to: navigation, search