Added custom configured client CSV mappings based on email address domain

This commit is contained in:
2025-01-02 17:24:51 +13:00
parent 3ef171e637
commit 8df3377709

View File

@@ -65,7 +65,7 @@ def load_environment_variables():
dotenv_path = join(dirname(__file__), ".env")
try:
with open(dotenv_path, 'r', encoding='utf-8') as env:
logging.debug('success opening env file: %a', env)
logging.debug('success opening env file: %a', env.name)
load_dotenv(dotenv_path)
except FileNotFoundError:
logging.error('config file %a is missing - unable to proceed', dotenv_path)
@@ -109,22 +109,21 @@ def validate_xml(xml_string, dtd_path):
logging.error("XML validation failed using DTD %s with error %s:",
dtd_path, dtd.error_log.filter_from_errors())
def csv_to_xml(csv_file, xml_file, dtd_file):
def csv_to_xml(csv_file, xml_file, dtd_file, csv_mapping):
"""Read the CSV and generate the XML"""
root = ET.Element("items")
logging.debug("CSV file size: %d bytes", os.path.getsize(csv_file))
logging.debug("Resolved CSV file path: %s", os.path.abspath(csv_file))
logging.debug("CSV file size: %d bytes", os.path.getsize(csv_file))
with open(csv_file, newline='', encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
item = ET.SubElement(root, "item")
ET.SubElement(item, "Item_ID").text = row["Item_ID"]
ET.SubElement(item, "Item_Name").text = row["Item_Name"]
ET.SubElement(item, "Item_Description"
).text = row["Item_Description"]
ET.SubElement(item, "Item_Price").text = row["Item_Price"]
ET.SubElement(item, "Item_Quantity").text = row["Item_Quantity"]
for xml_tag, csv_column in csv_mapping.items():
if csv_column in row:
ET.SubElement(item, xml_tag).text = row[csv_column]
else:
logging.error("Missing column %s in CSV file", csv_column)
# Convert to string and validate
try:
@@ -140,6 +139,12 @@ def csv_to_xml(csv_file, xml_file, dtd_file):
f.write(xml_string)
logging.info("XML file saved to %s", xml_file)
def get_client_mapping(client_domain):
    """Return the CSV-to-XML column mapping for a client email domain.

    Reads column_mapping.json from the working directory and returns the
    mapping registered under *client_domain*; falls back to the mandatory
    "default" entry when the domain has no custom mapping.

    Raises:
        FileNotFoundError: if column_mapping.json does not exist (logged
            first, matching the file's error-reporting style elsewhere).
        KeyError: if the JSON has no "default" entry to fall back on.
    """
    try:
        with open("column_mapping.json", "r", encoding="utf-8") as f:
            column_mapping = json.load(f)
    except FileNotFoundError:
        # Consistent with how the missing .env file is reported above.
        logging.error('mapping file %a is missing - unable to proceed',
                      "column_mapping.json")
        raise
    # Unknown domains get the shared "default" mapping.
    return column_mapping.get(client_domain, column_mapping["default"])
def process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
dtd_name):
"""Process and save email attachments."""
@@ -149,18 +154,19 @@ def process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
if attachment_hash not in saved_hashes:
filename, file_ext = os.path.splitext(att.filename)
logging.debug('found new attachment named %a', filename)
logging.info('new attachment named %s from %s',
filename, msg.from_)
client_domain = msg.from_.split('@')[1]
csv_mapping=get_client_mapping(client_domain)
logging.debug('loaded client %s csv mapping %s',
client_domain, csv_mapping)
current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
final_name = (
f"{current_datetime}_"
f"{attachment_hash[:8]}"
f"{file_ext}"
)
xml_name = (
f"{current_datetime}_"
f"{attachment_hash[:8]}"
f".xml"
)
xml_name = final_name.split('.')[0] + '.xml'
with tempfile.NamedTemporaryFile(
delete=False, dir="temp") as temp_file:
@@ -174,10 +180,10 @@ def process_attachments(mailbox, attachment_path, saved_hashes, xml_docs,
os.rename(temp_path, final_path) # Atomic move
logging.info("Saved attachment as: %s", final_path)
saved_hashes.add(attachment_hash)
csv_to_xml(final_path, xml_path, dtd_path)
csv_to_xml(final_path, xml_path, dtd_path, csv_mapping)
else:
logging.info("Skipped duplicate attachment: %s", att.filename)
mailbox.flag(msg.uid, '\\Seen', True)
#mailbox.flag(msg.uid, '\\Seen', True)
def main():
"""Code entrypoint."""