refactor config / parser loading

This commit is contained in:
Keenan Tims 2025-04-24 16:04:34 -07:00
parent 90bf08484a
commit 336b732c60

View File

@ -1,9 +1,11 @@
#!/usr/bin/env python
import asyncio
import email.policy
import importlib.util
import logging
import os
import ssl
from dataclasses import dataclass
from email.message import EmailMessage
from imaplib import IMAP4
from logging import debug
@ -14,19 +16,12 @@ from os import getenv
from pprint import pprint
from typing import cast
from config import PARSERS
from model import Transaction
from parsers import TransactionParsingFailed
IMAP_SERVER = getenv("IMAP_SERVER")
IMAP_PORT = int(getenv("IMAP_PORT", 143))
IMAP_USER = getenv("IMAP_USER")
IMAP_PASS = getenv("IMAP_PASS")
IMAP_MAILBOX = getenv("IMAP_MAILBOX", "INBOX")
IMAP_INTERVAL = float(getenv("IMAP_INTERVAL", 300))
CONFIG_PATH = getenv("CONFIG_PATH", "/data/config.py")
ACTUAL_PATH = "./cli.js"
TIMEOUT = 30
@ -36,75 +31,110 @@ async def ticker(interval: float):
await asyncio.sleep(interval)
async def submit_transaction(t: Transaction):
cmd = (
ACTUAL_PATH
+ f' -a "{t.account}"'
+ f' -p "{t.payee}"'
+ f' -m "{t.amount}"'
+ f' -d "{t.date}"'
f' -n "{t.notes}"'
)
debug("Actual command: %s", cmd)
proc = await asyncio.create_subprocess_shell(
cmd=cmd,
env=os.environ,
stderr=asyncio.subprocess.STDOUT,
stdout=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
error("Submitting to actual failed: %s", stdout)
def load_config_module(path: str):
spec = importlib.util.spec_from_file_location("config", path)
config = importlib.util.module_from_spec(spec)
spec.loader.exec_module(config)
return config
async def process_message(msg_b: bytes):
debug("parsing message")
msg = cast(
EmailMessage, email.message_from_bytes(msg_b, policy=email.policy.default)
)
pprint(msg)
info(
"Found message from %s to %s subject %s",
msg.get("From", "<unknown>"),
msg.get("To", "<unknown>"),
msg.get("Subject", "<no subject>"),
)
@dataclass
class AppConfig:
@staticmethod
def from_environ() -> "AppConfig":
return AppConfig(
imap_server=getenv("IMAP_SERVER"),
imap_port=int(getenv("IMAP_PORT", 143)),
imap_user=getenv("IMAP_USER"),
imap_pass=getenv("IMAP_PASS"),
imap_mailbox=getenv("IMAP_MAILBOX", "INBOX"),
imap_interval=float(getenv("IMAP_INTERVAL", 300)),
imap_starttls=bool(getenv("IMAP_STARTTLS", True)),
)
for parser in PARSERS:
debug("Running parser %s", type(parser).__name__)
try:
if parser.match(msg):
info("Parser %s claimed message", type(parser).__name__)
trans = parser.extract(msg)
info("Submitting transaction to Actual: %s", trans)
await submit_transaction(trans)
except TransactionParsingFailed as e:
warning("Unable to parse message %s", e)
except Exception as e:
warning("Unexpected exception %s", e)
imap_server: str
imap_user: str
imap_pass: str
imap_port: int = 143
imap_mailbox: str = "INBOX"
imap_interval: int = 300
imap_starttls: bool = True
async def poll_imap():
info("polling mailbox")
with IMAP4(IMAP_SERVER, IMAP_PORT, 30) as M:
context = ssl.create_default_context()
M.starttls(context)
M.login(IMAP_USER, IMAP_PASS)
M.select(IMAP_MAILBOX)
status, m_set = M.search(None, "UNSEEN")
class App:
def __init__(self, app_config: AppConfig, parser_config_path: str):
config_mod = load_config_module(parser_config_path)
self._parsers = config_mod.PARSERS
self._config = app_config
for msg_id in m_set[0].split():
debug("Retrieving msg id %s", msg_id)
status, msg = M.fetch(msg_id, "(RFC822)")
await process_message(msg[0][1])
async def submit_transaction(self, t: Transaction):
cmd = (
ACTUAL_PATH
+ f' -a "{t.account}"'
+ f' -p "{t.payee}"'
+ f' -m "{t.amount}"'
+ f' -d "{t.date}"'
f' -n "{t.notes}"'
)
debug("Actual command: %s", cmd)
proc = await asyncio.create_subprocess_shell(
cmd=cmd,
env=os.environ,
stderr=asyncio.subprocess.STDOUT,
stdout=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
error("Submitting to actual failed: %s", stdout)
async def process_message(self, msg_b: bytes):
debug("parsing message")
msg = cast(
EmailMessage, email.message_from_bytes(msg_b, policy=email.policy.default)
)
pprint(msg)
info(
"Found message from %s to %s subject %s",
msg.get("From", "<unknown>"),
msg.get("To", "<unknown>"),
msg.get("Subject", "<no subject>"),
)
async def app():
async for tick in ticker(IMAP_INTERVAL):
await poll_imap()
for parser in self._parsers:
debug("Running parser %s", type(parser).__name__)
try:
if parser.match(msg):
info("Parser %s claimed message", type(parser).__name__)
trans = parser.extract(msg)
info("Submitting transaction to Actual: %s", trans)
await self.submit_transaction(trans)
except TransactionParsingFailed as e:
warning("Unable to parse message %s", e)
except Exception as e:
warning("Unexpected exception %s", e)
async def poll_imap(self):
info("polling mailbox")
with IMAP4(self._config.imap_server, self._config.imap_port, TIMEOUT) as M:
if self._config.imap_starttls:
context = ssl.create_default_context()
M.starttls(context)
M.login(self._config.imap_user, self._config.imap_pass)
M.select(self._config.imap_mailbox)
status, m_set = M.search(None, "UNSEEN")
for msg_id in m_set[0].split():
debug("Retrieving msg id %s", msg_id)
status, msg = M.fetch(msg_id, "(RFC822)")
await self.process_message(msg[0][1])
async def run(self):
async for tick in ticker(self._config.imap_interval):
await self.poll_imap()
if __name__ == "__main__":
logging.basicConfig(level=getenv("LOG_LEVEL", "INFO"))
debug("Parsers: %s", PARSERS)
asyncio.run(app())
app_config = AppConfig.from_environ()
app = App(app_config, CONFIG_PATH)
asyncio.run(app.run())