Initial checkin
This commit is contained in:
12
.gitignore
vendored
Normal file
12
.gitignore
vendored
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
.env
|
||||||
|
*.key
|
||||||
|
*.crt
|
||||||
|
saved_hashes.json
|
||||||
|
pyvenv.cfg
|
||||||
|
*.log
|
||||||
|
bin
|
||||||
|
lib
|
||||||
|
include
|
||||||
|
attachments
|
||||||
|
xml
|
||||||
|
temp
|
||||||
65
csv_to_xml.py
Executable file
65
csv_to_xml.py
Executable file
@@ -0,0 +1,65 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import csv
|
||||||
|
import lxml.etree as ET
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
# Define the DTD
|
||||||
|
# DTD describing the generated document: an <items> root holding one or more
# <item> elements, each with exactly these five text-only children, in order.
dtd_content = """
<!ELEMENT items (item+)>
<!ELEMENT item (Item_ID, Item_Name, Item_Description, Item_Price, Item_Quantity)>
<!ELEMENT Item_ID (#PCDATA)>
<!ELEMENT Item_Name (#PCDATA)>
<!ELEMENT Item_Description (#PCDATA)>
<!ELEMENT Item_Price (#PCDATA)>
<!ELEMENT Item_Quantity (#PCDATA)>
"""

# Save DTD to a file
# The encoding is stated explicitly so the file on disk does not depend on
# the platform's locale default.
dtd_file = "items.dtd"
with open(dtd_file, "w", encoding="utf-8") as f:
    f.write(dtd_content)
|
||||||
|
|
||||||
|
# Function to validate XML against the DTD
|
||||||
|
def validate_xml(xml_string, dtd_path):
    """Validate an XML document against a DTD and report the outcome.

    The outcome is printed (as before) and is now also returned, so callers
    can act on a failed validation instead of having to parse stdout.

    Args:
        xml_string: The XML document as text.
        dtd_path: Path of the DTD file to validate against.

    Returns:
        True when the document satisfies the DTD, False otherwise.
    """
    dtd = ET.DTD(dtd_path)
    # Convert the XML string to bytes to handle the encoding declaration
    xml_doc = ET.parse(BytesIO(xml_string.encode('utf-8')))
    is_valid = dtd.validate(xml_doc)
    if is_valid:
        print("XML is valid against the DTD.")
    else:
        print("XML validation failed:", dtd.error_log.filter_from_errors())
    return is_valid
|
||||||
|
|
||||||
|
# Read CSV and generate XML
|
||||||
|
def csv_to_xml(csv_file, xml_file):
    """Convert a CSV file of items into an XML document and save it.

    Each CSV row becomes an <item> element under a single <items> root, with
    one child element per expected column. The serialised document is passed
    to validate_xml() together with the module-level ``dtd_file`` and is then
    written to ``xml_file``; the validation result is not consulted here, so
    the file is written either way.

    Args:
        csv_file: Path of the UTF-8 CSV file to read. Its header row must
            provide every column named in ``columns`` below.
        xml_file: Path of the XML file to write (UTF-8).

    Raises:
        KeyError: If a row lacks one of the expected columns.
    """
    columns = (
        "Item_ID",
        "Item_Name",
        "Item_Description",
        "Item_Price",
        "Item_Quantity",
    )
    items_root = ET.Element("items")

    with open(csv_file, newline='', encoding="utf-8") as source:
        for record in csv.DictReader(source):
            item_node = ET.SubElement(items_root, "item")
            # Child order matters: the DTD fixes the sequence of elements.
            for column in columns:
                ET.SubElement(item_node, column).text = record[column]

    # Serialise to bytes first, then decode, so the text carries the XML
    # declaration that lxml generated.
    serialized = ET.tostring(
        items_root,
        pretty_print=True,
        xml_declaration=True,
        encoding="UTF-8",
    )
    xml_string = serialized.decode("utf-8")
    validate_xml(xml_string, dtd_file)

    with open(xml_file, "w", encoding="utf-8") as target:
        target.write(xml_string)
    print(f"XML file saved to {xml_file}")
|
||||||
|
|
||||||
|
# Input and output file paths
|
||||||
|
# Source CSV and destination XML, both relative to the working directory.
csv_file, xml_file = "items.csv", "items.xml"

# The CSV is expected to look like this (header row first):
# Item_ID,Item_Name,Item_Description,Item_Price,Item_Quantity
# 1,Widget A,A useful widget,19.99,100
# 2,Gadget B,A versatile gadget,29.49,200

