#!/usr/bin/env python3
"""Poll an IMAP mailbox and save new, unique attachments to disk.

Developed by Greig McGill of Sense7.

This script is designed to poll an IMAP mailbox when run. It will find any
unread emails with attachments, marking them as read, and saving each
attachment to the 'attachments' directory (next to this script) for later
processing. It will respect the attachment's extension, after stripping any
characters that are unsafe in a file name.

Attachments are output named with the current date-time followed by the
first eight hex digits of the attachment's SHA-256 hash, which crudely
prevents namespace collisions. In a VERY high traffic environment where many
files may be created per second, this should be re-implemented to be more
robust.

No file locking is used, however files are written to a temporary directory
('temp', relative to the working directory) first and renamed upon
completion, as renaming is an atomic operation at an OS level provided both
directories live on the same filesystem.

Attachments will not be created if they have an identical hash to a
previously downloaded attachment. This is designed to prevent scenarios
where the same file has been accidentally sent multiple times. Note that
this identification is done based on file content, and the name of the file
is irrelevant. The known hashes are kept in 'saved_hashes.json' in the
working directory and are written back even when a run fails part-way.

Logging is fairly primitive and done to 'getmail.log' in the current working
directory. This could be upgraded to syslog-style logging if required.

This is set up for simple IMAP authentication over implicit TLS (port 993).
If manual STARTTLS is required, the MailBox class will need to be altered to
MailBoxTls. If Outlook or Gmail or similar are used, it will be necessary to
implement OAUTH2. Authentication is configured in a .env file as described
below in the code.
"""

# Standard libraries
import os
import sys
import ssl
import json
import hashlib
import logging
import tempfile
from os.path import join, dirname
from datetime import datetime

# Third party libraries
from dotenv import load_dotenv
from imap_tools import MailBox, AND

# Initialise logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(message)s',
    filename='./getmail.log',
    filemode='a')
logging.debug('%a started in %s', 'getmail.py', os.getcwd())

# Load our environment vars from our .env file
# This is just keys and values - eg MBOX_USER = 'test@someemail.com'
# One key/value pair per line
dotenv_path = join(dirname(__file__), ".env")
try:
    # Opening the file first turns a missing/unreadable config into an
    # explicit, logged failure before load_dotenv is asked to parse it.
    with open(dotenv_path, 'r', encoding='utf-8'):
        pass
    load_dotenv(dotenv_path)
except FileNotFoundError:
    logging.error('config file %a is missing - unable to proceed', dotenv_path)
    sys.exit(1)
except Exception as e:
    logging.error('An unexpected error occurred: %s', str(e))
    sys.exit(1)

# File to store hashes of saved attachments
HASHES_FILE = 'saved_hashes.json'

# Set our constants from the environment
USERNAME = os.environ.get("MBOX_USER")
PASSWORD = os.environ.get("MBOX_PASS")
DEFAULT_FOLDER = os.environ.get("MBOX_FOLDER", "Inbox")
PORT = 993

if not USERNAME or not PASSWORD:
    logging.error('Missing mailbox username or password in environment')
    sys.exit(1)

# Other constants
ATTACHMENT_PATH = join(dirname(__file__), "attachments")
TEMP_PATH = "temp"  # staging directory, relative to the working directory
HOST = "helpdesk.sense7.co.nz"


# Function to compute the hash of an attachment to determine uniqueness
def compute_hash(content):
    """Return the SHA-256 hex digest of *content* (a bytes-like object)."""
    return hashlib.sha256(content).hexdigest()


def load_saved_hashes(path):
    """Return the set of hashes stored as a JSON list in *path*.

    A missing file is not an error and yields an empty set. Unreadable or
    malformed content raises OSError, ValueError or TypeError.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return set(json.load(f))
    except FileNotFoundError:
        return set()


def _safe_extension(filename):
    """Return the extension of *filename* limited to file-name-safe chars.

    The attachment name comes from the email and is untrusted, so anything
    other than letters, digits and '._-+' is dropped from the extension.
    """
    ext = os.path.splitext(filename or "")[1]
    return "".join(ch for ch in ext if ch.isalnum() or ch in "._-+")


def save_attachment(payload, original_name, digest):
    """Write *payload* into ATTACHMENT_PATH and return the final path.

    The file is named '<date-time>_<first 8 hex digits of digest><ext>'.
    The data is staged in TEMP_PATH and then moved into place, so a
    partially written file never appears in the attachments directory.
    Raises OSError if writing or moving fails; the staged file is removed
    in that case.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    final_name = f"{timestamp}_{digest[:8]}{_safe_extension(original_name)}"
    final_path = os.path.join(ATTACHMENT_PATH, final_name)

    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(
                delete=False, dir=TEMP_PATH) as temp_file:
            temp_path = temp_file.name
            temp_file.write(payload)
        # Atomic move; os.replace overwrites consistently on all platforms
        os.replace(temp_path, final_path)
    except OSError:
        # Do not leave a stray staging file behind on failure
        if temp_path is not None and os.path.exists(temp_path):
            os.unlink(temp_path)
        raise
    return final_path


# Load saved hashes from the file
try:
    saved_hashes = load_saved_hashes(HASHES_FILE)
except (OSError, ValueError, TypeError) as e:
    logging.error('Unable to read hash store %a: %s', HASHES_FILE, e)
    sys.exit(1)

# Make sure both the destination and the staging directory exist
os.makedirs(ATTACHMENT_PATH, exist_ok=True)
os.makedirs(TEMP_PATH, exist_ok=True)

# Set SSL context for "secure" (self-signed cert etc.) connection
# NOTE(review): create_default_context() still verifies the server
# certificate; one.crt/one.key are loaded as the client certificate chain.
ssl_context = ssl.create_default_context()
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
ssl_context.maximum_version = ssl.TLSVersion.TLSv1_3
ssl_context.load_cert_chain(certfile="./one.crt", keyfile="./one.key")

# Process mailbox
try:
    with MailBox(HOST, port=PORT, ssl_context=ssl_context).login(
            USERNAME, PASSWORD, DEFAULT_FOLDER) as mailbox:
        for msg in mailbox.fetch(AND(seen=False), mark_seen=False):
            attachments = msg.attachments
            for att in attachments:
                digest = compute_hash(att.payload)
                if digest in saved_hashes:
                    logging.info("Skipped duplicate attachment: %s",
                                 att.filename)
                    continue
                final_path = save_attachment(
                    att.payload, att.filename, digest)
                logging.info("Saved attachment as: %s", final_path)
                saved_hashes.add(digest)
            # Only flag the message once all of its attachments are safely
            # on disk; mail without attachments is left unread, matching
            # the behaviour described in the module docstring.
            if attachments:
                mailbox.flag(msg.uid, '\\Seen', True)
except Exception:
    # Record the failure in the log file, then let it propagate so the
    # process still exits non-zero with a traceback.
    logging.exception('Mailbox processing failed')
    raise
finally:
    # Save the updated hashes to the file, even after a partial run, so
    # attachments already written are not downloaded again as "new".
    with open(HASHES_FILE, 'w', encoding='utf-8') as f:
        json.dump(sorted(saved_hashes), f)