Add Messages from mbox file: Difference between revisions
(New page: This is a Python script which calls zmmailbox. It is based on Mbox to maildir with Python. It has not been tested extensively, so use at your own risk. <pre> #!/usr/bin/env python # ...) |
(added article footer and category) |
||
Line 403: | Line 403: | ||
</pre> | </pre> | ||
{{Article Footer|unknown |5/17/2009}} | |||
[[Category:Mailbox]] |
Revision as of 22:22, 18 May 2009
This is a Python script which calls zmmailbox. It is based on Mbox to maildir with Python. It has not been tested extensively, so use at your own risk.
#!/usr/bin/env python
"""Upload mbox format email to Zimbra by calling the zmmailbox command.

Each message found in an mbox file is written to a temporary file and
handed to ``zmmailbox -z -m USER am MAILBOX FILE``.  With ``-r`` a whole
directory tree of mbox files is imported and mailbox names are derived
from the file hierarchy.

The script uses the current standard-library names (``email.errors``,
``mailbox.mbox``, ``subprocess``); the modules it was first written
against (``email.Errors``, ``StringIO``, ``mailbox.UnixMailbox``) are no
longer shipped with Python.
"""

########################################################################
# Libraries
########################################################################

import email
import email.errors
import getopt
import mailbox
import os
import re
import subprocess
import sys
import tempfile

########################################################################
# Configuration defaults
########################################################################

# Store configuration in a single module-level dictionary so that every
# function can read and update the same values.
config = {
    'self': os.path.basename(sys.argv[0]),
    'verbose': 5,  # "notice"; Syslog-style priority level
    'mailbox': 'Inbox',
    'recursive_mode': 0,
    'user': None,  # must be supplied with -u
}

# Syslog-style level names; the index of a name is its numeric priority.
#
#   crit, alert, emerg: critical error, immediate termination
#   err: non-fatal problem
#   warning: possibly negative informational message
#   notice: neutral informational message
#   info: function calls, arguments
#   debug: protocol, data details
_LOG_LEVEL_NAMES = ('emerg', 'alert', 'crit', 'err',
                    'warning', 'notice', 'info', 'debug')

########################################################################
# Functions
########################################################################


def main():
    """Parse the command line and run a single or a recursive import."""
    output('info', 'main(): starting')
    process_options()
    if config['recursive_mode'] == 1:
        output('debug', 'main(): about to run recursive_import()')
        recursive_import()
    else:
        output('debug', 'main(): about to run single_import()')
        single_import()
    output('info', 'main(): completed')


########################################################################
# TODO test mbox files with . in the name
# TODO test both absolute and relative paths in import
def source2target(source):
    """Translate an mbox file path into an IMAP-style mailbox path.

    "." characters are replaced with "_", "/" separators become ".",
    and the first component (the containing directory) is removed.
    A path with only one component has no containing directory and is
    returned as-is instead of being reduced to an empty string.
    """
    output('info', 'source2target(%s): starting' % source)
    target = source
    if '.' in source:
        output('warning', 'mbox file "%s" contains "." character' % source)
        output('warning', 'Replacing "." with "_"')
        target = target.replace('.', '_')
    # Translate to IMAP-style path separator (replace "/" with ".")
    target = target.replace('/', '.')
    # Strip off the containing directory (up to the first "." character),
    # but only when there really is a separator after a non-empty name.
    head, separator, rest = target.partition('.')
    if head and separator:
        target = rest
    output('info', 'source2target(%s): returning %s' % (source, target))
    return target


def _target_for(source, base):
    """Return the mailbox name for *source*, a file located below *base*."""
    relative = source
    # Plain prefix comparison: the base is a path, not a regular
    # expression, so characters such as "+" or "(" must not be special.
    if base and source.startswith(base):
        relative = source[len(base):]
    return source2target(relative.lstrip('/'))


########################################################################
def recursive_import():
    """Upload every mbox file below config['source'] to the server.

    Exits with status 1 when config['source'] is not a directory.
    """
    output('info', 'recursive_import(): starting')
    if not os.path.isdir(config['source']):
        output('crit', 'Argument must be a directory when using -r')
        output('crit', 'Given "%s")' % config['source'])
        sys.exit(1)
    # Normalise so that a trailing "/" does not produce "dir//file"
    # paths and, from them, mailbox names with a leading ".".
    root = os.path.normpath(config['source'])
    source_list = build_file_list(root)
    output('debug', 'recursive_import(): source_list = %s' % source_list)
    # If the source directory was specified as a path with more than one
    # component, strip it down to the last component (the containing
    # directory), since the hierarchy on the server is created relative
    # to that.
    base = os.path.dirname(root)
    output('debug', 'recursive_import(): base = %s' % base)
    target_list = [_target_for(source, base) for source in source_list]
    output('debug', 'recursive_import(): target_list = %s' % target_list)
    create_mailboxes(target_list)
    for source, target in zip(source_list, target_list):
        output('debug', 'recursive_import(): source = %s' % source)
        output('debug', 'recursive_import(): target = %s' % target)
        output('notice', 'Starting import of %s to %s' % (source, target))
        do_import(source, target)
        output('notice', 'Finished import of %s to %s' % (source, target))
    output('info', 'recursive_import(): completed')


########################################################################
def single_import():
    """Upload the single mbox file config['source'] to config['mailbox']."""
    output('info', 'single_import(): starting')
    source = config['source']
    target = config['mailbox']
    output('notice', 'Starting import of %s to %s' % (source, target))
    do_import(source, target)
    output('notice', 'Finished import of %s to %s' % (source, target))
    output('info', 'single_import(): completed')


########################################################################
def process_options():
    """Process command line options and store the results in config.

    Exits after printing the usage text when the options are invalid or
    no FILE is given, and with an error when more than one FILE or no
    user is given.
    """
    output('info', 'process_options(): starting')
    try:
        # "m:" was missing, which made the documented -m option fail.
        # -i, -s and -p are still accepted (and ignored) so that existing
        # command lines keep working.
        opts, args = getopt.getopt(sys.argv[1:], 'i:m:p:rs:u:v:')
    except getopt.GetoptError:
        usage()
        sys.exit(1)
    for option, argument in opts:
        if option == '-m':
            config['mailbox'] = argument
        elif option == '-r':
            config['recursive_mode'] = 1
        elif option == '-u':
            config['user'] = argument
        elif option == '-v':
            # Convert before storing so that config['verbose'] is an
            # integer at every moment output() may look at it.
            config['verbose'] = numeric_log_level(argument)
    output('debug', 'process_options(): opts = %s' % opts)
    output('debug', 'process_options(): args = %s' % args)
    # Summarize config
    output('debug', "process_options(): config['mailbox'] = %s"
           % config['mailbox'])
    output('debug', "process_options(): config['user'] = %s"
           % config['user'])
    output('debug', "process_options(): config['verbose'] = %s"
           % config['verbose'])
    output('debug', "process_options(): config['recursive_mode'] = %s"
           % config['recursive_mode'])
    if len(args) == 0:
        usage()
        sys.exit()
    if len(args) != 1:
        output('crit', 'Too many file arguments: %s' % ' '.join(args))
        output('crit', 'Expecting only one; aborting')
        sys.exit(1)
    if not config['user']:
        # zmmailbox is always invoked with "-m USER"; without a user the
        # script used to die with a KeyError.
        output('crit', 'No user given; use -u USER')
        sys.exit(1)
    config['source'] = args[0]
    output('info', 'process_options(): completed')


########################################################################
def is_mbox_file(file):
    """Return true if *file* starts with the mbox "From " marker.

    A file that cannot be opened or read is reported and treated as not
    being an mbox file.
    """
    output('info', 'is_mbox_file(%s): starting' % file)
    try:
        # Binary mode: only the first five bytes matter and the rest of
        # the file may be in any encoding.
        with open(file, 'rb') as handle:
            return handle.read(5) == b'From '
    except (IOError, OSError):
        output('warning', 'Cannot read "%s"; skipping' % file)
        return False


########################################################################
def build_file_list(node):
    """Given a directory, return a list of the mbox files it contains.

    The mbox files directly inside *node* come first, followed by those
    of each sub-directory (recursively).  Entries are visited in sorted
    order so that the result is reproducible.
    """
    output('info', 'build_file_list(%s): starting' % node)
    file_children = []
    directory_children = []
    for entry in sorted(os.listdir(node)):
        path = os.path.join(node, entry)
        if os.path.isfile(path):
            if is_mbox_file(path):
                file_children.append(path)
        elif os.path.isdir(path):
            directory_children.append(path)
    flat = list(file_children)
    # Recursively process directory children
    for directory in directory_children:
        flat.extend(build_file_list(directory))
    return flat


########################################################################
def create_mailboxes(mailboxes):
    """Report the mailboxes that would have to be created on the server.

    Placeholder: neither the lookup of the existing folders nor the
    creation itself is implemented yet (see the TODO notes), so every
    requested mailbox is only announced.
    """
    output('info', 'create_mailboxes(%s): starting' % mailboxes)
    # Only mailboxes that do not exist yet should be created, which
    # requires the list of current mailboxes.
    # TODO 'zmmailbox -z -m %s gaf' % config['user']
    current_mailboxes = []
    for name in mailboxes:
        if name not in current_mailboxes:
            output('notice', 'Creating mailbox: ' + name)
            # TODO 'zmmailbox -z -m %s cf %s' % (config['user'], name)


########################################################################
def numeric_log_level(level):
    """Take an integer or string log level and return an integer level.

    Accepts integers 0-7, the syslog level names and the digit strings
    "0"-"7" (the form in which -v LEVEL arrives from the command line).
    Anything else is reported and mapped to 0 ("emerg").
    """
    # If level is an integer between 0 and 7, pass it back
    if level in range(8):
        return level
    if level in _LOG_LEVEL_NAMES:
        return _LOG_LEVEL_NAMES.index(level)
    text = str(level).strip()
    if len(text) == 1 and text in '01234567':
        return int(text)
    output('warning', 'Unknown log level "%s", assuming "emerg"' % level)
    return 0


########################################################################
def string_log_level(level):
    """Take an integer or string log level and return a string level.

    Unknown levels are reported and mapped to "emerg".
    """
    # If level is already a valid string, pass it back
    if level in _LOG_LEVEL_NAMES:
        return level
    return _LOG_LEVEL_NAMES[int(numeric_log_level(level))]


########################################################################
def output(level, message):
    """Print *message* if *level* is within the configured verbosity."""
    if numeric_log_level(level) <= config['verbose']:
        print('%s: (%s) %s'
              % (config['self'], string_log_level(level), message))


########################################################################
# TODO
def usage():
    """Print the command line help text."""
    print('''Usage: %s [OPTION]... FILE
Import contents of mbox FILE to zimbra.

  -m MAILBOX   when not using -r, import to MAILBOX (default: %s)
  -r           recursively import mbox files (FILE must be a directory)
  -u USER      authenticate as USER
  -v LEVEL     set verbosity to LEVEL (syslog priority style)

Note: "." characters are not allowed in mailbox names or directory names.
Such characters will be converted to "_" on the server. When using -r,
mailbox names will be derived from mbox file hierarchy structure.

Warning: Please do not delete source mail until you have verified that
it has been imported successfully. This tool has been written with
safety in mind, but there are no guarantees.
''' % (config['self'], config['mailbox']))


########################################################################
def msgfactory(fp):
    """Parse one message from the file object *fp*.

    Returns '' instead of None when the message cannot be parsed, since
    None would stop the mailbox iterator.
    """
    # The mailbox module hands over a binary file object; use the bytes
    # parser where the email package offers one.
    parse = getattr(email, 'message_from_binary_file',
                    email.message_from_file)
    try:
        return parse(fp)
    except email.errors.MessageParseError:
        return ''


########################################################################
def get_subject(msg_txt):
    """Extract the subject line from a string holding an email message.

    Returns the whole "Subject: ..." header line without trailing
    whitespace, or '' when the headers contain no subject.
    """
    output('info', 'get_subject(): starting')
    for line in msg_txt.split('\n'):
        # Header names are case-insensitive.
        if line[:8].lower() == 'subject:':
            return line.rstrip()
        if line in ('', '\r'):
            # End of headers. If we reached here, there is no subject.
            break
    output('warning', 'Message does not have a subject')
    return ''


########################################################################
def _upload_message(msg_obj, to_mailbox):
    """Write *msg_obj* to a temporary file and pass it to zmmailbox."""
    # Prefer the bytes serialisation; fall back to the text one on
    # interpreters whose message objects do not provide as_bytes().
    serialize = getattr(msg_obj, 'as_bytes', msg_obj.as_string)
    msg_data = serialize(unixfrom=False)
    if isinstance(msg_data, str):
        msg_txt = msg_data
    else:
        # latin-1 maps every byte, so this cannot fail; the text is only
        # used to find the subject for the log line.
        msg_txt = msg_data.decode('latin-1')
    msg_fd, msg_fname = tempfile.mkstemp()
    try:
        with os.fdopen(msg_fd, 'wb') as msg_fp:
            msg_fp.write(msg_data)
        subject = get_subject(msg_txt)
        output('notice',
               'Uploading message from %s: %s' % (msg_fname, subject))
        # An argument list (no shell) keeps user, mailbox and file names
        # with spaces or shell metacharacters intact.
        command = ['zmmailbox', '-z', '-m', config['user'],
                   'am', to_mailbox, msg_fname]
        try:
            status = subprocess.call(command)
        except OSError:
            output('crit', 'Unable to run zmmailbox')
            sys.exit(1)
        if status != 0:
            output('err', 'zmmailbox exited with status %s for %s'
                   % (status, msg_fname))
    finally:
        # The temporary copy is no longer needed once zmmailbox returned.
        os.remove(msg_fname)


def do_import(from_file, to_mailbox):
    """Upload every message of the mbox file *from_file* to *to_mailbox*."""
    output('info', 'do_import(%s, %s): starting' % (from_file, to_mailbox))
    mbox = mailbox.mbox(from_file, factory=msgfactory, create=False)
    try:
        for msg_obj in mbox:
            if not hasattr(msg_obj, 'as_string'):
                # msgfactory() returned its '' marker for a parse error.
                output('warning',
                       'Skipping unparseable message in %s' % from_file)
                continue
            _upload_message(msg_obj, to_mailbox)
    finally:
        mbox.close()


########################################################################
if __name__ == '__main__':
    main()