Files
email_to_xml/getmail.py

237 lines
8.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Developed by Greig McGill of Sense7.
This script is designed to poll an IMAP mailbox when run.
It will find any unread emails with attachments and save each attachment to
the 'attachments' directory for later processing. Messages are currently left
unread: the fetch uses mark_seen=False and the explicit flag call is disabled.
It will respect the attachment filetype and extension.
Attachments are output named with the current date-time, and a semi-random
uid designed to crudely prevent namespace collisions. In a VERY high traffic
environment where many files may be created per second, this should be
re-implemented to be more robust.
No file locking is used, however files are written to a temporary directory
first and renamed upon completion, as renaming is an atomic operation at an
OS level.
Attachments will not be created if they have an identical hash to a
previously downloaded attachment. This is designed to prevent scenarios where
the same file has been accidentally sent multiple times. Note that this
identification is done based on file content, and the name of the file is
irrelevant.
Logging is fairly primitive and done to a log file in the same directory as
the script. This could be upgraded to syslog-style logging if required.
This is set up for simple IMAP authentication over implicit TLS (the
connection is encrypted from the start). If explicit STARTTLS is required, the
MailBox class will need to be changed to MailBoxTls. If Outlook or Gmail or
similar are used, it will be necessary to implement OAuth2.
Authentication is configured in a .env file as described below in the code.
"""
# Standard libraries
import os
import sys
import csv
import ssl
import json
import time
import hashlib
import logging
import tempfile
from io import BytesIO
from os.path import join, dirname
from datetime import datetime
# Third party libraries
from dotenv import load_dotenv
from imap_tools import MailBox, AND
import lxml.etree as ET
def setup_logging():
    """Initialize logging to ``getmail.log`` in the script's own directory.

    Configures the root logger at DEBUG level in append mode and records a
    start-up message that includes the current working directory.

    The log path is anchored to the directory containing this file rather
    than to the current working directory, so runs launched from elsewhere
    (cron, a service manager, another shell location) all append to the
    same file, as the module documentation describes.
    """
    log_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), 'getmail.log')
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(levelname)s %(message)s',
        filename=log_path,
        filemode='a')
    # %a formats the script name with ascii(), so it appears quoted.
    logging.debug('%a started in %s', 'getmail.py', os.getcwd())
def load_environment_variables():
    """Populate the process environment from the ``.env`` file beside this script.

    The file is opened first purely to prove it exists and is readable; the
    actual parsing is delegated to ``load_dotenv``. Any failure is logged
    and terminates the program with exit status 1.
    """
    env_file = join(dirname(__file__), ".env")
    try:
        with open(env_file, 'r', encoding='utf-8') as handle:
            logging.debug('success opening env file: %a', handle.name)
            load_dotenv(env_file)
    except Exception as err:
        # A missing file gets its own message; everything else is reported
        # generically. Both cases are fatal.
        if isinstance(err, FileNotFoundError):
            logging.error(
                'config file %a is missing - unable to proceed', env_file)
        else:
            logging.error('An unexpected error occurred: %s', str(err))
        sys.exit(1)
def compute_hash(content):
    """Return the SHA-256 hex digest of *content* (a bytes-like object).

    The digest is used as a content fingerprint to recognise attachments
    that have already been downloaded.
    """
    digest = hashlib.sha256()
    digest.update(content)
    return digest.hexdigest()
def load_saved_hashes(hashes_file):
    """Load the set of previously seen attachment hashes.

    Args:
        hashes_file: Path of the JSON file holding a list of hash strings.

    Returns:
        A set of the stored hashes. An empty set is returned when the file
        does not exist yet, or when it cannot be decoded (for example after
        an interrupted write); the latter is logged as an error so the loss
        of de-duplication history is visible.
    """
    try:
        # Open directly instead of checking for existence first: this avoids
        # a window where the file disappears between the check and the open.
        with open(hashes_file, 'r', encoding='utf-8') as f:
            return set(json.load(f))
    except FileNotFoundError:
        return set()
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        logging.error(
            'hash store %s is unreadable (%s) - starting with no saved hashes',
            hashes_file, err)
        return set()
def save_hashes(hashes_file, hashes):
    """Persist the set of attachment hashes as a JSON list.

    The data is written to a sibling ``<hashes_file>.tmp`` file and then
    moved over the real file, so a crash mid-write cannot leave a truncated
    hash store behind. Hashes are written in sorted order so the file
    content is stable between runs.

    Args:
        hashes_file: Destination path of the JSON hash store.
        hashes: Iterable of hash strings to store.
    """
    tmp_file = f"{os.fspath(hashes_file)}.tmp"
    try:
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(sorted(hashes), f)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes are on disk first
        os.replace(tmp_file, hashes_file)
    finally:
        # After a successful replace the temporary name no longer exists;
        # this only cleans up after a failed write.
        try:
            os.remove(tmp_file)
        except FileNotFoundError:
            pass
def setup_ssl_context():
    """Build the TLS context used for the mailbox connection.

    Starts from Python's default client context, restricts negotiation to
    TLS 1.3 only, and loads the certificate/key pair ``./one.crt`` and
    ``./one.key`` (paths are relative to the current working directory).

    Returns:
        The configured ``ssl.SSLContext``.
    """
    tls13 = ssl.TLSVersion.TLSv1_3
    context = ssl.create_default_context()
    # Pin both bounds so that nothing other than TLS 1.3 can be negotiated.
    context.minimum_version = tls13
    context.maximum_version = tls13
    context.load_cert_chain(keyfile="./one.key", certfile="./one.crt")
    return context
def validate_xml(xml_string, dtd_path):
    """Validate an XML document against a DTD and log the outcome.

    Args:
        xml_string: The XML document as text.
        dtd_path: Path of the DTD file to validate against.

    Returns:
        True if the document is valid against the DTD, False otherwise.
        A validation failure is logged as an error but not raised.
    """
    dtd = ET.DTD(dtd_path)
    # Convert the XML string to bytes to handle the encoding declaration
    xml_doc = ET.parse(BytesIO(xml_string.encode('utf-8')))
    is_valid = bool(dtd.validate(xml_doc))
    if is_valid:
        logging.debug("XML is valid against the %s DTD.", dtd_path)
    else:
        logging.error("XML validation failed using DTD %s with error: %s",
                      dtd_path, dtd.error_log.filter_from_errors())
    return is_valid
def csv_to_xml(csv_file, xml_file, dtd_file, csv_mapping):
    """Convert a CSV file into an ``<items>`` XML document and save it.

    Every CSV row becomes one ``<item>`` element; for each
    ``xml_tag -> csv_column`` pair in *csv_mapping* a child element named
    *xml_tag* is created holding that row's value of *csv_column*.
    The document is validated against the DTD (the outcome is logged) and
    then written to *xml_file*.

    Args:
        csv_file: Path of the CSV file to read.
        xml_file: Path of the XML file to write.
        dtd_file: Path of the DTD used for validation.
        csv_mapping: Dict mapping XML tag names to CSV column names.

    Raises:
        Exception: Any error raised while serialising or validating the XML
            is logged and re-raised. Errors opening or decoding the CSV
            propagate unchanged.
    """
    root = ET.Element("items")
    logging.debug("Resolved CSV file path: %s", os.path.abspath(csv_file))
    logging.debug("CSV file size: %d bytes", os.path.getsize(csv_file))
    # utf-8-sig strips a leading byte-order mark if present (common in CSV
    # exported from spreadsheets) and otherwise behaves exactly like utf-8;
    # without it the BOM ends up glued to the first column name.
    with open(csv_file, newline='', encoding="utf-8-sig") as csvfile:
        reader = csv.DictReader(csvfile)
        # fieldnames is None for a completely empty file.
        headers = reader.fieldnames or []
        # Resolve the mapping against the header once, so a missing column
        # is reported a single time instead of once per row.
        usable_mapping = {}
        for xml_tag, csv_column in csv_mapping.items():
            if csv_column in headers:
                usable_mapping[xml_tag] = csv_column
            else:
                logging.error("Missing column %s in CSV file", csv_column)
        for row in reader:
            item = ET.SubElement(root, "item")
            for xml_tag, csv_column in usable_mapping.items():
                ET.SubElement(item, xml_tag).text = row[csv_column]
    # Convert to string and validate
    try:
        xml_string = ET.tostring(root, pretty_print=True,
                                 xml_declaration=True,
                                 encoding="UTF-8").decode("utf-8")
        validate_xml(xml_string, dtd_file)
    except Exception as e:
        logging.error("Error during XML processing: %s", str(e))
        raise
    # Save XML to file
    with open(xml_file, "w", encoding="utf-8") as f:
        f.write(xml_string)
    logging.info("XML file saved to %s", xml_file)
def get_client_mapping(client_domain):
    """Return the column mapping configured for a sender's mail domain.

    Reads ``column_mapping.json`` from the current working directory on
    every call.

    Args:
        client_domain: Domain part of the sender address, used as the key
            into the mapping file.

    Returns:
        The mapping stored under *client_domain*, or the one stored under
        ``"default"`` when the domain has no entry of its own.

    Raises:
        KeyError: If the domain is not configured and there is no
            ``"default"`` entry either.
    """
    with open("column_mapping.json", "r", encoding="utf-8") as f:
        column_mapping = json.load(f)
    # Only touch the "default" entry when it is actually needed, so a file
    # without one still works for explicitly configured domains.
    if client_domain in column_mapping:
        return column_mapping[client_domain]
    return column_mapping["default"]
def process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
                        dtd_name):
    """Save new attachments from unread messages and convert each to XML.

    For every attachment of every unread message, the payload is hashed;
    payloads whose hash is already in *saved_hashes* are skipped. New
    payloads are written to a staging file, moved into *attachment_path*
    under a ``<timestamp>_<hash prefix><ext>`` name, recorded in
    *saved_hashes*, and then passed to ``csv_to_xml``.

    Args:
        mailbox: Logged-in mailbox object to fetch messages from.
        attachment_path: Directory that receives the saved attachments.
        saved_hashes: Set of known attachment hashes; updated in place.
        xml_docs: Directory holding the DTD and receiving the XML output.
        dtd_name: File name of the DTD inside *xml_docs*.
    """
    # Stage files in the 'temp' directory beside this script rather than a
    # path relative to the current working directory, and make sure it exists
    # so this function does not depend on the caller having created it.
    temp_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp")
    os.makedirs(temp_dir, exist_ok=True)
    dtd_path = os.path.join(xml_docs, dtd_name)
    for msg in mailbox.fetch(AND(seen=False), mark_seen=False):
        for att in msg.attachments:
            attachment_hash = compute_hash(att.payload)
            if attachment_hash in saved_hashes:
                logging.info("Skipped duplicate attachment: %s", att.filename)
                continue
            filename, file_ext = os.path.splitext(att.filename)
            logging.info('new attachment named %s from %s',
                         filename, msg.from_)
            # Take whatever follows the last '@'. For a sender string with
            # no '@' this yields the whole string, which simply falls back
            # to the default mapping instead of raising IndexError.
            client_domain = msg.from_.rpartition('@')[2]
            csv_mapping = get_client_mapping(client_domain)
            logging.debug('loaded client %s csv mapping %s',
                          client_domain, csv_mapping)
            current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            # Timestamp and hex prefix contain no dots, so the same stem
            # serves for both the attachment and its XML counterpart.
            name_stem = f"{current_datetime}_{attachment_hash[:8]}"
            final_name = f"{name_stem}{file_ext}"
            xml_name = f"{name_stem}.xml"
            with tempfile.NamedTemporaryFile(
                    delete=False, dir=temp_dir) as temp_file:
                temp_file.write(att.payload)
                temp_file.flush()  # Flush the write buffer to disk
                os.fsync(temp_file.fileno())
                temp_path = temp_file.name
            final_path = os.path.join(attachment_path, final_name)
            xml_path = os.path.join(xml_docs, xml_name)
            os.rename(temp_path, final_path)  # Atomic move
            logging.info("Saved attachment as: %s", final_path)
            saved_hashes.add(attachment_hash)
            try:
                csv_to_xml(final_path, xml_path, dtd_path, csv_mapping)
            except Exception:
                # One unconvertible attachment (e.g. not a CSV at all) must
                # not stop the remaining attachments and messages from being
                # processed. The saved file stays in place for inspection.
                logging.exception(
                    "Failed to convert %s to XML; continuing", final_path)
        # NOTE: messages are left unread; the explicit flag call is disabled:
        #mailbox.flag(msg.uid, '\\Seen', True)
def main():
    """Code entrypoint.

    Sets up logging, loads configuration from the environment, makes sure
    the working directories exist, then connects to the mailbox and
    processes unread attachments. The hash store is written back even when
    processing fails part-way, so attachments already saved in this run are
    not downloaded again next time.

    Exit codes:
        1: mail username and/or password missing.
        2: mail hostname and/or port missing, or port not an integer.
        3: no DTD name configured.
    """
    setup_logging()
    load_environment_variables()
    hashes_file = 'saved_hashes.json'
    saved_hashes = load_saved_hashes(hashes_file)
    username = os.environ.get("MBOX_USER")
    password = os.environ.get("MBOX_PASS")
    default_folder = os.environ.get("MBOX_FOLDER", "Inbox")
    host = os.environ.get("MAIL_HOST")
    port = os.environ.get("MAIL_PORT")
    dtd_name = os.environ.get("DTD")
    if not username or not password:
        logging.error('Missing mail username and/or password in environment')
        sys.exit(1)
    if not host or not port:
        logging.error('Missing mail hostname and/or port in environment')
        sys.exit(2)
    # Environment values are always strings; validate the port up front so a
    # typo is reported clearly instead of surfacing as a connection error.
    try:
        port = int(port)
    except ValueError:
        logging.error('Mail port %r is not a valid integer', port)
        sys.exit(2)
    if not dtd_name:
        logging.error('No document type definition set for XML validation')
        sys.exit(3)
    attachment_path = join(dirname(__file__), "attachments")
    temp_path = join(dirname(__file__), "temp")
    xml_docs = join(dirname(__file__), "xml")
    for directory in (attachment_path, temp_path, xml_docs):
        # exist_ok avoids the check-then-create race of a separate exists().
        os.makedirs(directory, exist_ok=True)
    ssl_context = setup_ssl_context()
    with MailBox(host, port=port, ssl_context=ssl_context).login(
            username, password, default_folder) as mailbox:
        try:
            process_attachments(mailbox, attachment_path, saved_hashes,
                                xml_docs, dtd_name)
        finally:
            # Persist whatever was recorded, even if processing raised.
            save_hashes(hashes_file, saved_hashes)


if __name__ == "__main__":
    main()