Add Messages from mbox file

This is a Python script which calls zmmailbox to upload the messages of an mbox file to a Zimbra mailbox. It is based on the "Mbox to maildir with Python" script. It has not been tested extensively, so use it at your own risk.

#!/usr/bin/env python

# Upload mbox format email to zimbra

########################################################################
# Libraries
########################################################################

import email, email.Errors, mailbox, imaplib, sys, getopt
import os.path, StringIO, re, tempfile

########################################################################
# Configuration defaults
########################################################################

# All settings live in one module-level dictionary, so a function needs
# only a single "global config" declaration to reach every value.
#
config = {
    # Name the script was invoked as; prefixes every line of output
    'self': os.path.basename(sys.argv[0]),
    # Syslog-style priority level; 5 is "notice"
    'verbose': 5,
    # Destination mailbox when importing a single file
    'mailbox': 'Inbox',
    # Set to 1 to import a whole directory hierarchy
    'recursive_mode': 0,
}

########################################################################
# Functions
########################################################################

def main():
    """Parse the command line, then run the selected import mode.

    recursive_import() is run when config['recursive_mode'] is 1;
    single_import() is run for any other value.
    """
    global config

    output('info', 'main(): starting')

    process_options()

    use_single_import = config['recursive_mode'] != 1
    if use_single_import:
        output('debug', 'main(): about to run single_import()')
        single_import()
    else:
        output('debug', 'main(): about to run recursive_import()')
        recursive_import()

    output('info', 'main(): completed')

########################################################################

# TODO test mbox files with . in the name
# TODO test both absolute and relative paths in import
# Given an mbox file path, translate it to an IMAP-style mailbox path
def source2target(source):
    """Translate an mbox file path into an IMAP-style mailbox path.

    Every "." in the path is replaced with "_" (a warning is logged when
    this happens), every "/" becomes the "." hierarchy separator, and
    the leading component (the containing directory) is dropped.

    source -- path of the mbox file, e.g. "mail/lists/python"

    Returns the mailbox path, e.g. "lists.python". A path with no
    directory component is returned unchanged apart from the "." to "_"
    replacement.
    """
    output('info', 'source2target(%s): starting' % source)

    target = source

    if '.' in source:
        output('warning', 'mbox file "%s" contains "." character' % source)
        output('warning', 'Replacing "." with "_"')
        target = target.replace('.', '_')

    # Translate to IMAP-style path separator (replace "/" with ".")
    target = target.replace('/', '.')

    # Strip off the containing directory (up to and including the first
    # "." separator). The separator has to be escaped: an unescaped "."
    # matches any character, which erased names that contain no
    # separator at all.
    target = re.sub(r'^[^.]+\.', '', target)

    output('info', 'source2target(%s): returning %s' % (source, target) )

    return target

########################################################################

# Upload all files in a hierarchy to an IMAP server
def recursive_import():
    """Import every mbox file found below the directory config['source'].

    The mailbox name of each file is derived from its path relative to
    the parent of the source directory (see source2target()). The
    mailboxes are passed to create_mailboxes() first, then each file is
    uploaded with do_import().

    Exits with status 1 when config['source'] is not a directory.
    """
    global config

    output('info', 'recursive_import(): starting')

    if not os.path.isdir(config['source']):
        output('crit', 'Argument must be a directory when using -r')
        output('crit', 'Given "%s"' % config['source'])
        sys.exit(1)

    # A trailing "/" would make os.path.dirname() return the directory
    # itself instead of its parent, and would put "//" into every file
    # path, so drop it (but keep a bare "/" intact).
    root = config['source'].rstrip('/') or config['source']

    source_list = build_file_list(root)
    output('debug', 'recursive_import(): source_list = %s' % source_list)

    # If the source directory was specified as a path with more than
    # one component, strip it down to the last component (the containing
    # directory), since that is what the mailbox hierarchy is created
    # relative to.
    #
    base = os.path.dirname(root) + '/'
    output('debug', 'recursive_import(): base = %s' % base)

    # The prefix is removed with plain string operations: the path may
    # contain characters that are special in a regular expression.
    target_list = []
    for source in source_list:
        relative = source
        if relative.startswith(base):
            relative = relative[len(base):]
        target_list.append(source2target(relative))
    output('debug', 'recursive_import(): target_list = %s' % target_list)

    create_mailboxes(target_list)

    for source, target in zip(source_list, target_list):
        output('debug', 'recursive_import(): source = %s' % source)
        output('debug', 'recursive_import(): target = %s' % target)

        output('notice', 'Starting import of %s to %s' % (source, target) )
        do_import(source, target)
        output('notice', 'Finished import of %s to %s' % (source, target) )

    output('info', 'recursive_import(): completed')