# Run the conversion
csv_to_xml(csv_file, xml_file)
|
||||||
141
getmail.py
Executable file
141
getmail.py
Executable file
@@ -0,0 +1,141 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
""" Developed by Greig McGill of Sense7.
|
||||||
|
|
||||||
|
This script is designed to poll an IMAP mailbox when run.
|
||||||
|
It will find any emails with attachments, marking them as read, and saving
|
||||||
|
the attachment to the 'attachments' directory for later processing.
|
||||||
|
It will respect the attachment filetype and extension.
|
||||||
|
Attachments are output named with the current date-time, and a semi-random
|
||||||
|
uid designed to crudely prevent namespace collisions. In a VERY high traffic
|
||||||
|
environment where many files may be created per second, this should be
|
||||||
|
re-implemented to be more robust.
|
||||||
|
No file locking is used, however files are written to a temporary directory
|
||||||
|
first and renamed upon completion, as renaming is an atomic operation at an
|
||||||
|
OS level.
|
||||||
|
|
||||||
|
Attachments will not be created if they have an identical hash to a
|
||||||
|
previously downloaded attachment. This is designed to prevent scenarios where
|
||||||
|
the same file has been accidentally sent multiple times. Note that this
|
||||||
|
identification is done based on file content, and the name of the file is
|
||||||
|
irrelevant.
|
||||||
|
|
||||||
|
Logging is fairly primitive and done to a log file in the same directory as
|
||||||
|
the script. This could be upgraded to syslog-style logging if required.
|
||||||
|
|
||||||
|
This is set up for simple IMAP SSL authentication using TLS with implied
|
||||||
|
STARTTLS. If manual STARTTLS is required, the MailBox method will need to be
|
||||||
|
altered to MailBoxTls. If Outlook or Gmail or similar are used, it will be
|
||||||
|
necessary to implement OAUTH2.
|
||||||
|
|
||||||
|
Authentication is configured in a .env file as described below in the code.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Standard libraries
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import ssl
|
||||||
|
import json
|
||||||
|
import hashlib
|
||||||
|
import logging
|
||||||
|
import tempfile
|
||||||
|
from os.path import join, dirname
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
# Third party libraries
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from imap_tools import MailBox, AND
|
||||||
|
|
||||||
|
# Initialise logging
|
||||||
|
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(message)s',
    # Keep the log beside the script rather than in the current working
    # directory, so runs started from elsewhere (e.g. a scheduler) append
    # to the same file.
    filename=join(dirname(__file__), 'getmail.log'),
    filemode='a')
logging.debug('%a started in %s', 'getmail.py', os.getcwd())

# Load our environment vars from our .env file
# This is just keys and values - eg MBOX_USER = 'test@someemail.com'
# One key/value pair per line
dotenv_path = join(dirname(__file__), ".env")
try:
    # Open the file purely as an existence/readability probe so that a
    # missing config is reported explicitly below.
    # NOTE(review): presumably load_dotenv() would not raise on a missing
    # file by itself - confirm against the python-dotenv documentation.
    with open(dotenv_path, 'r', encoding='utf-8'):
        pass
    load_dotenv(dotenv_path)
except FileNotFoundError:
    logging.error('config file %a is missing - unable to proceed', dotenv_path)
    sys.exit(1)
except Exception as e:
    # Top-level boundary: record the reason and stop, nothing can proceed
    # without configuration.
    logging.error('An unexpected error occurred: %s', str(e))
    sys.exit(1)
|
||||||
|
|
||||||
|
# Function to compute the hash of an attachment to determine uniqueness
|
||||||
|
def compute_hash(content):
    """Return the SHA-256 digest of *content* as a hexadecimal string.

    Args:
        content: Bytes-like data to fingerprint.

    Returns:
        The digest as a 64-character lowercase hex string.
    """
    digest = hashlib.sha256()
    digest.update(content)
    return digest.hexdigest()
|
||||||
|
|
||||||
|
# File to store hashes of saved attachments
|
||||||
|
# Stored beside the script, not in the current working directory, so the
# duplicate history is found regardless of where the script is launched from.
HASHES_FILE = join(dirname(__file__), 'saved_hashes.json')

# Load saved hashes from the file
try:
    with open(HASHES_FILE, 'r', encoding='utf-8') as f:
        saved_hashes = set(json.load(f))
except FileNotFoundError:
    # No store yet: start with an empty history.
    saved_hashes = set()
except (json.JSONDecodeError, TypeError) as e:
    # A damaged store would otherwise abort with an unlogged traceback.
    # Stop rather than continue with an empty history, which would
    # re-download every attachment seen before.
    logging.error('hash store %a is unreadable: %s', HASHES_FILE, e)
    sys.exit(1)
|
||||||
|
|
||||||
|
# Set our constants from the environment
|
||||||
|
# Set our constants from the environment
USERNAME = os.environ.get("MBOX_USER")
PASSWORD = os.environ.get("MBOX_PASS")
DEFAULT_FOLDER = os.environ.get("MBOX_FOLDER", "Inbox")
PORT = 993

# Credentials have no usable default, so stop early if either is absent/empty.
if not USERNAME or not PASSWORD:
    logging.error('Missing mailbox username or password in environment')
    sys.exit(1)

# Other constants
ATTACHMENT_PATH = join(dirname(__file__), "attachments")
HOST = "helpdesk.sense7.co.nz"

# exist_ok avoids the check-then-create race of a separate exists() test.
os.makedirs(ATTACHMENT_PATH, exist_ok=True)

# Set SSL context for "secure" (self-signed cert etc.) connection
ssl_context = ssl.create_default_context()
# Pin the connection to TLS 1.3 only.
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
ssl_context.maximum_version = ssl.TLSVersion.TLSv1_3
# Certificate and key are looked up beside the script, like the .env file
# and the attachments directory, instead of in the working directory.
ssl_context.load_cert_chain(
    certfile=join(dirname(__file__), "one.crt"),
    keyfile=join(dirname(__file__), "one.key"))
|
||||||
|
|
||||||
|
# Process mailbox
|
||||||
|
# Staging directory for downloads in progress. It sits beside the attachments
# directory so the final move is a rename within the same directory tree,
# and it is created here because nothing else creates it.
TEMP_PATH = join(dirname(__file__), "temp")
os.makedirs(TEMP_PATH, exist_ok=True)

# Process mailbox
try:
    with MailBox(HOST, port=PORT, ssl_context=ssl_context).login(
            USERNAME, PASSWORD, DEFAULT_FOLDER) as mailbox:

        # mark_seen=False: messages are flagged explicitly below, only after
        # their attachments have been handled.
        for msg in mailbox.fetch(AND(seen=False), mark_seen=False):
            attachments = msg.attachments
            for att in attachments:
                ATTACHMENT_HASH = compute_hash(att.payload)

                # Identity is decided by content alone; the name is ignored.
                if ATTACHMENT_HASH in saved_hashes:
                    logging.info(
                        "Skipped duplicate attachment: %s", att.filename)
                    continue

                # Only the extension of the sender's file name is kept.
                file_ext = os.path.splitext(att.filename)[1]
                current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                FINAL_NAME = (
                    f"{current_datetime}_"
                    f"{ATTACHMENT_HASH[:8]}"
                    f"{file_ext}"
                )

                # Write to the staging directory first; the file only appears
                # under its final name once it is complete.
                with tempfile.NamedTemporaryFile(
                        delete=False, dir=TEMP_PATH) as temp_file:
                    temp_file.write(att.payload)
                    temp_path = temp_file.name
                final_path = os.path.join(ATTACHMENT_PATH, FINAL_NAME)
                os.replace(temp_path, final_path)  # Atomic move
                logging.info("Saved attachment as: %s", final_path)
                saved_hashes.add(ATTACHMENT_HASH)

            # Mark the email as read once all of its attachments have been
            # handled. Emails without attachments are left unread.
            # NOTE(review): the original nesting of this call could not be
            # recovered; this follows the module docstring ("find any emails
            # with attachments, marking them as read") - confirm.
            if attachments:
                mailbox.flag(msg.uid, '\\Seen', True)
finally:
    # Save the updated hashes to the file
    # Done in a finally block so attachments already written are remembered
    # even if the run fails partway through.
    with open(HASHES_FILE, 'w', encoding='utf-8') as f:
        json.dump(sorted(saved_hashes), f)
|
||||||
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
chardet>=5.2.0
|
||||||
|
imap-tools>=1.8.0
|
||||||
|
lxml>=5.3.0
|
||||||
|
python-dotenv>=1.0.1
|
||||||
Reference in New Issue
Block a user