actual-imap-poll/watcher.py

144 lines
4.8 KiB
Python

#!/usr/bin/env python
import asyncio
import email.policy
import importlib.util
import logging
import os
import ssl
from dataclasses import dataclass
from imaplib import IMAP4
from logging import debug, error, info, warning
from os import getenv
from model import Transaction
from parsers import TransactionParsingFailed
CONFIG_PATH = getenv("CONFIG_PATH", "/data/config.py")
ACTUAL_PATH = "./cli.js"
TIMEOUT = 30
async def ticker(interval: float):
while True:
yield
await asyncio.sleep(interval)
def load_config_module(path: str):
spec = importlib.util.spec_from_file_location("config", path)
if spec is None or spec.loader is None:
raise ImportError(f"Could not find config module at {path}")
config = importlib.util.module_from_spec(spec)
spec.loader.exec_module(config)
return config
@dataclass
class AppConfig:
@staticmethod
def from_environ() -> "AppConfig":
return AppConfig(
imap_server=os.environ["IMAP_SERVER"],
imap_port=int(getenv("IMAP_PORT", 143)),
imap_user=os.environ["IMAP_USER"],
imap_pass=os.environ["IMAP_PASS"],
imap_mailbox=getenv("IMAP_MAILBOX", "INBOX"),
imap_interval=int(getenv("IMAP_INTERVAL", 300)),
imap_starttls=bool(getenv("IMAP_STARTTLS", True)),
)
imap_server: str
imap_user: str
imap_pass: str
imap_port: int = 143
imap_mailbox: str = "INBOX"
imap_interval: int = 300
imap_starttls: bool = True
class App:
def __init__(self, app_config: AppConfig, parser_config_path: str):
config_mod = load_config_module(parser_config_path)
self._parsers = config_mod.PARSERS
self._config = app_config
async def submit_transaction(self, t: Transaction):
cmd = (
ACTUAL_PATH
+ f' -a "{t.account}"'
+ (f' -p "{t.payee}"' if t.payee else "")
+ f' -m "{t.amount}"'
+ f' -d "{t.date}"'
+ (f' -n "{t.notes}"' if t.notes else "")
)
debug("Actual command: %s", cmd)
proc = await asyncio.create_subprocess_shell(
cmd=cmd,
env=os.environ,
stderr=asyncio.subprocess.STDOUT,
stdout=asyncio.subprocess.PIPE,
)
stdout, _stderr = await proc.communicate()
if proc.returncode != 0:
error("Submitting to actual failed: %s", stdout)
async def process_message(self, msg_b: bytes):
debug("parsing message")
msg = email.message_from_bytes(msg_b, policy=email.policy.default)
info(
"Found message from %s to %s subject %s",
msg.get("From", "<unknown>"),
msg.get("To", "<unknown>"),
msg.get("Subject", "<no subject>"),
)
for parser in self._parsers:
debug("Running parser %s", type(parser).__name__)
try:
if parser.match(msg):
info("Parser %s claimed message", type(parser).__name__)
trans = parser.extract(msg)
if trans is not None:
info("Submitting transaction to Actual: %s", trans)
await self.submit_transaction(trans)
else:
warning("Parser %s returned None", type(parser).__name__)
except TransactionParsingFailed as e:
warning("Unable to parse message %s", e)
except Exception as e:
warning("Unexpected exception %s", e)
async def poll_imap(self):
info("polling mailbox")
with IMAP4(self._config.imap_server, self._config.imap_port, TIMEOUT) as M:
if self._config.imap_starttls:
context = ssl.create_default_context()
M.starttls(context)
M.login(self._config.imap_user, self._config.imap_pass)
M.select(self._config.imap_mailbox)
_status, m_set = M.search(None, "UNSEEN")
for msg_id in m_set[0].split():
debug("Retrieving msg id %s", msg_id)
_status, msg = M.fetch(msg_id, "(RFC822)")
if _status != "OK" or msg[0] is None:
error("Unable to fetch message %s", msg_id)
continue
msg_body = msg[0][1]
if isinstance(msg_body, int):
error("Unable to fetch message %s", msg_id)
continue
debug("Processing message %s", msg_id)
await self.process_message(msg_body)
async def run(self):
async for _ in ticker(self._config.imap_interval):
await self.poll_imap()
if __name__ == "__main__":
logging.basicConfig(level=getenv("LOG_LEVEL", "INFO"))
app_config = AppConfig.from_environ()
app = App(app_config, CONFIG_PATH)
asyncio.run(app.run())