########################################################################

def single_import():
    """Import the mbox file config['source'] into config['mailbox']."""
    global config

    output('info', 'single_import(): starting')

    mbox_path = config['source']
    folder = config['mailbox']
    names = (mbox_path, folder)

    output('notice', 'Starting import of %s to %s' % names)
    do_import(mbox_path, folder)
    output('notice', 'Finished import of %s to %s' % names)

    output('info', 'single_import(): completed')

########################################################################

# Process command line options
def process_options():
    """Parse sys.argv and store the result in the global config.

    Recognized options:
      -m MAILBOX   sets config['mailbox']
      -r           sets config['recursive_mode'] to 1
      -u USER      sets config['user'] (required)
      -v LEVEL     sets config['verbose'] (name or number)

    The single positional argument is stored as config['source'].

    Prints the usage text and exits when the options cannot be parsed
    or no file argument is given; exits with status 1 when more than
    one file argument is given or when -u is missing.
    """
    global config

    output('info', 'process_options(): starting')

    try:
        # "m:" has to be listed here, otherwise the documented -m option
        # is rejected as unknown. The i/s/p options are still accepted
        # (and ignored) so that existing command lines keep working.
        opts, args = getopt.getopt(sys.argv[1:], "i:m:rs:u:v:p:")

    except getopt.GetoptError:
        usage()
        sys.exit(1)

    for option, argument in opts:
        if option == '-m':
            config['mailbox'] = argument
        elif option == '-r':
            config['recursive_mode'] = 1
        elif option == '-u':
            config['user'] = argument
        elif option == '-v':
            # Option arguments are always strings; turn "7" into 7 so
            # that numeric levels are recognized.
            if argument.isdigit():
                argument = int(argument)
            config['verbose'] = argument

    # Make sure desired log level is stored as an integer
    config['verbose'] = numeric_log_level(config['verbose'])

    output('debug', 'process_options(): opts = %s' % opts)
    output('debug', 'process_options(): args = %s' % args)

    # Summarize config. The user is read with get() because the message
    # is formatted even when it is not printed, and -u may be missing.
    output('debug', "process_options(): config['mailbox'] = %s" %
        config['mailbox'] )
    output('debug', "process_options(): config['user'] = %s" %
        config.get('user') )
    output('debug', "process_options(): config['verbose'] = %s" %
        config['verbose'] )
    output('debug', "process_options(): config['recursive_mode'] = %s" %
        config['recursive_mode'] )

    if not args:
        usage()
        sys.exit()

    if len(args) != 1:
        output('crit', 'Too many file arguments: %s' % ' '.join(args))
        output('crit', 'Expecting only one; aborting')
        sys.exit(1)

    # The upload command needs a user; fail here with a clear message
    # instead of with a KeyError later on.
    if 'user' not in config:
        output('crit', 'No user given; use -u USER')
        sys.exit(1)

    config['source'] = args[0]

    output('info', 'process_options(): completed')

########################################################################

# Return true if file is in mbox format
def is_mbox_file(file):
    """Return True when the first line of the file starts with "From ".

    file -- path of the file to inspect
    """
    output('info', 'is_mbox_file(%s): starting' % file)

    handle = open(file)
    try:
        first_line = handle.readline()
    finally:
        # Close explicitly: this is called for every file of a directory
        # tree, so handles must not be left to the garbage collector.
        handle.close()

    return first_line[0:5] == 'From '

########################################################################

# Given a directory, return a list of contained mbox files
def build_file_list(node):
    """Return the paths of all mbox files found below directory node.

    The mbox files directly inside node come first, followed by the
    results for each subdirectory. Every path is built as
    node + '/' + entry name.
    """
    output('info', 'build_file_list(%s): starting' % node)

    mbox_paths = []
    subdirectories = []

    for name in os.listdir(node):
        path = node + '/' + name
        if os.path.isfile(path):
            if is_mbox_file(path):
                mbox_paths.append(path)
        elif os.path.isdir(path):
            subdirectories.append(path)

    # Descend only after this directory's own files have been collected
    for subdirectory in subdirectories:
        mbox_paths += build_file_list(subdirectory)

    return mbox_paths

