Refactored getmail.py into functions

This commit is contained in:
2024-12-31 08:13:13 +13:00
parent 7200a85c00
commit 0ab33b9bad

View File

@@ -47,96 +47,109 @@ from datetime import datetime
from dotenv import load_dotenv from dotenv import load_dotenv
from imap_tools import MailBox, AND from imap_tools import MailBox, AND
# Initialise logging def setup_logging():
logging.basicConfig( """Initialize logging configuration."""
level=logging.DEBUG, logging.basicConfig(
format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG,
filename='./getmail.log', format='%(asctime)s %(levelname)s %(message)s',
filemode='a') filename='./getmail.log',
logging.debug('%a started in %s', 'getmail.py', os.getcwd()) filemode='a')
logging.debug('%a started in %s', 'getmail.py', os.getcwd())
# Load our environment vars from our .env file def load_environment_variables():
# This is just keys and values - eg MBOX_USER = 'test@someemail.com' """Load environment variables from .env file."""
# One key/value pair per line dotenv_path = join(dirname(__file__), ".env")
dotenv_path = join(dirname(__file__), ".env") try:
try: with open(dotenv_path, 'r', encoding='utf-8') as env:
with open(dotenv_path, 'r', encoding='utf-8') as env: logging.debug('success opening env file: %a', env)
pass load_dotenv(dotenv_path)
load_dotenv(dotenv_path) except FileNotFoundError:
except FileNotFoundError: logging.error('config file %a is missing - unable to proceed', dotenv_path)
logging.error('config file %a is missing - unable to proceed', dotenv_path) sys.exit(1)
sys.exit(1) except Exception as e:
except Exception as e: logging.error('An unexpected error occurred: %s', str(e))
logging.error('An unexpected error occurred: %s', str(e)) sys.exit(1)
sys.exit(1)
# Function to compute the hash of an attachment to determine uniqueness
def compute_hash(content): def compute_hash(content):
"""Function to generate a simple file hash. Could be more secure.""" """Generate a simple file hash to determine uniqueness."""
return hashlib.sha256(content).hexdigest() return hashlib.sha256(content).hexdigest()
# File to store hashes of saved attachments def load_saved_hashes(hashes_file):
HASHES_FILE = 'saved_hashes.json' """Load saved hashes from the file."""
if os.path.exists(hashes_file):
with open(hashes_file, 'r', encoding='utf-8') as f:
return set(json.load(f))
return set()
# Load saved hashes from the file def save_hashes(hashes_file, hashes):
if os.path.exists(HASHES_FILE): """Save updated hashes to the file."""
with open(HASHES_FILE, 'r', encoding='utf-8') as f: with open(hashes_file, 'w', encoding='utf-8') as f:
saved_hashes = set(json.load(f)) json.dump(list(hashes), f)
else:
saved_hashes = set()
# Set our constants from the environment def setup_ssl_context():
USERNAME = os.environ.get("MBOX_USER") """Set up the SSL context."""
PASSWORD = os.environ.get("MBOX_PASS") ssl_context = ssl.create_default_context()
DEFAULT_FOLDER = os.environ.get("MBOX_FOLDER", "Inbox") ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
PORT = 993 ssl_context.maximum_version = ssl.TLSVersion.TLSv1_3
ssl_context.load_cert_chain(certfile="./one.crt", keyfile="./one.key")
if not USERNAME or not PASSWORD: return ssl_context
logging.error('Missing mailbox username or password in environment')
sys.exit(1)
# Other constants
ATTACHMENT_PATH = join(dirname(__file__), "attachments")
HOST = "helpdesk.sense7.co.nz"
if not os.path.exists(ATTACHMENT_PATH):
os.makedirs(ATTACHMENT_PATH)
# Set SSL context for "secure" (self-signed cert etc.) connection
ssl_context = ssl.create_default_context()
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
ssl_context.maximum_version = ssl.TLSVersion.TLSv1_3
ssl_context.load_cert_chain(certfile="./one.crt", keyfile="./one.key")
# Process mailbox
with MailBox(HOST, port=PORT, ssl_context=ssl_context).login(
USERNAME, PASSWORD, DEFAULT_FOLDER) as mailbox:
def process_attachments(mailbox, attachment_path, saved_hashes):
"""Process and save email attachments."""
for msg in mailbox.fetch(AND(seen=False), mark_seen=False): for msg in mailbox.fetch(AND(seen=False), mark_seen=False):
for att in msg.attachments: for att in msg.attachments:
ATTACHMENT_HASH = compute_hash(att.payload) attachment_hash = compute_hash(att.payload)
if ATTACHMENT_HASH not in saved_hashes: if attachment_hash not in saved_hashes:
filename, file_ext = os.path.splitext(att.filename) filename, file_ext = os.path.splitext(att.filename)
logging.debug('found new attachment named %a', filename)
current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
FINAL_NAME = ( final_name = (
f"{current_datetime}_" f"{current_datetime}_"
f"{ATTACHMENT_HASH[:8]}" f"{attachment_hash[:8]}"
f"{file_ext}" f"{file_ext}"
) )
with tempfile.NamedTemporaryFile( with tempfile.NamedTemporaryFile(delete=False, dir="temp") as temp_file:
delete=False, dir="temp") as temp_file:
temp_file.write(att.payload) temp_file.write(att.payload)
temp_path = temp_file.name temp_path = temp_file.name
final_path = os.path.join(ATTACHMENT_PATH, FINAL_NAME) final_path = os.path.join(attachment_path, final_name)
os.rename(temp_path, final_path) # Atomic move os.rename(temp_path, final_path) # Atomic move
logging.info("Saved attachment as: %s", final_path) logging.info("Saved attachment as: %s", final_path)
saved_hashes.add(ATTACHMENT_HASH) saved_hashes.add(attachment_hash)
else: else:
logging.info("Skipped duplicate attachment: %s", att.filename) logging.info("Skipped duplicate attachment: %s", att.filename)
mailbox.flag(msg.uid, '\\Seen', True) mailbox.flag(msg.uid, '\\Seen', True)
# Save the updated hashes to the file def main():
with open(HASHES_FILE, 'w', encoding='utf-8') as f: """Code entrypoint."""
json.dump(list(saved_hashes), f) setup_logging()
load_environment_variables()
hashes_file = 'saved_hashes.json'
saved_hashes = load_saved_hashes(hashes_file)
username = os.environ.get("MBOX_USER")
password = os.environ.get("MBOX_PASS")
default_folder = os.environ.get("MBOX_FOLDER", "Inbox")
host = os.environ.get("MAIL_HOST")
port = os.environ.get("MAIL_PORT")
if not username or not password:
logging.error('Missing mailbox username or password in environment')
sys.exit(1)
attachment_path = join(dirname(__file__), "attachments")
if not os.path.exists(attachment_path):
os.makedirs(attachment_path)
ssl_context = setup_ssl_context()
with MailBox(host, port=port, ssl_context=ssl_context).login(
username, password, default_folder) as mailbox:
process_attachments(mailbox, attachment_path, saved_hashes)
save_hashes(hashes_file, saved_hashes)
if __name__ == "__main__":
main()