diff --git a/csv_to_xml.py b/csv_to_xml.py
deleted file mode 100755
index 26145d8..0000000
--- a/csv_to_xml.py
+++ /dev/null
@@ -1,68 +0,0 @@
-#!/usr/bin/env python3
-"""
-Developed by Greig McGill of Sense7
-"""
-import os
-import sys
-import csv
-import logging
-from io import BytesIO
-import lxml.etree as ET
-
-def setup_logging():
-    """Initialize logging configuration."""
-    logging.basicConfig(
-        level=logging.DEBUG,
-        format='%(asctime)s %(levelname)s %(message)s',
-        filename='./csv_to_xml.log',
-        filemode='a')
-    logging.debug('%s started in %s', 'csv_to_xml.py', os.getcwd())
-
-def validate_xml(xml_string, dtd_path):
-    """Validate the XML against the DTD"""
-    dtd = ET.DTD(dtd_path)
-    # Convert the XML string to bytes to handle the encoding declaration
-    xml_doc = ET.parse(BytesIO(xml_string.encode('utf-8')))
-    if dtd.validate(xml_doc):
-        logging.debug("XML is valid against the %s DTD.", dtd_path)
-    else:
-        logging.error("XML validation failed using DTD %s with error %s:",
-                      dtd_path, dtd.error_log.filter_from_errors())
-
-# Read CSV and generate XML
-def csv_to_xml(csv_file, xml_file, dtd_file):
-    """Read the CSV and generate the XML"""
-    root = ET.Element("items")
-
-    with open(csv_file, newline='', encoding="utf-8") as csvfile:
-        reader = csv.DictReader(csvfile)
-        for row in reader:
-            item = ET.SubElement(root, "item")
-            ET.SubElement(item, "Item_ID").text = row["Item_ID"]
-            ET.SubElement(item, "Item_Name").text = row["Item_Name"]
-            ET.SubElement(item, "Item_Description"
-                          ).text = row["Item_Description"]
-            ET.SubElement(item, "Item_Price").text = row["Item_Price"]
-            ET.SubElement(item, "Item_Quantity").text = row["Item_Quantity"]
-
-    # Convert to string and validate
-    xml_string = ET.tostring(root, pretty_print=True,
-        xml_declaration=True, encoding="UTF-8").decode("utf-8")
-    logging.info("Attmpting to validate %s against the %s DTD.",
-                 xml_file, dtd_file)
-    validate_xml(xml_string, dtd_file)
-
-    # Save XML to file
-    with open(xml_file, "w", encoding="utf-8") as f:
-        f.write(xml_string)
-    logging.info("XML file saved to %s", xml_file)
-
-def main(csv_file, xml_file, dtd_file):
-    """Code entrypoint."""
-    setup_logging()
-    logging.debug("Attempting conversion of %s to %s using the %s DTD",
-                  csv_file, xml_file, dtd_file)
-    csv_to_xml(csv_file, xml_file, dtd_file)
-
-if __name__ == "__main__":
-    main(sys.argv[1], sys.argv[2], sys.argv[3])
diff --git a/getmail.py b/getmail.py
index 26acd7b..fd72412 100755
--- a/getmail.py
+++ b/getmail.py
@@ -35,17 +35,21 @@ Authentication is configured in a .env file as described below in the code.
 # Standard libraries
 import os
 import sys
+import csv
 import ssl
 import json
+import time
 import hashlib
 import logging
 import tempfile
+from io import BytesIO
 from os.path import join, dirname
 from datetime import datetime
 
 # Third party libraries
 from dotenv import load_dotenv
 from imap_tools import MailBox, AND
+import lxml.etree as ET
 
 def setup_logging():
     """Initialize logging configuration."""
@@ -94,7 +98,50 @@ def setup_ssl_context():
     ssl_context.load_cert_chain(certfile="./one.crt", keyfile="./one.key")
     return ssl_context
 
