144 lines
4.8 KiB
Python
144 lines
4.8 KiB
Python
#!/usr/bin/env python
|
|
import asyncio
|
|
import email.policy
|
|
import importlib.util
|
|
import logging
|
|
import os
|
|
import ssl
|
|
from dataclasses import dataclass
|
|
from imaplib import IMAP4
|
|
from logging import debug, error, info, warning
|
|
from os import getenv
|
|
|
|
from model import Transaction
|
|
from parsers import TransactionParsingFailed
|
|
|
|
CONFIG_PATH = getenv("CONFIG_PATH", "/data/config.py")
|
|
ACTUAL_PATH = "./cli.js"
|
|
TIMEOUT = 30
|
|
|
|
|
|
async def ticker(interval: float):
|
|
while True:
|
|
yield
|
|
await asyncio.sleep(interval)
|
|
|
|
|
|
def load_config_module(path: str):
|
|
spec = importlib.util.spec_from_file_location("config", path)
|
|
if spec is None or spec.loader is None:
|
|
raise ImportError(f"Could not find config module at {path}")
|
|
config = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(config)
|
|
return config
|
|
|
|
|
|
@dataclass
|
|
class AppConfig:
|
|
@staticmethod
|
|
def from_environ() -> "AppConfig":
|
|
return AppConfig(
|
|
imap_server=os.environ["IMAP_SERVER"],
|
|
imap_port=int(getenv("IMAP_PORT", 143)),
|
|
imap_user=os.environ["IMAP_USER"],
|
|
imap_pass=os.environ["IMAP_PASS"],
|
|
imap_mailbox=getenv("IMAP_MAILBOX", "INBOX"),
|
|
imap_interval=int(getenv("IMAP_INTERVAL", 300)),
|
|
imap_starttls=bool(getenv("IMAP_STARTTLS", True)),
|
|
)
|
|
|
|
imap_server: str
|
|
imap_user: str
|
|
imap_pass: str
|
|
imap_port: int = 143
|
|
imap_mailbox: str = "INBOX"
|
|
imap_interval: int = 300
|
|
imap_starttls: bool = True
|
|
|
|
|
|
class App:
|
|
def __init__(self, app_config: AppConfig, parser_config_path: str):
|
|
config_mod = load_config_module(parser_config_path)
|
|
self._parsers = config_mod.PARSERS
|
|
self._config = app_config
|
|
|
|
async def submit_transaction(self, t: Transaction):
|
|
cmd = (
|
|
ACTUAL_PATH
|
|
+ f' -a "{t.account}"'
|
|
+ (f' -p "{t.payee}"' if t.payee else "")
|
|
+ f' -m "{t.amount}"'
|
|
+ f' -d "{t.date}"'
|
|
+ (f' -n "{t.notes}"' if t.notes else "")
|
|
)
|
|
debug("Actual command: %s", cmd)
|
|
proc = await asyncio.create_subprocess_shell(
|
|
cmd=cmd,
|
|
env=os.environ,
|
|
stderr=asyncio.subprocess.STDOUT,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, _stderr = await proc.communicate()
|
|
if proc.returncode != 0:
|
|
error("Submitting to actual failed: %s", stdout)
|
|
|
|
async def process_message(self, msg_b: bytes):
|
|
debug("parsing message")
|
|
msg = email.message_from_bytes(msg_b, policy=email.policy.default)
|
|
info(
|
|
"Found message from %s to %s subject %s",
|
|
msg.get("From", "<unknown>"),
|
|
msg.get("To", "<unknown>"),
|
|
msg.get("Subject", "<no subject>"),
|
|
)
|
|
|
|
for parser in self._parsers:
|
|
debug("Running parser %s", type(parser).__name__)
|
|
try:
|
|
if parser.match(msg):
|
|
info("Parser %s claimed message", type(parser).__name__)
|
|
trans = parser.extract(msg)
|
|
if trans is not None:
|
|
info("Submitting transaction to Actual: %s", trans)
|
|
await self.submit_transaction(trans)
|
|
else:
|
|
warning("Parser %s returned None", type(parser).__name__)
|
|
except TransactionParsingFailed as e:
|
|
warning("Unable to parse message %s", e)
|
|
except Exception as e:
|
|
warning("Unexpected exception %s", e)
|
|
|
|
async def poll_imap(self):
|
|
info("polling mailbox")
|
|
with IMAP4(self._config.imap_server, self._config.imap_port, TIMEOUT) as M:
|
|
if self._config.imap_starttls:
|
|
context = ssl.create_default_context()
|
|
M.starttls(context)
|
|
M.login(self._config.imap_user, self._config.imap_pass)
|
|
M.select(self._config.imap_mailbox)
|
|
_status, m_set = M.search(None, "UNSEEN")
|
|
|
|
for msg_id in m_set[0].split():
|
|
debug("Retrieving msg id %s", msg_id)
|
|
_status, msg = M.fetch(msg_id, "(RFC822)")
|
|
if _status != "OK" or msg[0] is None:
|
|
error("Unable to fetch message %s", msg_id)
|
|
continue
|
|
msg_body = msg[0][1]
|
|
if isinstance(msg_body, int):
|
|
error("Unable to fetch message %s", msg_id)
|
|
continue
|
|
debug("Processing message %s", msg_id)
|
|
await self.process_message(msg_body)
|
|
|
|
async def run(self):
|
|
async for _ in ticker(self._config.imap_interval):
|
|
await self.poll_imap()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=getenv("LOG_LEVEL", "INFO"))
|
|
app_config = AppConfig.from_environ()
|
|
app = App(app_config, CONFIG_PATH)
|
|
asyncio.run(app.run())
|