diff --git a/watcher.py b/watcher.py index 1360c42..3e9cccc 100644 --- a/watcher.py +++ b/watcher.py @@ -1,9 +1,11 @@ #!/usr/bin/env python import asyncio import email.policy +import importlib.util import logging import os import ssl +from dataclasses import dataclass from email.message import EmailMessage from imaplib import IMAP4 from logging import debug @@ -14,19 +16,12 @@ from os import getenv from pprint import pprint from typing import cast -from config import PARSERS from model import Transaction from parsers import TransactionParsingFailed -IMAP_SERVER = getenv("IMAP_SERVER") -IMAP_PORT = int(getenv("IMAP_PORT", 143)) -IMAP_USER = getenv("IMAP_USER") -IMAP_PASS = getenv("IMAP_PASS") -IMAP_MAILBOX = getenv("IMAP_MAILBOX", "INBOX") -IMAP_INTERVAL = float(getenv("IMAP_INTERVAL", 300)) +CONFIG_PATH = getenv("CONFIG_PATH", "/data/config.py") ACTUAL_PATH = "./cli.js" - TIMEOUT = 30 @@ -36,75 +31,110 @@ async def ticker(interval: float): await asyncio.sleep(interval) -async def submit_transaction(t: Transaction): - cmd = ( - ACTUAL_PATH - + f' -a "{t.account}"' - + f' -p "{t.payee}"' - + f' -m "{t.amount}"' - + f' -d "{t.date}"' - f' -n "{t.notes}"' - ) - debug("Actual command: %s", cmd) - proc = await asyncio.create_subprocess_shell( - cmd=cmd, - env=os.environ, - stderr=asyncio.subprocess.STDOUT, - stdout=asyncio.subprocess.PIPE, - ) - stdout, stderr = await proc.communicate() - if proc.returncode != 0: - error("Submitting to actual failed: %s", stdout) +def load_config_module(path: str): + spec = importlib.util.spec_from_file_location("config", path) + config = importlib.util.module_from_spec(spec) + spec.loader.exec_module(config) + return config -async def process_message(msg_b: bytes): - debug("parsing message") - msg = cast( - EmailMessage, email.message_from_bytes(msg_b, policy=email.policy.default) - ) - pprint(msg) - info( - "Found message from %s to %s subject %s", - msg.get("From", ""), - msg.get("To", ""), - msg.get("Subject", ""), - ) +@dataclass +class AppConfig: + @staticmethod + def from_environ() -> "AppConfig": + return AppConfig( + imap_server=getenv("IMAP_SERVER"), + imap_port=int(getenv("IMAP_PORT", 143)), + imap_user=getenv("IMAP_USER"), + imap_pass=getenv("IMAP_PASS"), + imap_mailbox=getenv("IMAP_MAILBOX", "INBOX"), + imap_interval=float(getenv("IMAP_INTERVAL", 300)), + imap_starttls=bool(getenv("IMAP_STARTTLS", True)), + ) - for parser in PARSERS: - debug("Running parser %s", type(parser).__name__) - try: - if parser.match(msg): - info("Parser %s claimed message", type(parser).__name__) - trans = parser.extract(msg) - info("Submitting transaction to Actual: %s", trans) - await submit_transaction(trans) - except TransactionParsingFailed as e: - warning("Unable to parse message %s", e) - except Exception as e: - warning("Unexpected exception %s", e) + imap_server: str + imap_user: str + imap_pass: str + imap_port: int = 143 + imap_mailbox: str = "INBOX" + imap_interval: int = 300 + imap_starttls: bool = True -async def poll_imap(): - info("polling mailbox") - with IMAP4(IMAP_SERVER, IMAP_PORT, 30) as M: - context = ssl.create_default_context() - M.starttls(context) - M.login(IMAP_USER, IMAP_PASS) - M.select(IMAP_MAILBOX) - status, m_set = M.search(None, "UNSEEN") +class App: + def __init__(self, app_config: AppConfig, parser_config_path: str): + config_mod = load_config_module(parser_config_path) + self._parsers = config_mod.PARSERS + self._config = app_config - for msg_id in m_set[0].split(): - debug("Retrieving msg id %s", msg_id) - status, msg = M.fetch(msg_id, "(RFC822)") - await process_message(msg[0][1]) + async def submit_transaction(self, t: Transaction): + cmd = ( + ACTUAL_PATH + + f' -a "{t.account}"' + + f' -p "{t.payee}"' + + f' -m "{t.amount}"' + + f' -d "{t.date}"' + f' -n "{t.notes}"' + ) + debug("Actual command: %s", cmd) + proc = await asyncio.create_subprocess_shell( + cmd=cmd, + env=os.environ, + stderr=asyncio.subprocess.STDOUT, + stdout=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + if proc.returncode != 0: + error("Submitting to actual failed: %s", stdout) + async def process_message(self, msg_b: bytes): + debug("parsing message") + msg = cast( + EmailMessage, email.message_from_bytes(msg_b, policy=email.policy.default) + ) + pprint(msg) + info( + "Found message from %s to %s subject %s", + msg.get("From", ""), + msg.get("To", ""), + msg.get("Subject", ""), + ) -async def app(): - async for tick in ticker(IMAP_INTERVAL): - await poll_imap() + for parser in self._parsers: + debug("Running parser %s", type(parser).__name__) + try: + if parser.match(msg): + info("Parser %s claimed message", type(parser).__name__) + trans = parser.extract(msg) + info("Submitting transaction to Actual: %s", trans) + await self.submit_transaction(trans) + except TransactionParsingFailed as e: + warning("Unable to parse message %s", e) + except Exception as e: + warning("Unexpected exception %s", e) + + async def poll_imap(self): + info("polling mailbox") + with IMAP4(self._config.imap_server, self._config.imap_port, TIMEOUT) as M: + if self._config.imap_starttls: + context = ssl.create_default_context() + M.starttls(context) + M.login(self._config.imap_user, self._config.imap_pass) + M.select(self._config.imap_mailbox) + status, m_set = M.search(None, "UNSEEN") + + for msg_id in m_set[0].split(): + debug("Retrieving msg id %s", msg_id) + status, msg = M.fetch(msg_id, "(RFC822)") + await self.process_message(msg[0][1]) + + async def run(self): + async for tick in ticker(self._config.imap_interval): + await self.poll_imap() if __name__ == "__main__": logging.basicConfig(level=getenv("LOG_LEVEL", "INFO")) - debug("Parsers: %s", PARSERS) - asyncio.run(app()) + app_config = AppConfig.from_environ() + app = App(app_config, CONFIG_PATH) + asyncio.run(app.run())