actual-imap-poll/watcher.py
Keenan Tims 0b13bb3692
initial commit
Signed-off-by: Keenan Tims <ktims@gotroot.ca>
2025-04-24 00:33:20 -07:00

97 lines
2.8 KiB
Python

#!/usr/bin/env python
from email.message import EmailMessage
import logging
import asyncio, ssl, email
from os import getenv
from imaplib import IMAP4
from logging import info, debug, error, warning
from pprint import pprint
from model import Transaction
from config import PARSERS
from parsers import TransactionParsingFailed
import email.policy
import os
# --- Settings read from the process environment at import time ---

# Host name and port of the IMAP server (port falls back to 143).
IMAP_SERVER = os.environ.get("IMAP_SERVER")
IMAP_PORT = int(os.environ.get("IMAP_PORT", 143))

# Credentials used to log in to the IMAP server.
IMAP_USER = os.environ.get("IMAP_USER")
IMAP_PASS = os.environ.get("IMAP_PASS")

# Mailbox to open, and the delay between two polls (handed to the ticker,
# which sleeps for that many seconds).
IMAP_MAILBOX = os.environ.get("IMAP_MAILBOX", "INBOX")
IMAP_INTERVAL = float(os.environ.get("IMAP_INTERVAL", 300))

# --- Fixed settings ---

# Executable invoked to push a transaction into Actual.
ACTUAL_PATH = "./cli.js"

# Timeout value, presumably in seconds -- NOTE(review): confirm its users.
TIMEOUT = 30
async def ticker(interval: float):
    """Async generator that ticks forever, once every *interval* seconds.

    The first tick is produced immediately; each later tick is preceded by
    an ``asyncio.sleep(interval)``.  Every tick yields ``None``.
    """
    # Fire straight away so the consumer does not wait before its first run.
    yield
    while True:
        await asyncio.sleep(interval)
        yield
async def submit_transaction(t: Transaction):
    """Submit one transaction to Actual by running the CLI helper.

    Executes ``ACTUAL_PATH`` with the transaction's fields passed as the
    ``-a``, ``-p``, ``-m``, ``-d`` and ``-n`` arguments and waits for it to
    finish.  Failures (the helper cannot be started, or exits non-zero) are
    logged as errors rather than raised.

    Args:
        t: Transaction whose ``account``, ``payee``, ``amount``, ``date`` and
            ``notes`` attributes are forwarded to the helper.
    """
    # The field values are extracted from e-mail content, so they must never
    # be interpreted by a shell.  Build an argument vector and exec the
    # helper directly; quotes, ``$()`` and ``;`` in a value stay literal.
    argv = [
        ACTUAL_PATH,
        "-a", f"{t.account}",
        "-p", f"{t.payee}",
        "-m", f"{t.amount}",
        "-d", f"{t.date}",
        "-n", f"{t.notes}",
    ]
    debug("Actual command: %s", argv)
    try:
        proc = await asyncio.create_subprocess_exec(
            *argv,
            env=os.environ,
            stderr=asyncio.subprocess.STDOUT,
            stdout=asyncio.subprocess.PIPE,
        )
    except OSError as exc:
        # Without a shell, a missing or non-executable helper surfaces here
        # instead of as a non-zero exit status.
        error("Submitting to actual failed: %s", exc)
        return
    # stderr is merged into stdout above, so the second item is always None.
    output, _ = await proc.communicate()
    if proc.returncode != 0:
        error(
            "Submitting to actual failed: %s",
            output.decode(errors="replace"),
        )
