#!/usr/bin/env python import asyncio import email.policy import logging import os import ssl from email.message import EmailMessage from imaplib import IMAP4 from logging import debug from logging import error from logging import info from logging import warning from os import getenv from pprint import pprint from typing import cast from config import PARSERS from model import Transaction from parsers import TransactionParsingFailed IMAP_SERVER = getenv("IMAP_SERVER") IMAP_PORT = int(getenv("IMAP_PORT", 143)) IMAP_USER = getenv("IMAP_USER") IMAP_PASS = getenv("IMAP_PASS") IMAP_MAILBOX = getenv("IMAP_MAILBOX", "INBOX") IMAP_INTERVAL = float(getenv("IMAP_INTERVAL", 300)) ACTUAL_PATH = "./cli.js" TIMEOUT = 30 async def ticker(interval: float): while True: yield await asyncio.sleep(interval) async def submit_transaction(t: Transaction): cmd = ( ACTUAL_PATH + f' -a "{t.account}"' + f' -p "{t.payee}"' + f' -m "{t.amount}"' + f' -d "{t.date}"' f' -n "{t.notes}"' ) debug("Actual command: %s", cmd) proc = await asyncio.create_subprocess_shell( cmd=cmd, env=os.environ, stderr=asyncio.subprocess.STDOUT, stdout=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode != 0: error("Submitting to actual failed: %s", stdout) async def process_message(msg_b: bytes): debug("parsing message") msg = cast( EmailMessage, email.message_from_bytes(msg_b, policy=email.policy.default) ) pprint(msg) info( "Found message from %s to %s subject %s", msg.get("From", ""), msg.get("To", ""), msg.get("Subject", ""), ) for parser in PARSERS: debug("Running parser %s", type(parser).__name__) try: if parser.match(msg): info("Parser %s claimed message", type(parser).__name__) trans = parser.extract(msg) info("Submitting transaction to Actual: %s", trans) await submit_transaction(trans) except TransactionParsingFailed as e: warning("Unable to parse message %s", e) except Exception as e: warning("Unexpected exception %s", e) async def poll_imap(): info("polling mailbox") with IMAP4(IMAP_SERVER, IMAP_PORT, 30) as M: context = ssl.create_default_context() M.starttls(context) M.login(IMAP_USER, IMAP_PASS) M.select(IMAP_MAILBOX) status, m_set = M.search(None, "UNSEEN") for msg_id in m_set[0].split(): debug("Retrieving msg id %s", msg_id) status, msg = M.fetch(msg_id, "(RFC822)") await process_message(msg[0][1]) async def app(): async for tick in ticker(IMAP_INTERVAL): await poll_imap() if __name__ == "__main__": logging.basicConfig(level=getenv("LOG_LEVEL", "INFO")) debug("Parsers: %s", PARSERS) asyncio.run(app())