Refactored csv_to_email.py and added basic logging - much more work needed
This commit is contained in:
@@ -1,36 +1,37 @@
|
||||
#!/usr/bin/env python3
|
||||
import csv
|
||||
import lxml.etree as ET
|
||||
from io import BytesIO
|
||||
|
||||
# Define the DTD
|
||||
dtd_content = """
|
||||
<!ELEMENT items (item+)>
|
||||
<!ELEMENT item (Item_ID, Item_Name, Item_Description, Item_Price, Item_Quantity)>
|
||||
<!ELEMENT Item_ID (#PCDATA)>
|
||||
<!ELEMENT Item_Name (#PCDATA)>
|
||||
<!ELEMENT Item_Description (#PCDATA)>
|
||||
<!ELEMENT Item_Price (#PCDATA)>
|
||||
<!ELEMENT Item_Quantity (#PCDATA)>
|
||||
"""
|
||||
Developed by Greig McGill of Sense7
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import csv
|
||||
import logging
|
||||
from io import BytesIO
|
||||
import lxml.etree as ET
|
||||
|
||||
# Save DTD to a file
|
||||
dtd_file = "items.dtd"
|
||||
with open(dtd_file, "w") as f:
|
||||
f.write(dtd_content)
|
||||
def setup_logging():
|
||||
"""Initialize logging configuration."""
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format='%(asctime)s %(levelname)s %(message)s',
|
||||
filename='./csv_to_xml.log',
|
||||
filemode='a')
|
||||
logging.debug('%s started in %s', 'csv_to_xml.py', os.getcwd())
|
||||
|
||||
# Function to validate XML against the DTD
|
||||
def validate_xml(xml_string, dtd_path):
|
||||
"""Validate the XML against the DTD"""
|
||||
dtd = ET.DTD(dtd_path)
|
||||
# Convert the XML string to bytes to handle the encoding declaration
|
||||
xml_doc = ET.parse(BytesIO(xml_string.encode('utf-8')))
|
||||
if dtd.validate(xml_doc):
|
||||
print("XML is valid against the DTD.")
|
||||
logging.debug("XML is valid against the %s DTD.", dtd_path)
|
||||
else:
|
||||
print("XML validation failed:", dtd.error_log.filter_from_errors())
|
||||
logging.error("XML validation failed using DTD %s with error %s:",
|
||||
dtd_path, dtd.error_log.filter_from_errors())
|
||||
|
||||
# Read CSV and generate XML
|
||||
def csv_to_xml(csv_file, xml_file):
|
||||
def csv_to_xml(csv_file, xml_file, dtd_file):
|
||||
"""Read the CSV and generate the XML"""
|
||||
root = ET.Element("items")
|
||||
|
||||
with open(csv_file, newline='', encoding="utf-8") as csvfile:
|
||||
@@ -44,22 +45,23 @@ def csv_to_xml(csv_file, xml_file):
|
||||
ET.SubElement(item, "Item_Quantity").text = row["Item_Quantity"]
|
||||
|
||||
# Convert to string and validate
|
||||
xml_string = ET.tostring(root, pretty_print=True, xml_declaration=True, encoding="UTF-8").decode("utf-8")
|
||||
xml_string = ET.tostring(root, pretty_print=True,
|
||||
xml_declaration=True, encoding="UTF-8").decode("utf-8")
|
||||
logging.info("Attmpting to validate %s against the %s DTD.",
|
||||
xml_file, dtd_file)
|
||||
validate_xml(xml_string, dtd_file)
|
||||
|
||||
# Save XML to file
|
||||
with open(xml_file, "w", encoding="utf-8") as f:
|
||||
f.write(xml_string)
|
||||
print(f"XML file saved to {xml_file}")
|
||||
logging.info("XML file saved to %s", xml_file)
|
||||
|
||||
# Input and output file paths
|
||||
csv_file = "items.csv" # Input CSV file
|
||||
xml_file = "items.xml" # Output XML file
|
||||
def main(csv_file, xml_file, dtd_file):
|
||||
"""Code entrypoint."""
|
||||
setup_logging()
|
||||
logging.debug("Attempting conversion of %s to %s using the %s DTD",
|
||||
csv_file, xml_file, dtd_file)
|
||||
csv_to_xml(csv_file, xml_file, dtd_file)
|
||||
|
||||
# Example CSV content for reference:
|
||||
# Item_ID,Item_Name,Item_Description,Item_Price,Item_Quantity
|
||||
# 1,Widget A,A useful widget,19.99,100
|
||||
# 2,Gadget B,A versatile gadget,29.49,200
|
||||
|
||||
# Generate XML
|
||||
csv_to_xml(csv_file, xml_file)
|
||||
if __name__ == "__main__":
|
||||
main(sys.argv[1], sys.argv[2], sys.argv[3])
|
||||
|
||||
@@ -110,7 +110,8 @@ def process_attachments(mailbox, attachment_path, saved_hashes):
|
||||
f"{file_ext}"
|
||||
)
|
||||
|
||||
with tempfile.NamedTemporaryFile(delete=False, dir="temp") as temp_file:
|
||||
with tempfile.NamedTemporaryFile(
|
||||
delete=False, dir="temp") as temp_file:
|
||||
temp_file.write(att.payload)
|
||||
temp_path = temp_file.name
|
||||
final_path = os.path.join(attachment_path, final_name)
|
||||
@@ -140,8 +141,11 @@ def main():
|
||||
sys.exit(1)
|
||||
|
||||
attachment_path = join(dirname(__file__), "attachments")
|
||||
temp_path = join(dirname(__file__), "temp")
|
||||
if not os.path.exists(attachment_path):
|
||||
os.makedirs(attachment_path)
|
||||
if not os.path.exists(temp_path):
|
||||
os.makedirs(temp_path)
|
||||
|
||||
ssl_context = setup_ssl_context()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user