Files
email_to_xml/getmail.py
2025-01-03 06:52:46 +13:00

214 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Developed by Greig McGill of Sense7.
See README.md for full documentation and version history.
"""
# Standard libraries
import os
import sys
import csv
import ssl
import json
import hashlib
import logging
import tempfile
from io import BytesIO
from os.path import join, dirname
from datetime import datetime
# Third party libraries
from dotenv import load_dotenv
from imap_tools import MailBox, AND
import lxml.etree as ET
def setup_logging():
    """Configure root logging to append to ./getmail.log at DEBUG level.

    Once configured, a first debug record is emitted noting the script name
    and the directory the process was started in.
    """
    log_settings = {
        'level': logging.DEBUG,
        'format': '%(asctime)s %(levelname)s %(message)s',
        'filename': './getmail.log',
        'filemode': 'a',
    }
    logging.basicConfig(**log_settings)
    working_dir = os.getcwd()
    logging.debug('%a started in %s', 'getmail.py', working_dir)
def load_environment_variables():
    """Load settings from the .env file located beside this script.

    The file is opened first as a readability probe and then handed to
    load_dotenv. A missing file, or any other failure, is logged and the
    process terminates with exit status 1.
    """
    script_dir = dirname(__file__)
    dotenv_path = join(script_dir, ".env")
    try:
        with open(dotenv_path, mode='r', encoding='utf-8') as env_file:
            logging.debug('success opening env file: %a', env_file.name)
            load_dotenv(dotenv_path)
        return
    except FileNotFoundError:
        logging.error('config file %a is missing - unable to proceed',
                      dotenv_path)
    except Exception as err:
        logging.error('An unexpected error occurred: %s', str(err))
    # Only reached when one of the handlers above ran.
    sys.exit(1)
def compute_hash(content):
    """Return the SHA-256 hex digest of *content*, used as a uniqueness key."""
    digest = hashlib.sha256()
    digest.update(content)
    return digest.hexdigest()
def load_saved_hashes(hashes_file):
    """Return the set of hashes stored in *hashes_file*.

    An empty set is returned when the file does not exist; otherwise the
    file is read as JSON and its contents are converted to a set.
    """
    if not os.path.exists(hashes_file):
        return set()
    with open(hashes_file, 'r', encoding='utf-8') as handle:
        stored = json.load(handle)
    return set(stored)
def save_hashes(hashes_file, hashes):
    """Persist the processed-attachment hashes to *hashes_file* as a JSON list.

    The data is first written to a sibling ``<hashes_file>.tmp`` file and
    then moved over the target with ``os.replace``. Writing in place would
    truncate the existing store first, so an interruption mid-write could
    leave an unparseable file behind. Hashes are written in sorted order so
    the file content is deterministic for a given set.

    Args:
        hashes_file: Path of the JSON file to write.
        hashes: Iterable of hash strings to store.

    Raises:
        OSError: If the temporary file cannot be written or moved into place.
    """
    tmp_path = f"{hashes_file}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(sorted(hashes), f)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes are on disk first
        os.replace(tmp_path, hashes_file)
    except BaseException:
        # Best-effort cleanup of the partial temp file; the original error
        # is what the caller needs to see, so a failed removal is ignored.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def setup_ssl_context():
    """Build the SSL context used for the mail connection.

    Starts from the library's default context, pins both the minimum and
    maximum protocol version to TLS 1.3, and loads the client certificate
    and key from ./one.crt and ./one.key.
    """
    tls13 = ssl.TLSVersion.TLSv1_3
    context = ssl.create_default_context()
    for bound in ('minimum_version', 'maximum_version'):
        setattr(context, bound, tls13)
    context.load_cert_chain(keyfile="./one.key", certfile="./one.crt")
    return context
def validate_xml(xml_string, dtd_path):
    """Validate an XML document against a DTD and report the outcome.

    Args:
        xml_string: The XML document as text.
        dtd_path: Path of the DTD file to validate against.

    Returns:
        True when the document is valid against the DTD, False otherwise.
        A validation failure is logged at ERROR level, not raised.

    Raises:
        Any exception raised while loading the DTD or parsing the XML is
        propagated unchanged.
    """
    dtd = ET.DTD(dtd_path)
    # Convert the XML string to bytes to handle the encoding declaration
    xml_doc = ET.parse(BytesIO(xml_string.encode('utf-8')))
    is_valid = bool(dtd.validate(xml_doc))
    if is_valid:
        logging.debug("XML is valid against the %s DTD.", dtd_path)
    else:
        logging.error("XML validation failed using DTD %s with error %s:",
                      dtd_path, dtd.error_log.filter_from_errors())
    # Returning the result lets callers react to an invalid document;
    # callers that ignore the return value behave exactly as before.
    return is_valid
def csv_to_xml(csv_file, xml_file, dtd_file, csv_mapping):
    """Read a CSV file, build an <items>/<item> XML document, and save it.

    Each CSV row becomes one <item> element. For every ``xml_tag ->
    csv_column`` pair in *csv_mapping* whose column is present in the CSV
    header, a child element named ``xml_tag`` carries that row's value.

    Args:
        csv_file: Path of the CSV file to read.
        xml_file: Path the generated XML is written to.
        dtd_file: Path of the DTD handed to validate_xml.
        csv_mapping: Dict mapping XML tag names to CSV column names.

    Raises:
        Any exception raised while serialising or validating the XML is
        logged and re-raised; file errors propagate unchanged.
    """
    root = ET.Element("items")
    logging.debug("Resolved CSV file path: %s", os.path.abspath(csv_file))
    logging.debug("CSV file size: %d bytes", os.path.getsize(csv_file))

    # utf-8-sig drops a leading byte-order mark (as written by Excel) and
    # reads BOM-less files exactly like utf-8. With plain utf-8 the BOM
    # sticks to the first header name and that column is reported missing.
    with open(csv_file, newline='', encoding="utf-8-sig") as csvfile:
        reader = csv.DictReader(csvfile)
        headers = set(reader.fieldnames or ())

        # Resolve the mapping against the header once, so a missing column
        # is reported a single time per file instead of once per row.
        usable_mapping = {}
        for xml_tag, csv_column in csv_mapping.items():
            if csv_column in headers:
                usable_mapping[xml_tag] = csv_column
            else:
                logging.error("Missing column %s in CSV file", csv_column)

        for row in reader:
            item = ET.SubElement(root, "item")
            for xml_tag, csv_column in usable_mapping.items():
                ET.SubElement(item, xml_tag).text = row[csv_column]

    # Convert to string and validate
    try:
        xml_string = ET.tostring(root, pretty_print=True,
                                 xml_declaration=True,
                                 encoding="UTF-8").decode("utf-8")
        validate_xml(xml_string, dtd_file)
    except Exception as e:
        logging.error("Error during XML processing: %s", str(e))
        raise

    # Save XML to file
    with open(xml_file, "w", encoding="utf-8") as f:
        f.write(xml_string)
    logging.info("XML file saved to %s", xml_file)
def get_client_mapping(client_domain):
    """Return the CSV column mapping to use for a client's email domain.

    The mappings are read from ``column_mapping.json`` in the current
    working directory. Lookup order:

    1. an entry whose key equals *client_domain* exactly;
    2. an entry whose key matches *client_domain* ignoring case;
    3. the ``"default"`` entry.

    The ``"default"`` entry is only consulted when no client entry matches,
    so a file without it still works for clients that have their own entry.

    Args:
        client_domain: Domain part of the sender's address.

    Returns:
        The mapping stored for the client, or the default mapping.

    Raises:
        KeyError: If no entry matches and there is no ``"default"`` entry.
    """
    with open("column_mapping.json", "r", encoding="utf-8") as f:
        column_mapping = json.load(f)

    if client_domain in column_mapping:
        return column_mapping[client_domain]

    if isinstance(client_domain, str):
        wanted = client_domain.lower()
        for domain, mapping in column_mapping.items():
            if domain.lower() == wanted:
                return mapping

    return column_mapping["default"]
def process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
                        dtd_name):
    """Save new attachments from unseen emails and convert each one to XML.

    Every attachment of every unseen message is hashed. An attachment whose
    hash is already in *saved_hashes* is skipped; otherwise it is written to
    a staging file, moved into *attachment_path* under a timestamp-plus-hash
    name, its hash is recorded, and it is converted to XML with the column
    mapping of the sender's domain. The message is then flagged as seen.

    Args:
        mailbox: Mailbox object providing ``fetch`` and ``flag``.
        attachment_path: Directory the attachments are stored in.
        saved_hashes: Set of hashes already processed; updated in place.
        xml_docs: Directory holding the DTD and receiving the XML files.
        dtd_name: File name of the DTD inside *xml_docs*.

    Raises:
        Errors from writing files, loading the mapping, or the CSV-to-XML
        conversion are not caught here and abort processing.
    """
    # Stage files in the "temp" directory beside attachment_path rather than
    # in a "temp" relative to the current working directory: the location no
    # longer depends on where the script was started, and the final move
    # stays within one directory tree.
    parent_dir = os.path.dirname(os.path.normpath(attachment_path))
    temp_dir = os.path.join(parent_dir, "temp")
    os.makedirs(temp_dir, exist_ok=True)
    dtd_path = os.path.join(xml_docs, dtd_name)

    for msg in mailbox.fetch(AND(seen=False), mark_seen=False):
        for att in msg.attachments:
            attachment_hash = compute_hash(att.payload)
            if attachment_hash in saved_hashes:
                logging.info("Skipped duplicate attachment: %s",
                             att.filename)
                continue

            filename, file_ext = os.path.splitext(att.filename)
            logging.info('new attachment named %s from %s',
                         filename, msg.from_)
            # partition() yields '' for a sender without '@' instead of
            # raising IndexError the way split('@')[1] does.
            client_domain = msg.from_.partition('@')[2]
            csv_mapping = get_client_mapping(client_domain)
            logging.debug('loaded client %s csv mapping %s',
                          client_domain, csv_mapping)

            current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            base_name = f"{current_datetime}_{attachment_hash[:8]}"
            final_name = base_name + file_ext
            xml_name = base_name + '.xml'

            with tempfile.NamedTemporaryFile(
                    delete=False, dir=temp_dir) as temp_file:
                temp_file.write(att.payload)
                temp_file.flush()  # Flush the write buffer to disk
                os.fsync(temp_file.fileno())
                temp_path = temp_file.name

            final_path = os.path.join(attachment_path, final_name)
            xml_path = os.path.join(xml_docs, xml_name)
            os.replace(temp_path, final_path)  # Atomic move
            logging.info("Saved attachment as: %s", final_path)
            saved_hashes.add(attachment_hash)
            csv_to_xml(final_path, xml_path, dtd_path, csv_mapping)

        # NOTE(review): the message is flagged once, after all of its
        # attachments were handled - confirm this matches the intended
        # placement of the flag call.
        mailbox.flag(msg.uid, '\\Seen', True)
def main():
    """Initialise logging, load the environment, and process the mailbox.

    Reads the mailbox credentials, host, port, folder and DTD name from the
    environment, makes sure the working directories exist beside this
    script, then logs in, processes the attachments of unseen messages and
    saves the updated hash store.

    Exit codes:
        1: mail username and/or password missing.
        2: mail hostname and/or port missing, or port not an integer.
        3: no DTD configured.
    """
    setup_logging()
    load_environment_variables()
    hashes_file = 'saved_hashes.json'
    saved_hashes = load_saved_hashes(hashes_file)

    username = os.environ.get("MBOX_USER")
    password = os.environ.get("MBOX_PASS")
    default_folder = os.environ.get("MBOX_FOLDER", "Inbox")
    host = os.environ.get("MAIL_HOST")
    port = os.environ.get("MAIL_PORT")
    dtd_name = os.environ.get("DTD")

    if not username or not password:
        logging.error('Missing mail username and/or password in environment')
        sys.exit(1)
    if not host or not port:
        logging.error('Missing mail hostname and/or port in environment')
        sys.exit(2)
    if not dtd_name:
        logging.error('No document type definition set for XML validation')
        sys.exit(3)

    # Environment values are always strings; convert the port up front so a
    # typo is reported clearly here instead of surfacing as a connection
    # error later.
    try:
        port = int(port)
    except ValueError:
        logging.error('Mail port %r is not a valid integer', port)
        sys.exit(2)

    base_dir = dirname(__file__)
    attachment_path = join(base_dir, "attachments")
    temp_path = join(base_dir, "temp")
    xml_docs = join(base_dir, "xml")
    # exist_ok avoids the check-then-create race of a separate exists() test.
    for directory in (attachment_path, temp_path, xml_docs):
        os.makedirs(directory, exist_ok=True)

    ssl_context = setup_ssl_context()
    with MailBox(host, port=port, ssl_context=ssl_context).login(
            username, password, default_folder) as mailbox:
        process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
                            dtd_name)
    save_hashes(hashes_file, saved_hashes)
# Run the workflow only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()