From fc86c4c8a015dfcf92b15f01f41843e3ca734539 Mon Sep 17 00:00:00 2001
From: Greig McGill
Date: Mon, 30 Dec 2024 08:17:02 +1300
Subject: [PATCH] Initial checkin

---
Review notes (not part of the commit message):
- csv_to_xml.py: DTD declarations restored (to be confirmed), work moved
  behind a __main__ guard, validation result returned and used as the
  exit status.
- getmail.py: all paths anchored to the script directory, 'temp' directory
  created before use, saved hashes persisted even when a run fails.
- The blob ids on the "index" lines are those of the files as originally
  submitted; they have not been recomputed for the revised content.

 .gitignore       |  12 ++++
 csv_to_xml.py    |  98 +++++++++++++++++++++++++
 getmail.py       | 181 +++++++++++++++++++++++++++++++++++++++++++++++
 requirements.txt |   4 ++
 4 files changed, 295 insertions(+)
 create mode 100644 .gitignore
 create mode 100755 csv_to_xml.py
 create mode 100755 getmail.py
 create mode 100644 requirements.txt

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c23b470
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,12 @@
+.env
+*.key
+*.crt
+saved_hashes.json
+pyvenv.cfg
+*.log
+bin
+lib
+include
+attachments
+xml
+temp
diff --git a/csv_to_xml.py b/csv_to_xml.py
new file mode 100755
index 0000000..b6f97c4
--- /dev/null
+++ b/csv_to_xml.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+"""Convert a CSV file of items to XML and validate it against a DTD."""
+import csv
+import sys
+from io import BytesIO
+
+import lxml.etree as ET
+
+# Columns read from the CSV; each one becomes a child element of <item>.
+FIELDS = (
+    "Item_ID",
+    "Item_Name",
+    "Item_Description",
+    "Item_Price",
+    "Item_Quantity",
+)
+
+# DTD that the generated document is validated against.
+# NOTE(review): the element declarations were blank in the reviewed copy of
+# this patch. They are reconstructed here from the elements csv_to_xml()
+# emits - confirm them against the author's original before relying on them.
+DTD_CONTENT = """
+<!ELEMENT items (item*)>
+<!ELEMENT item (Item_ID, Item_Name, Item_Description, Item_Price, Item_Quantity)>
+<!ELEMENT Item_ID (#PCDATA)>
+<!ELEMENT Item_Name (#PCDATA)>
+<!ELEMENT Item_Description (#PCDATA)>
+<!ELEMENT Item_Price (#PCDATA)>
+<!ELEMENT Item_Quantity (#PCDATA)>
+"""
+
+DTD_FILE = "items.dtd"
+
+
+def write_dtd(dtd_path=DTD_FILE):
+    """Write DTD_CONTENT to dtd_path so validate_xml() can load it."""
+    with open(dtd_path, "w", encoding="utf-8") as f:
+        f.write(DTD_CONTENT)
+
+
+def validate_xml(xml_string, dtd_path):
+    """Validate an XML document against the DTD stored at dtd_path.
+
+    xml_string may be str or bytes. The outcome is printed, and True is
+    returned when the document is valid, False otherwise.
+    """
+    dtd = ET.DTD(dtd_path)
+    # Parse from bytes: lxml rejects a str that carries an encoding declaration
+    if isinstance(xml_string, str):
+        xml_string = xml_string.encode("utf-8")
+    xml_doc = ET.parse(BytesIO(xml_string))
+    if dtd.validate(xml_doc):
+        print("XML is valid against the DTD.")
+        return True
+    print("XML validation failed:", dtd.error_log.filter_from_errors())
+    return False
+
+
+def csv_to_xml(csv_file, xml_file, dtd_path=DTD_FILE):
+    """Convert csv_file to XML, validate it, and save it as xml_file.
+
+    Every CSV row becomes one <item> holding one child element per entry in
+    FIELDS; a missing column raises KeyError. The file is written even when
+    validation fails, so the result can be inspected. Returns True when the
+    generated XML is valid against the DTD at dtd_path, False otherwise.
+    """
+    root = ET.Element("items")
+
+    with open(csv_file, newline="", encoding="utf-8") as csvfile:
+        for row in csv.DictReader(csvfile):
+            item = ET.SubElement(root, "item")
+            for field in FIELDS:
+                ET.SubElement(item, field).text = row[field]
+
+    # Serialise once, as bytes, and use the same bytes for validation and
+    # for the output file so the declared encoding always matches the content
+    xml_bytes = ET.tostring(
+        root, pretty_print=True, xml_declaration=True, encoding="UTF-8")
+    is_valid = validate_xml(xml_bytes, dtd_path)
+
+    with open(xml_file, "wb") as f:
+        f.write(xml_bytes)
+    print(f"XML file saved to {xml_file}")
+    return is_valid
+
+
+def main():
+    """Convert items.csv to items.xml; return a process exit status."""
+    # Example CSV content for reference:
+    # Item_ID,Item_Name,Item_Description,Item_Price,Item_Quantity
+    # 1,Widget A,A useful widget,19.99,100
+    # 2,Gadget B,A versatile gadget,29.49,200
+    write_dtd()
+    return 0 if csv_to_xml("items.csv", "items.xml") else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/getmail.py b/getmail.py
new file mode 100755
index 0000000..2518e08
--- /dev/null
+++ b/getmail.py
@@ -0,0 +1,181 @@
+#!/usr/bin/env python3
+
+""" Developed by Greig McGill of Sense7.
+
+This script is designed to poll an IMAP mailbox when run.
+It will fetch any unread emails, save their attachments to the 'attachments'
+directory for later processing, and mark the emails as read.
+It will respect the attachment filetype and extension.
+Attachments are output named with the current date-time and the first eight
+characters of their content hash, which crudely prevents name collisions.
+In a VERY high traffic environment where many files may be created per
+second, this should be re-implemented to be more robust.
+No file locking is used, however files are written to a temporary directory
+first and renamed upon completion, as renaming is an atomic operation at an
+OS level.
+
+Attachments will not be created if they have an identical hash to a
+previously downloaded attachment. This is designed to prevent scenarios where
+the same file has been accidentally sent multiple times. Note that this
+identification is done based on file content, and the name of the file is
+irrelevant.
+
+Logging is fairly primitive and done to a log file in the same directory as
+the script. This could be upgraded to syslog-style logging if required.
+
+This is set up for simple IMAP authentication over implicit TLS (port 993).
+If explicit STARTTLS is required, the MailBox class will need to be swapped
+for the STARTTLS variant provided by imap_tools. If Outlook or Gmail or
+similar are used, it will be necessary to implement OAuth2.
+
+Authentication is configured in a .env file as described below in the code.
+All files the script reads or writes (.env, certificates, log, saved hashes,
+'temp' and 'attachments') live in the same directory as the script.
+"""
+
+# Standard libraries
+import os
+import sys
+import ssl
+import json
+import hashlib
+import logging
+import tempfile
+from os.path import abspath, dirname, join
+from datetime import datetime
+
+# Third party libraries
+from dotenv import load_dotenv
+from imap_tools import MailBox, AND
+
+# Anchor every path to the script's own directory so the script behaves the
+# same regardless of the working directory it is started from (e.g. cron).
+BASE_DIR = dirname(abspath(__file__))
+
+# Initialise logging
+logging.basicConfig(
+    level=logging.DEBUG,
+    format='%(asctime)s %(levelname)s %(message)s',
+    filename=join(BASE_DIR, 'getmail.log'),
+    filemode='a')
+logging.debug('%a started in %s', 'getmail.py', os.getcwd())
+
+# Load our environment vars from our .env file
+# This is just keys and values - eg MBOX_USER = 'test@someemail.com'
+# One key/value pair per line
+dotenv_path = join(BASE_DIR, ".env")
+try:
+    # Open the file ourselves so that a missing or unreadable config is an
+    # explicit error rather than an empty environment
+    with open(dotenv_path, 'r', encoding='utf-8'):
+        pass
+    load_dotenv(dotenv_path)
+except FileNotFoundError:
+    logging.error('config file %a is missing - unable to proceed', dotenv_path)
+    sys.exit(1)
+except Exception as e:
+    logging.error('An unexpected error occurred: %s', str(e))
+    sys.exit(1)
+
+# Set our constants from the environment
+USERNAME = os.environ.get("MBOX_USER")
+PASSWORD = os.environ.get("MBOX_PASS")
+DEFAULT_FOLDER = os.environ.get("MBOX_FOLDER", "Inbox")
+PORT = 993
+
+if not USERNAME or not PASSWORD:
+    logging.error('Missing mailbox username or password in environment')
+    sys.exit(1)
+
+# Other constants
+HOST = "helpdesk.sense7.co.nz"
+ATTACHMENT_PATH = join(BASE_DIR, "attachments")
+# Same filesystem as ATTACHMENT_PATH, so the final rename is atomic
+TEMP_PATH = join(BASE_DIR, "temp")
+# File to store hashes of saved attachments
+HASHES_FILE = join(BASE_DIR, "saved_hashes.json")
+
+
+def compute_hash(content):
+    """Return the SHA-256 hex digest of content (bytes)."""
+    return hashlib.sha256(content).hexdigest()
+
+
+def load_hashes(path=HASHES_FILE):
+    """Return the hashes saved by earlier runs (empty set if none)."""
+    try:
+        with open(path, 'r', encoding='utf-8') as f:
+            return set(json.load(f))
+    except FileNotFoundError:
+        return set()
+
+
+def save_hashes(hashes, path=HASHES_FILE):
+    """Write the set of attachment hashes to path as a JSON list."""
+    with open(path, 'w', encoding='utf-8') as f:
+        json.dump(sorted(hashes), f)
+
+
+def build_ssl_context():
+    """Return a TLS 1.3-only SSL context with one.crt/one.key loaded."""
+    ssl_context = ssl.create_default_context()
+    ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
+    ssl_context.maximum_version = ssl.TLSVersion.TLSv1_3
+    ssl_context.load_cert_chain(
+        certfile=join(BASE_DIR, "one.crt"),
+        keyfile=join(BASE_DIR, "one.key"))
+    return ssl_context
+
+
+def save_attachment(att, attachment_hash):
+    """Write an attachment into ATTACHMENT_PATH and return the final path.
+
+    The file is named <date-time>_<first 8 hash chars><original extension>.
+    It is written to TEMP_PATH first and then renamed, so a partially written
+    file never appears in ATTACHMENT_PATH.
+    """
+    # Some attachments carry no filename; fall back to no extension
+    _, file_ext = os.path.splitext(att.filename or "")
+    current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+    final_name = f"{current_datetime}_{attachment_hash[:8]}{file_ext}"
+    final_path = join(ATTACHMENT_PATH, final_name)
+    with tempfile.NamedTemporaryFile(delete=False, dir=TEMP_PATH) as temp_file:
+        temp_file.write(att.payload)
+    # Rename only once the file is closed and fully flushed
+    os.replace(temp_file.name, final_path)  # Atomic move
+    return final_path
+
+
+def main():
+    """Save new attachments from unread mail and mark the mail as read."""
+    os.makedirs(ATTACHMENT_PATH, exist_ok=True)
+    # tempfile will not create its target directory for us
+    os.makedirs(TEMP_PATH, exist_ok=True)
+    saved_hashes = load_hashes()
+
+    try:
+        with MailBox(HOST, port=PORT, ssl_context=build_ssl_context()).login(
+                USERNAME, PASSWORD, DEFAULT_FOLDER) as mailbox:
+            for msg in mailbox.fetch(AND(seen=False), mark_seen=False):
+                for att in msg.attachments:
+                    attachment_hash = compute_hash(att.payload)
+                    if attachment_hash in saved_hashes:
+                        logging.info(
+                            "Skipped duplicate attachment: %s", att.filename)
+                        continue
+                    final_path = save_attachment(att, attachment_hash)
+                    logging.info("Saved attachment as: %s", final_path)
+                    saved_hashes.add(attachment_hash)
+                # Flag the mail only after all of its attachments are on disk
+                mailbox.flag(msg.uid, '\\Seen', True)
+    except Exception:
+        logging.exception('Mailbox processing failed')
+        raise
+    finally:
+        # Persist even after a failure, otherwise files already saved in this
+        # run would be downloaded again as "new" next time
+        save_hashes(saved_hashes)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..200dd37
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+chardet>=5.2.0
+imap-tools>=1.8.0
+lxml>=5.3.0
+python-dotenv>=1.0.1