########################################################################

def create_mailboxes(mailboxes):
    """Report which of the given mailboxes would have to be created.

    This is still a stub: the list of existing mailboxes is an empty
    placeholder and the actual creation is a TODO, so a "Creating
    mailbox" notice is logged for every entry and nothing else happens.

    mailboxes -- iterable of mailbox names
    """
    global config
    output('info', 'create_mailboxes(%s): starting' % mailboxes)

    # Only mailboxes that do not exist yet should be created, so the
    # current mailboxes have to be known first. The parsing below
    # expects listing lines formatted like
    #
    #    (\Noinferiors) "." "INBOX"
    #
    # and pulls out the quoted name at the end (INBOX in the example).
    #
    def folder_name(listing_line):
        return re.search(r'^.*"\." "(.*)"', listing_line).group(1)

    # TODO 'zmmailbox -z -m %s gaf' % config['user']
    listing = ''
    existing = [folder_name(line) for line in listing]

    for wanted in mailboxes:
        if wanted in existing:
            continue
        output('notice', 'Creating mailbox: ' + wanted)
        #TODO 'zmmailbox -z -m %s cf %s' % (config['user'], mailbox)

########################################################################

# Take an integer or string log level and return an integer log level
#
def numeric_log_level(level):
    """Return the integer (0-7) for a syslog-style log level.

    level -- an integer between 0 and 7, a level name ('emerg',
             'alert', 'crit', 'err', 'warning', 'notice', 'info',
             'debug'), or a numeric string such as "7" as it arrives
             from the command line

    Any other value produces a warning and is treated as 0 ("emerg").
    """
    # crit, alert, emerg: critical error, immediate termination
    # err: non-fatal problem
    # warning: possibly negative informational message
    # notice: neutral informational... TODO
    # info: function calls, arguments
    # debug: protocol, data details
    names = {'emerg': 0, 'alert': 1, 'crit': 2, 'err': 3,
             'warning': 4, 'notice': 5, 'info': 6, 'debug': 7}

    # If level is an integer between 0 and 7, pass it back
    if level in range(8):
        return level

    try:
        return names[level]
    except (KeyError, TypeError):
        # Not a known name (TypeError: not even usable as a key)
        pass

    # Accept numeric strings such as "7". Going through str() keeps
    # floats and other non-string values out: they fail to parse.
    try:
        number = int(str(level))
    except ValueError:
        number = None
    if number in range(8):
        return number

    output('warning', 'Unknown log level "%s", assuming "emerg"' % level)
    return 0

########################################################################

# Take an integer or string log level and return a string log level
#
def string_log_level(level):
    """Return the name of a syslog-style log level.

    level -- a level name ('emerg' ... 'debug') or an integer between
             0 and 7

    Any other value produces a warning and is treated as 'emerg'.
    """
    # Ordered so that the position of a name is its numeric level
    names = ('emerg', 'alert', 'crit', 'err',
             'warning', 'notice', 'info', 'debug')

    # A valid name is handed back unchanged
    if level in names:
        return level

    # An integer between 0 and 7 selects the name at that position
    if level in range(8):
        return names[level]

    output('warning', 'Unknown log level "%s", assuming "emerg"' % level)
    return 'emerg'
    
########################################################################

def output(level, message):
    """Print message if level is within the configured verbosity.

    level   -- log level of the message (name or number)
    message -- text to print

    The line is printed as "<script name>: (<level name>) <message>"
    when the numeric level is not above config['verbose'].
    """
    global config

    wanted = numeric_log_level(level) <= config['verbose']
    if not wanted:
        return

    # Parenthesized single argument: prints the same text as the plain
    # print statement.
    print("%s: (%s) %s" % (config['self'],
                           string_log_level(level),
                           message))

########################################################################

# TODO
def usage():
    """Print the command line help text.

    The script name and the default mailbox are filled in from
    config['self'] and config['mailbox'].
    """
    global config

    template = '''Usage: %s [OPTION]... FILE
Import contents of mbox FILE to zimbra.

  -m MAILBOX     when not using -r, import to MAILBOX (default: %s)
  -r             recursively import mbox files (FILE must be a directory)
  -u USER        authenticate as USER
  -v LEVEL       set verbosity to LEVEL (syslog priority style)

Note: "." characters are not allowed in mailbox names or directory
names. Such characters will be converted to "_" on the server.

When using -r, mailbox names will be derived from mbox file
hierarchy structure.

Warning: Please do not delete source mail until you have verified that
it has been imported successfully. This tool has been written with
safety in mind, but there are no guarantees.
'''
    print(template % (config['self'], config['mailbox']))

