Code refactor to a single script complete.

This commit is contained in:
2025-01-02 12:09:02 +13:00
parent c4cc9a49d8
commit 3ef171e637
2 changed files with 74 additions and 71 deletions

View File

@@ -1,68 +0,0 @@
#!/usr/bin/env python3
"""
Developed by Greig McGill of Sense7
"""
import os
import sys
import csv
import logging
from io import BytesIO
import lxml.etree as ET
def setup_logging():
    """Configure root logging to append DEBUG-level records to a log file.

    Records are written to ``./csv_to_xml.log`` (relative to the current
    working directory) and a start-up message noting that directory is
    logged immediately afterwards.
    """
    log_settings = {
        "level": logging.DEBUG,
        "format": '%(asctime)s %(levelname)s %(message)s',
        "filename": './csv_to_xml.log',
        "filemode": 'a',
    }
    logging.basicConfig(**log_settings)
    working_dir = os.getcwd()
    logging.debug('%s started in %s', 'csv_to_xml.py', working_dir)
def validate_xml(xml_string, dtd_path):
    """Validate an XML document against a DTD and log the outcome.

    Args:
        xml_string: The XML document as text.
        dtd_path: Path of the DTD file to validate against.

    Returns:
        True if the document is valid against the DTD, False otherwise.
        (Earlier versions returned nothing; callers that ignore the
        result are unaffected.)

    Any exception lxml raises while loading the DTD or parsing the
    document is not caught here and propagates to the caller.
    """
    dtd = ET.DTD(dtd_path)
    # Convert the XML string to bytes to handle the encoding declaration
    xml_doc = ET.parse(BytesIO(xml_string.encode('utf-8')))
    is_valid = bool(dtd.validate(xml_doc))
    if is_valid:
        logging.debug("XML is valid against the %s DTD.", dtd_path)
    else:
        logging.error("XML validation failed using DTD %s with error: %s",
                      dtd_path, dtd.error_log.filter_from_errors())
    return is_valid
# Read CSV and generate XML
def csv_to_xml(csv_file, xml_file, dtd_file):
    """Read a CSV file of items, build an XML document and save it.

    Each CSV row becomes an ``<item>`` element under a single ``<items>``
    root, with one child element per column listed in ``item_fields``.

    Args:
        csv_file: Path of the UTF-8 CSV file to read. Its header row must
            contain every column named in ``item_fields``.
        xml_file: Path the generated XML document is written to.
        dtd_file: Path of the DTD handed to ``validate_xml``.

    Raises:
        KeyError: If a row lacks one of the expected columns.

    Note:
        The result of ``validate_xml`` is not inspected here, so the XML
        file is written even when validation reports a failure.
    """
    # Columns copied from each row, in the order the child elements appear.
    item_fields = ("Item_ID", "Item_Name", "Item_Description",
                   "Item_Price", "Item_Quantity")
    root = ET.Element("items")
    with open(csv_file, newline='', encoding="utf-8") as csvfile:
        for row in csv.DictReader(csvfile):
            item = ET.SubElement(root, "item")
            for field in item_fields:
                ET.SubElement(item, field).text = row[field]
    # Convert to string and validate
    xml_string = ET.tostring(root, pretty_print=True,
                             xml_declaration=True,
                             encoding="UTF-8").decode("utf-8")
    logging.info("Attempting to validate %s against the %s DTD.",
                 xml_file, dtd_file)
    validate_xml(xml_string, dtd_file)
    # Save XML to file
    with open(xml_file, "w", encoding="utf-8") as f:
        f.write(xml_string)
    logging.info("XML file saved to %s", xml_file)
def main(csv_file, xml_file, dtd_file):
    """Entry point: set up logging, then run the CSV-to-XML conversion.

    Args:
        csv_file: Path of the CSV file to convert.
        xml_file: Path the XML output is written to.
        dtd_file: Path of the DTD used for validation.
    """
    setup_logging()
    conversion_args = (csv_file, xml_file, dtd_file)
    logging.debug(
        "Attempting conversion of %s to %s using the %s DTD",
        *conversion_args,
    )
    csv_to_xml(*conversion_args)
if __name__ == "__main__":
    # Three positional arguments are required. Exit with a usage message
    # (status 1) rather than an IndexError traceback when any are missing.
    if len(sys.argv) < 4:
        sys.exit(f"Usage: {sys.argv[0]} <csv_file> <xml_file> <dtd_file>")
    main(sys.argv[1], sys.argv[2], sys.argv[3])

View File