-def process_attachments(mailbox, attachment_path, saved_hashes):
+def validate_xml(xml_string, dtd_path):
+    """Validate the XML against the DTD"""
+    dtd = ET.DTD(dtd_path)
+    # Convert the XML string to bytes to handle the encoding declaration
+    xml_doc = ET.parse(BytesIO(xml_string.encode('utf-8')))
+    if dtd.validate(xml_doc):
+        logging.debug("XML is valid against the %s DTD.", dtd_path)
+    else:
+        logging.error("XML validation failed using DTD %s with error %s:",
+                      dtd_path, dtd.error_log.filter_from_errors())
+
+def csv_to_xml(csv_file, xml_file, dtd_file):
+    """Read the CSV and generate the XML"""
+    root = ET.Element("items")
+    logging.debug("CSV file size: %d bytes", os.path.getsize(csv_file))
+    logging.debug("Resolved CSV file path: %s", os.path.abspath(csv_file))
+
+    with open(csv_file, newline='', encoding="utf-8") as csvfile:
+        reader = csv.DictReader(csvfile)
+        for row in reader:
+            item = ET.SubElement(root, "item")
+            ET.SubElement(item, "Item_ID").text = row["Item_ID"]
+            ET.SubElement(item, "Item_Name").text = row["Item_Name"]
+            ET.SubElement(item, "Item_Description"
+                          ).text = row["Item_Description"]
+            ET.SubElement(item, "Item_Price").text = row["Item_Price"]
+            ET.SubElement(item, "Item_Quantity").text = row["Item_Quantity"]
+
+    # Convert to string and validate
+    try:
+        xml_string = ET.tostring(root, pretty_print=True,
+            xml_declaration=True, encoding="UTF-8").decode("utf-8")
+        validate_xml(xml_string, dtd_file)
+    except Exception as e:
+        logging.error("Error during XML processing: %s", str(e))
+        raise
+
+    # Save XML to file
+    with open(xml_file, "w", encoding="utf-8") as f:
+        f.write(xml_string)
+    logging.info("XML file saved to %s", xml_file)
+
+def process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
+                        dtd_name):
     """Process and save email attachments."""
     for msg in mailbox.fetch(AND(seen=False), mark_seen=False):
         for att in msg.attachments:
@@ -109,15 +156,25 @@ def process_attachments(mailbox, attachment_path, saved_hashes):
                     f"{attachment_hash[:8]}"
                     f"{file_ext}"
                 )
+                xml_name = (
+                    f"{current_datetime}_"
+                    f"{attachment_hash[:8]}"
+                    f".xml"
+                )
                 with tempfile.NamedTemporaryFile(
                         delete=False, dir="temp") as temp_file:
                     temp_file.write(att.payload)
+                    temp_file.flush()  # Flush the write buffer to disk
+                    os.fsync(temp_file.fileno())
                     temp_path = temp_file.name
 
                 final_path = os.path.join(attachment_path, final_name)
+                xml_path = os.path.join(xml_docs, xml_name)
+                dtd_path = os.path.join(xml_docs, dtd_name)
                 os.rename(temp_path, final_path)  # Atomic move
                 logging.info("Saved attachment as: %s", final_path)
                 saved_hashes.add(attachment_hash)
+                csv_to_xml(final_path, xml_path, dtd_path)
             else:
                 logging.info("Skipped duplicate attachment: %s", att.filename)
         mailbox.flag(msg.uid, '\\Seen', True)
@@ -135,23 +192,37 @@ def main():
     default_folder = os.environ.get("MBOX_FOLDER", "Inbox")
     host = os.environ.get("MAIL_HOST")
     port = os.environ.get("MAIL_PORT")
+    dtd_name = os.environ.get("DTD")
 
     if not username or not password:
-        logging.error('Missing mailbox username or password in environment')
+        logging.error('Missing mail username and/or password in environment')
         sys.exit(1)
 
+    if not host or not port:
+        logging.error('Missing mail hostname and/or port in environment')
+        sys.exit(2)
+
+    if not dtd_name:
+        logging.error('No document type definition set for XML validation')
+        sys.exit(3)
+
     attachment_path = join(dirname(__file__), "attachments")
     temp_path = join(dirname(__file__), "temp")
+    xml_docs = join(dirname(__file__), "xml")
+
     if not os.path.exists(attachment_path):
         os.makedirs(attachment_path)
     if not os.path.exists(temp_path):
         os.makedirs(temp_path)
+    if not os.path.exists(xml_docs):
+        os.makedirs(xml_docs)
 
     ssl_context = setup_ssl_context()
 
     with MailBox(host, port=port, ssl_context=ssl_context).login(
             username, password, default_folder) as mailbox:
-        process_attachments(mailbox, attachment_path, saved_hashes)
+        process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
+                            dtd_name)
 
     save_hashes(hashes_file, saved_hashes)
 