########################################################################

def msgfactory(fp):
    """Parse one message from the file object fp.

    Returns the parsed message object, or '' when the message cannot be
    parsed.
    """
    try:
        parsed = email.message_from_file(fp)
    except email.Errors.MessageParseError:
        # Hand back '' rather than None, since None would stop the
        # mailbox iterator
        parsed = ''
    return parsed

########################################################################

# Extract the subject from a string representing an email message
def get_subject(msg_txt):
    """Extract the subject line from the text of an email message.

    msg_txt -- the complete message (headers and body) as one string

    Returns the whole header line including the "Subject:" prefix, with
    trailing whitespace removed. When the headers contain no subject, a
    warning is logged and '' is returned. Only the first line of a
    folded (multi-line) subject is returned.
    """
    output('info', 'get_subject(): starting')

    # splitlines() copes with both LF and CRLF line endings, so the
    # blank line that ends the headers is found in either case.
    for line in msg_txt.splitlines():
        if line == '':
            # End of headers; never look for a subject in the body
            break
        # Header names are case-insensitive
        if line[:8].lower() == 'subject:':
            return line.rstrip()

    # Reached for an empty header block as well as for a message that
    # ends before any blank line, so the result is always a string.
    output('warning', 'Message does not have a subject')
    return ''

########################################################################

def do_import(from_file, to_mailbox):
    """Upload every message of an mbox file to a mailbox via zmmailbox.

    from_file  -- path of the mbox file to read
    to_mailbox -- destination folder handed to "zmmailbox ... am"

    Each message is written to a temporary file, uploaded as the user
    config['user'], and the temporary file is removed again. Messages
    that cannot be parsed are skipped with a warning. A failing upload
    is logged and the import continues with the next message.
    """
    global config

    # Imported locally: the file-level imports do not include it
    import subprocess

    output('info', 'do_import(%s, %s): starting' % (from_file, to_mailbox) )

    fp = open(from_file, 'r')
    try:
        mbox = mailbox.UnixMailbox(fp, msgfactory)

        for msg_obj in mbox:
            # msgfactory() returns '' for a message it cannot parse
            if msg_obj == '':
                output('warning',
                       'Skipping unparsable message in %s' % from_file)
                continue

            msg_txt = msg_obj.as_string(unixfrom=False)

            msg_fp, msg_fname = tempfile.mkstemp()
            try:
                try:
                    os.write(msg_fp, msg_txt)
                finally:
                    os.close(msg_fp)

                subject = get_subject(msg_txt)
                output('notice', 'Uploading message from %s: %s' %
                       (msg_fname, subject))

                # An argument list (no shell) keeps user and folder
                # names containing spaces or shell metacharacters
                # intact and uninterpreted.
                command = ['zmmailbox', '-z', '-m', config['user'],
                           'am', to_mailbox, msg_fname]
                try:
                    status = subprocess.call(command)
                except OSError:
                    # zmmailbox could not be started at all
                    output('err', 'Unable to run zmmailbox: %s' %
                           sys.exc_info()[1])
                else:
                    if status != 0:
                        output('err',
                               'zmmailbox exited with status %s for: %s' %
                               (status, subject))
            finally:
                # One temporary file per message; always clean it up
                os.remove(msg_fname)
    finally:
        fp.close()

########################################################################

# Run only when executed as a script, so that importing this file (for
# example to reuse or test a function) does not start an import.
if __name__ == '__main__':
    main()

Verified Against: unknown Date Created: 5/17/2009
Article ID: https://wiki.zimbra.com/index.php?title=Add_Messages_from_mbox_file Date Modified: 2015-03-24



Try Zimbra

Try Zimbra Collaboration with a 60-day free trial.
Get it now »

Want to get involved?

You can contribute in the Community, Wiki, Code, or development of Zimlets.
Find out more. »

Looking for a Video?

Visit our YouTube channel to get the latest webinars, technology news, product overviews, and so much more.
Go to the YouTube channel »

Jump to: navigation, search