@@ -35,17 +35,21 @@ Authentication is configured in a .env file as described below in the code.
# Standard libraries
import os
import sys
import csv
import ssl
import json
import time
import hashlib
import logging
import tempfile
from io import BytesIO
from os.path import join, dirname
from datetime import datetime
# Third party libraries
from dotenv import load_dotenv
from imap_tools import MailBox, AND
import lxml.etree as ET
def setup_logging():
"""Initialize logging configuration."""
@@ -94,7 +98,50 @@ def setup_ssl_context():
ssl_context.load_cert_chain(certfile="./one.crt", keyfile="./one.key")
return ssl_context
def process_attachments(mailbox, attachment_path, saved_hashes):
def validate_xml(xml_string, dtd_path):
    """Validate an XML document against a DTD and log the outcome.

    Args:
        xml_string: The XML document as text.
        dtd_path: Path of the DTD file to validate against.

    Returns:
        True if the document is valid against the DTD, False otherwise.
        (Earlier versions returned nothing; callers that ignore the
        result are unaffected.)

    Any exception lxml raises while loading the DTD or parsing the
    document is not caught here and propagates to the caller.
    """
    dtd = ET.DTD(dtd_path)
    # Convert the XML string to bytes to handle the encoding declaration
    xml_doc = ET.parse(BytesIO(xml_string.encode('utf-8')))
    is_valid = bool(dtd.validate(xml_doc))
    if is_valid:
        logging.debug("XML is valid against the %s DTD.", dtd_path)
    else:
        logging.error("XML validation failed using DTD %s with error: %s",
                      dtd_path, dtd.error_log.filter_from_errors())
    return is_valid
def csv_to_xml(csv_file, xml_file, dtd_file):
    """Build an ``<items>`` XML document from a CSV file and write it out.

    Every CSV row yields one ``<item>`` element whose children mirror the
    columns in ``column_names``. The serialized document is passed to
    ``validate_xml`` and then saved to ``xml_file``.

    Args:
        csv_file: Path of the UTF-8 CSV file to read.
        xml_file: Destination path for the generated XML.
        dtd_file: Path of the DTD handed to ``validate_xml``.

    Raises:
        Exception: Anything raised while serializing or validating is
            logged and re-raised unchanged.
    """
    items_root = ET.Element("items")
    logging.debug("CSV file size: %d bytes", os.path.getsize(csv_file))
    logging.debug("Resolved CSV file path: %s", os.path.abspath(csv_file))
    # Child elements are emitted in exactly this order for each row.
    column_names = (
        "Item_ID",
        "Item_Name",
        "Item_Description",
        "Item_Price",
        "Item_Quantity",
    )
    with open(csv_file, newline='', encoding="utf-8") as source:
        for record in csv.DictReader(source):
            item_node = ET.SubElement(items_root, "item")
            for column in column_names:
                ET.SubElement(item_node, column).text = record[column]
    # Serialize, then validate; failures are logged before propagating.
    try:
        serialized = ET.tostring(
            items_root,
            pretty_print=True,
            xml_declaration=True,
            encoding="UTF-8",
        )
        xml_string = serialized.decode("utf-8")
        validate_xml(xml_string, dtd_file)
    except Exception as exc:
        logging.error("Error during XML processing: %s", str(exc))
        raise
    # Persist the document.
    with open(xml_file, "w", encoding="utf-8") as target:
        target.write(xml_string)
    logging.info("XML file saved to %s", xml_file)
def process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
dtd_name):
"""Process and save email attachments."""
for msg in mailbox.fetch(AND(seen=False), mark_seen=False):
for att in msg.attachments:
@@ -109,15 +156,25 @@ def process_attachments(mailbox, attachment_path, saved_hashes):
f"{attachment_hash[:8]}"
f"{file_ext}"
)
xml_name = (
f"{current_datetime}_"
f"{attachment_hash[:8]}"
f".xml"
)
with tempfile.NamedTemporaryFile(
delete=False, dir="temp") as temp_file:
temp_file.write(att.payload)
temp_file.flush() # Flush the write buffer to disk
os.fsync(temp_file.fileno())
temp_path = temp_file.name
final_path = os.path.join(attachment_path, final_name)
xml_path = os.path.join(xml_docs, xml_name)
dtd_path = os.path.join(xml_docs, dtd_name)
os.rename(temp_path, final_path) # Atomic move
logging.info("Saved attachment as: %s", final_path)
saved_hashes.add(attachment_hash)
csv_to_xml(final_path, xml_path, dtd_path)
else:
logging.info("Skipped duplicate attachment: %s", att.filename)
mailbox.flag(msg.uid, '\\Seen', True)
@@ -135,23 +192,37 @@ def main():
default_folder = os.environ.get("MBOX_FOLDER", "Inbox")
host = os.environ.get("MAIL_HOST")
port = os.environ.get("MAIL_PORT")
dtd_name = os.environ.get("DTD")
if not username or not password:
logging.error('Missing mailbox username or password in environment')
logging.error('Missing mail username and/or password in environment')
sys.exit(1)
if not host or not port:
logging.error('Missing mail hostname and/or port in environment')
sys.exit(2)
if not dtd_name:
logging.error('No document type definition set for XML validation')
sys.exit(3)
attachment_path = join(dirname(__file__), "attachments")
temp_path = join(dirname(__file__), "temp")
xml_docs = join(dirname(__file__), "xml")
if not os.path.exists(attachment_path):
os.makedirs(attachment_path)
if not os.path.exists(temp_path):
os.makedirs(temp_path)
if not os.path.exists(xml_docs):
os.makedirs(xml_docs)
ssl_context = setup_ssl_context()
with MailBox(host, port=port, ssl_context=ssl_context).login(
username, password, default_folder) as mailbox:
process_attachments(mailbox, attachment_path, saved_hashes)
process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
dtd_name)
save_hashes(hashes_file, saved_hashes)