#!/usr/bin/env python
"""Poll an IMAP mailbox and submit parsed transactions to the Actual CLI.

On every tick the configured mailbox is searched for UNSEEN messages, each
message is offered to the parsers listed in the config module's ``PARSERS``
attribute, and every transaction a parser extracts is handed to the command
at ``ACTUAL_PATH``.
"""
import asyncio
import email.policy
import importlib.util
import logging
import os
import ssl
from dataclasses import dataclass
from imaplib import IMAP4
from logging import debug, error, info, warning
from os import getenv

from model import Transaction
from parsers import TransactionParsingFailed

CONFIG_PATH = getenv("CONFIG_PATH", "/data/config.py")
ACTUAL_PATH = "./cli.js"
TIMEOUT = 30

# Environment values (compared case-insensitively) that switch a flag off.
_FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})


def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment.

    Returns *default* when the variable is unset.  Otherwise the value is
    stripped and lower-cased; anything in ``_FALSE_STRINGS`` yields ``False``
    and every other value yields ``True``.  A plain ``bool(str)`` cannot be
    used here because any non-empty string, including ``"false"``, is truthy.
    """
    raw = getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in _FALSE_STRINGS


async def ticker(interval: float):
    """Yield immediately, then once more after every *interval* seconds.

    The sleep happens after the consumer's loop body has finished, so the
    delay is measured from the end of one iteration to the start of the next.
    """
    while True:
        yield
        await asyncio.sleep(interval)


def load_config_module(path: str):
    """Import the Python file at *path* as a module named ``config``.

    Raises:
        ImportError: if no import spec or loader can be created for *path*.
    """
    spec = importlib.util.spec_from_file_location("config", path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not find config module at {path}")
    config = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(config)
    return config


@dataclass
class AppConfig:
    """IMAP connection and polling settings."""

    imap_server: str
    imap_user: str
    imap_pass: str
    imap_port: int = 143
    imap_mailbox: str = "INBOX"
    imap_interval: int = 300  # seconds between polls
    imap_starttls: bool = True

    @staticmethod
    def from_environ() -> "AppConfig":
        """Build the configuration from ``IMAP_*`` environment variables.

        ``IMAP_SERVER``, ``IMAP_USER`` and ``IMAP_PASS`` are required; the
        remaining variables fall back to the dataclass defaults.

        Raises:
            KeyError: if a required variable is missing.
            ValueError: if ``IMAP_PORT`` or ``IMAP_INTERVAL`` is not an integer.
        """
        return AppConfig(
            imap_server=os.environ["IMAP_SERVER"],
            imap_port=int(getenv("IMAP_PORT", 143)),
            imap_user=os.environ["IMAP_USER"],
            imap_pass=os.environ["IMAP_PASS"],
            imap_mailbox=getenv("IMAP_MAILBOX", "INBOX"),
            imap_interval=int(getenv("IMAP_INTERVAL", 300)),
            # Parsed explicitly so that "false"/"0"/"no"/"off" disable STARTTLS.
            imap_starttls=_env_flag("IMAP_STARTTLS", True),
        )


class App:
    """Fetch unseen mail, run the configured parsers, submit the results."""

    def __init__(self, app_config: AppConfig, parser_config_path: str):
        """Load ``PARSERS`` from the module at *parser_config_path*.

        Each parser is expected to provide ``match(msg)`` and ``extract(msg)``.
        """
        config_mod = load_config_module(parser_config_path)
        self._parsers = config_mod.PARSERS
        self._config = app_config

    async def submit_transaction(self, t: Transaction):
        """Run the Actual CLI for transaction *t* and log a failure, if any.

        Arguments are passed as an argv list without a shell: the field
        values originate from parsed email, so interpolating them into a
        shell command line would allow command injection.
        """
        args = [ACTUAL_PATH, "-a", f"{t.account}"]
        if t.payee:
            args += ["-p", f"{t.payee}"]
        args += ["-m", f"{t.amount}", "-d", f"{t.date}"]
        if t.notes:
            args += ["-n", f"{t.notes}"]
        debug("Actual command: %s", args)

        proc = await asyncio.create_subprocess_exec(
            *args,
            env=os.environ,
            # stderr is merged into stdout so one buffer holds all output.
            stderr=asyncio.subprocess.STDOUT,
            stdout=asyncio.subprocess.PIPE,
        )
        stdout, _stderr = await proc.communicate()
        if proc.returncode != 0:
            error(
                "Submitting to actual failed: %s",
                stdout.decode(errors="replace"),
            )

    async def process_message(self, msg_b: bytes):
        """Parse raw message bytes and offer the message to every parser.

        Every parser whose ``match`` accepts the message is asked to
        ``extract`` a transaction; iteration continues after a match, so
        several parsers may each submit one.  Parser exceptions are logged
        as warnings and do not stop the remaining parsers.
        """
        debug("parsing message")
        msg = email.message_from_bytes(msg_b, policy=email.policy.default)
        info(
            "Found message from %s to %s subject %s",
            msg.get("From", ""),
            msg.get("To", ""),
            msg.get("Subject", ""),
        )
        for parser in self._parsers:
            parser_name = type(parser).__name__
            debug("Running parser %s", parser_name)
            try:
                if parser.match(msg):
                    info("Parser %s claimed message", parser_name)
                    trans = parser.extract(msg)
                    if trans is not None:
                        info("Submitting transaction to Actual: %s", trans)
                        await self.submit_transaction(trans)
                    else:
                        warning("Parser %s returned None", parser_name)
            except TransactionParsingFailed as e:
                warning("Unable to parse message %s", e)
            except Exception as e:
                # Best effort: one faulty parser must not block the others.
                warning("Unexpected exception %s", e)

    async def poll_imap(self):
        """Connect, fetch every UNSEEN message and process each in turn.

        NOTE(review): ``imaplib`` is synchronous, so the event loop is blocked
        for the duration of each IMAP call.
        """
        info("polling mailbox")
        with IMAP4(self._config.imap_server, self._config.imap_port, TIMEOUT) as M:
            if self._config.imap_starttls:
                context = ssl.create_default_context()
                M.starttls(context)
            M.login(self._config.imap_user, self._config.imap_pass)
            M.select(self._config.imap_mailbox)

            status, m_set = M.search(None, "UNSEEN")
            if status != "OK":
                error("Unable to search mailbox: %s", m_set)
                return
            # imaplib reports [None] when the server sent no SEARCH response.
            if not m_set or m_set[0] is None:
                debug("No unseen messages")
                return

            for msg_id in m_set[0].split():
                debug("Retrieving msg id %s", msg_id)
                status, msg = M.fetch(msg_id, "(RFC822)")
                part = msg[0] if msg else None
                # Indexing [1] is only valid on a tuple part; None or bare
                # bytes here means no message body came back.
                if status != "OK" or not isinstance(part, tuple):
                    error("Unable to fetch message %s", msg_id)
                    continue
                debug("Processing message %s", msg_id)
                await self.process_message(part[1])

    async def run(self):
        """Poll the mailbox forever, once per configured interval."""
        async for _ in ticker(self._config.imap_interval):
            await self.poll_imap()


if __name__ == "__main__":
    logging.basicConfig(level=getenv("LOG_LEVEL", "INFO"))
    app_config = AppConfig.from_environ()
    app = App(app_config, CONFIG_PATH)
    asyncio.run(app.run())