async def process_message(msg_b: bytes):
    """Parse one raw e-mail and submit any transaction found in it.

    The bytes are parsed with ``email.policy.default``, the From/To/Subject
    headers are logged, and each parser in ``PARSERS`` is offered the
    message.  A parser whose ``match`` returns true has ``extract`` called
    and the result is passed to ``submit_transaction``.

    Errors raised by a parser or by the submission are logged as warnings
    and do not stop the remaining parsers from running.

    Args:
        msg_b: The complete message as raw bytes.
    """
    debug("parsing message")
    msg = email.message_from_bytes(msg_b, policy=email.policy.default)
    info(
        "Found message from %s to %s subject %s",
        msg.get("From", "<unknown>"),
        msg.get("To", "<unknown>"),
        msg.get("Subject", "<no subject>"),
    )
    # NOTE(review): every parser that matches submits a transaction; confirm
    # that the configured parsers never overlap on the same message.
    for parser in PARSERS:
        parser_name = type(parser).__name__
        debug("Running parser %s", parser_name)
        try:
            if not parser.match(msg):
                continue
            info("Parser %s claimed message", parser_name)
            trans = parser.extract(msg)
            info("Submitting transaction to Actual: %s", trans)
            await submit_transaction(trans)
        except TransactionParsingFailed:
            warning("Unable to parse message")
        except Exception as e:
            # Best effort: keep going with the next parser, but record the
            # traceback so the failure can actually be diagnosed.
            warning("Unexpected exception %s", e, exc_info=True)
async def poll_imap():
    """Connect to the IMAP server and process every unseen message.

    Opens a connection to ``IMAP_SERVER:IMAP_PORT``, upgrades it with
    STARTTLS, logs in, selects ``IMAP_MAILBOX``, searches for ``UNSEEN``
    messages and passes each message's raw bytes to ``process_message``.
    The connection is closed when the ``with`` block exits.

    Note that ``imaplib`` is a synchronous client, so the network calls in
    here run to completion without yielding to the event loop.

    Raises:
        IMAP4.error: if the mailbox cannot be selected (and for any other
            protocol error raised by ``imaplib``).
    """
    info("polling mailbox")
    with IMAP4(IMAP_SERVER, IMAP_PORT, TIMEOUT) as M:
        context = ssl.create_default_context()
        M.starttls(context)
        M.login(IMAP_USER, IMAP_PASS)

        status, data = M.select(IMAP_MAILBOX)
        if status != "OK":
            # Searching without a selected mailbox would raise anyway; fail
            # here with a message that names the real problem.
            raise IMAP4.error(
                f"cannot select mailbox {IMAP_MAILBOX!r}: {data}"
            )

        status, m_set = M.search(None, "UNSEEN")
        if status != "OK":
            # On failure the payload is server error text, not message ids.
            error("IMAP search failed (%s): %s", status, m_set)
            return

        id_list = m_set[0] if m_set and m_set[0] else b""
        for msg_id in id_list.split():
            debug("Retrieving msg id %s", msg_id)
            status, msg = M.fetch(msg_id, "(RFC822)")
            # A successful fetch starts with an (envelope, body) tuple;
            # anything else (e.g. the message vanished) has no body to parse.
            if status != "OK" or not msg or not isinstance(msg[0], tuple):
                warning("Unable to fetch msg id %s (%s)", msg_id, status)
                continue
            await process_message(msg[0][1])
async def app():
    """Run the poller forever, polling once per ``IMAP_INTERVAL`` tick.

    A failed poll (for example the server being unreachable) is logged with
    its traceback and the loop carries on with the next tick, so a transient
    error does not end the program.  Cancellation is not an ``Exception``
    subclass and therefore still propagates.
    """
    async for _ in ticker(IMAP_INTERVAL):
        try:
            await poll_imap()
        except Exception:
            error("Polling failed; retrying at the next interval", exc_info=True)
if __name__ == "__main__":
    # Script entry point: configure logging from LOG_LEVEL (default INFO),
    # then run the poller until the process is stopped.
    # Level names are matched case-sensitively by the logging module, so
    # normalise e.g. "debug" to "DEBUG" instead of failing at start-up.
    logging.basicConfig(level=getenv("LOG_LEVEL", "INFO").upper())
    debug("Parsers: %s", PARSERS)
    asyncio.run(app())