repackage properly
This commit is contained in:
6
actual_imap_poll/__init__.py
Normal file
6
actual_imap_poll/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from .watcher import main
|
||||
|
||||
__all__ = ["main"]
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
19
actual_imap_poll/model.py
Normal file
19
actual_imap_poll/model.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from dataclasses import dataclass
|
||||
from datetime import date
|
||||
from decimal import Decimal
|
||||
from uuid import UUID
|
||||
|
||||
|
||||
@dataclass
|
||||
class Transaction:
|
||||
"""Represents an Actual Budget transaction to be submitted.
|
||||
|
||||
See: https://actualbudget.org/docs/api/reference#transaction for field descriptions
|
||||
"""
|
||||
|
||||
account: UUID
|
||||
date: date
|
||||
amount: Decimal # Note: decimal dollars, JS shim will convert to cents as described in the API
|
||||
payee: str # imported_payee in API
|
||||
notes: str
|
||||
imported_id: str
|
||||
268
actual_imap_poll/parsers.py
Normal file
268
actual_imap_poll/parsers.py
Normal file
@@ -0,0 +1,268 @@
|
||||
import re
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
from email.message import EmailMessage, Message
|
||||
from logging import info, warning
|
||||
from typing import Any, Optional, Sequence
|
||||
from uuid import UUID
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from .model import Transaction
|
||||
|
||||
|
||||
def parse_email_time(s: str) -> datetime:
|
||||
return datetime.strptime(s, "%a, %d %b %Y %H:%M:%S %z")
|
||||
|
||||
|
||||
class TransactionParser(ABC):
|
||||
@abstractmethod
|
||||
def match(self, msg: Message) -> bool:
|
||||
"""
|
||||
Determines if the given email message matches the criteria for this parser.
|
||||
|
||||
Args:
|
||||
msg (Message): The email message to evaluate.
|
||||
|
||||
Returns:
|
||||
bool: True if the message matches the parser's criteria, False otherwise.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def extract(self, msg: EmailMessage) -> Optional[Transaction]:
|
||||
"""
|
||||
Extracts transaction details from the given email message.
|
||||
|
||||
Args:
|
||||
msg (EmailMessage): The email message to parse.
|
||||
|
||||
Returns:
|
||||
Transaction: A Transaction object containing the extracted details.
|
||||
|
||||
Raises:
|
||||
TransactionParsingFailed: If the message cannot be parsed successfully.
|
||||
"""
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def get_content(
|
||||
msg: EmailMessage, preferencelist: Sequence[str] = ("html", "plain")
|
||||
) -> Any:
|
||||
body = msg.get_body(preferencelist=preferencelist)
|
||||
if body is None:
|
||||
raise TransactionParsingFailed("No body of message found")
|
||||
content = body.get_content()
|
||||
if content is None:
|
||||
raise TransactionParsingFailed("No content of message found")
|
||||
return content
|
||||
|
||||
|
||||
class TransactionParsingFailed(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RogersBankParser(TransactionParser):
|
||||
EXTRACT_RE = re.compile(
|
||||
r"Attempt of \$([0-9,]+\.\d{2}) was made on ([A-z]{3} \d{1,2}, \d{4})[^<]*at ([^<]+) in ([^<]+)." # noqa: E501
|
||||
)
|
||||
|
||||
def __init__(self, account_id: UUID):
|
||||
self.account_id = account_id
|
||||
|
||||
def match(self, msg: Message) -> bool:
|
||||
return (
|
||||
msg["From"] == "Rogers Bank <onlineservices@RogersBank.com>"
|
||||
and msg["Subject"] == "Purchase amount alert"
|
||||
)
|
||||
|
||||
def extract(self, msg: EmailMessage) -> Optional[Transaction]:
|
||||
content = self.get_content(msg)
|
||||
matches = self.EXTRACT_RE.search(content)
|
||||
if matches is None:
|
||||
raise TransactionParsingFailed("No matches for extraction RE")
|
||||
amount = Decimal(matches[1].replace(",", "")) * -1
|
||||
|
||||
date_raw = matches[2]
|
||||
payee = matches[3]
|
||||
location = matches[4]
|
||||
|
||||
if "Rebate" == location and "CashBack" in payee:
|
||||
amount = amount * -1
|
||||
|
||||
date = datetime.strptime(date_raw, "%b %d, %Y").date()
|
||||
return Transaction(
|
||||
account=self.account_id,
|
||||
date=date,
|
||||
amount=amount,
|
||||
payee=payee,
|
||||
notes=f"in {location} (via email)",
|
||||
imported_id=msg["Message-ID"],
|
||||
)
|
||||
|
||||
|
||||
class MBNAParser(TransactionParser):
|
||||
EXTRACT_RE = re.compile(
|
||||
r"A purchase of \$([0-9,]+\.\d{2}) from ([^<]+) was made at (\d{1,2}:\d{2} (AM|PM)) UTC on (\d{4}-\d{2}-\d{2})" # noqa: E501
|
||||
)
|
||||
|
||||
def __init__(self, account_id: UUID):
|
||||
self.account_id = account_id
|
||||
|
||||
def match(self, msg: Message) -> bool:
|
||||
return (
|
||||
msg["From"] == "MBNA Notifications <noreply@mbna.ca>"
|
||||
and msg["Subject"] == "MBNA - Transaction Alert"
|
||||
)
|
||||
|
||||
def extract(self, msg: EmailMessage) -> Optional[Transaction]:
|
||||
content = self.get_content(msg)
|
||||
matches = self.EXTRACT_RE.search(content)
|
||||
if matches is None:
|
||||
raise TransactionParsingFailed("No matches for extraction RE")
|
||||
amount = Decimal(matches[1].replace(",", "")) * -1
|
||||
payee = matches[2]
|
||||
date_raw = matches[5]
|
||||
return Transaction(
|
||||
account=self.account_id,
|
||||
date=date.fromisoformat(date_raw),
|
||||
amount=amount,
|
||||
payee=payee,
|
||||
notes="via email",
|
||||
imported_id=msg["Message-ID"],
|
||||
)
|
||||
|
||||
|
||||
class BMOParser(TransactionParser):
|
||||
EXTRACT_RE = re.compile(
|
||||
r"We want to let you know that a (withdrawal|deposit) of\s+\$([0-9,]+\.\d{2})\s+has been made (?:to|from) your account ending\s+in\s+(\d{3})", # noqa: E501
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
|
||||
def __init__(self, account_map: dict[int, UUID]):
|
||||
self._account_map = account_map
|
||||
|
||||
def match(self, msg: Message) -> bool:
|
||||
return msg["From"] == "bmoalerts@bmo.com"
|
||||
|
||||
def extract(self, msg: EmailMessage) -> Optional[Transaction]:
|
||||
content = self.get_content(msg)
|
||||
soup = BeautifulSoup(content, "html.parser")
|
||||
matches = self.EXTRACT_RE.search(soup.get_text())
|
||||
if matches is None:
|
||||
raise TransactionParsingFailed("No matches for extraction RE")
|
||||
|
||||
amount = Decimal(matches[2].replace(",", ""))
|
||||
if matches[1] == "withdrawal":
|
||||
amount = amount * -1
|
||||
date = parse_email_time(msg["Date"]).date()
|
||||
account_ref = int(matches[3])
|
||||
if account_ref not in self._account_map:
|
||||
warning("Account %s not in account map, skipping transaction", account_ref)
|
||||
return None
|
||||
account_id = self._account_map[account_ref]
|
||||
return Transaction(
|
||||
account=account_id,
|
||||
date=date,
|
||||
amount=amount,
|
||||
payee="",
|
||||
notes="via email",
|
||||
imported_id=msg["Message-ID"],
|
||||
)
|
||||
|
||||
|
||||
class CIBCParser(TransactionParser):
|
||||
PAYMENT_EXTRACT_RE = re.compile(
|
||||
r"recently received a \$([0-9,]+\.\d{2}) payment to your [^<]+ ending in (\d{4})",
|
||||
flags=re.MULTILINE,
|
||||
)
|
||||
|
||||
def __init__(self, account_map: dict[int, UUID]):
|
||||
self._account_map = account_map
|
||||
|
||||
def match(self, msg: Message) -> bool:
|
||||
return msg["From"] == "CIBC Banking <mailbox.noreply@cibc.com>"
|
||||
|
||||
def extract_payment(self, msg: EmailMessage):
|
||||
content = self.get_content(msg)
|
||||
|
||||
matches = self.PAYMENT_EXTRACT_RE.search(content)
|
||||
if matches is None:
|
||||
raise TransactionParsingFailed("no matches for extraction RE")
|
||||
|
||||
amount = Decimal(matches[1].replace(",", ""))
|
||||
account_ref = int(matches[2])
|
||||
date = parse_email_time(msg["Date"]).date()
|
||||
|
||||
if account_ref not in self._account_map:
|
||||
warning("Account %s not in account map, skipping transaction", account_ref)
|
||||
return None
|
||||
account_id = self._account_map[account_ref]
|
||||
|
||||
return Transaction(
|
||||
account=account_id,
|
||||
date=date,
|
||||
amount=amount,
|
||||
payee="",
|
||||
notes="via email",
|
||||
imported_id=msg["Message-ID"],
|
||||
)
|
||||
|
||||
def extract(self, msg: EmailMessage) -> Optional[Transaction]:
|
||||
match msg["Subject"]:
|
||||
case "New payment to your credit card":
|
||||
return self.extract_payment(msg)
|
||||
return None
|
||||
|
||||
|
||||
class ScotiaBankParser(TransactionParser):
|
||||
PAYMENT_EXTRACT_RE = re.compile(
|
||||
r"There was an authorization (?P<card>without the credit card present )?for \$(?P<amount>[0-9]+\.[0-9]{2}) at (?P<payee>.+) on account (?P<account>[0-9*]+) at\s+(?P<time>[0-9]{1,2}:[0-9]{2} (am|pm))", # noqa: E501
|
||||
re.MULTILINE,
|
||||
)
|
||||
|
||||
def __init__(self, account_map: dict[str, UUID]):
|
||||
self._account_map = account_map
|
||||
|
||||
def match(self, msg: Message) -> bool:
|
||||
return msg["From"] == "Scotia InfoAlerts <infoalerts@scotiabank.com>"
|
||||
|
||||
def extract_transaction(self, msg: EmailMessage) -> Optional[Transaction]:
|
||||
content = self.get_content(msg)
|
||||
|
||||
matches = self.PAYMENT_EXTRACT_RE.search(content)
|
||||
if matches is None:
|
||||
raise TransactionParsingFailed("no matches for extraction RE")
|
||||
|
||||
amount = Decimal(matches["amount"].replace(",", ""))
|
||||
date = parse_email_time(msg["Date"]).date()
|
||||
|
||||
if matches["account"] not in self._account_map:
|
||||
warning(
|
||||
"Account %s not in account map, skipping transaction",
|
||||
matches["account"],
|
||||
)
|
||||
return None
|
||||
account_id = self._account_map[matches["account"]]
|
||||
|
||||
return Transaction(
|
||||
account=account_id,
|
||||
date=date,
|
||||
amount=amount,
|
||||
payee=matches["payee"],
|
||||
notes="without card (via email)"
|
||||
if matches.group("card") is not None
|
||||
else "with card (via email)",
|
||||
imported_id=msg["Message-ID"],
|
||||
)
|
||||
|
||||
def extract(self, msg: EmailMessage) -> Optional[Transaction]:
|
||||
match msg["Subject"]:
|
||||
case (
|
||||
"Authorization on your credit account"
|
||||
| "Authorization without credit card present"
|
||||
):
|
||||
return self.extract_transaction(msg)
|
||||
info("Subject `%s` didn't match any extractors", msg["Subject"])
|
||||
return None
|
||||
134
actual_imap_poll/watcher.py
Normal file
134
actual_imap_poll/watcher.py
Normal file
@@ -0,0 +1,134 @@
|
||||
#!/usr/bin/env python
|
||||
import asyncio
|
||||
import email.policy
|
||||
import importlib.util
|
||||
import logging
|
||||
import os
|
||||
import ssl
|
||||
from imaplib import IMAP4
|
||||
from logging import debug, error, info, warning
|
||||
from os import getenv
|
||||
|
||||
from pydantic import Field, SecretStr
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
from .model import Transaction
|
||||
from .parsers import TransactionParsingFailed
|
||||
|
||||
CONFIG_PATH = getenv("CONFIG_PATH", "/data/config.py")
|
||||
ACTUAL_PATH = "./cli.js"
|
||||
TIMEOUT = 30
|
||||
|
||||
|
||||
async def ticker(interval: float):
|
||||
while True:
|
||||
yield
|
||||
await asyncio.sleep(interval)
|
||||
|
||||
|
||||
def load_config_module(path: str):
|
||||
spec = importlib.util.spec_from_file_location("config", path)
|
||||
if spec is None or spec.loader is None:
|
||||
raise ImportError(f"Could not find config module at {path}")
|
||||
config = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(config)
|
||||
return config
|
||||
|
||||
|
||||
class AppConfig(BaseSettings):
|
||||
imap_server: str
|
||||
imap_user: str
|
||||
imap_pass: SecretStr
|
||||
imap_port: int = Field(default=143)
|
||||
imap_mailbox: str = Field(default="INBOX")
|
||||
imap_interval: int = Field(default=300)
|
||||
imap_starttls: bool = Field(default=True)
|
||||
|
||||
model_config = {"env_prefix": ""}
|
||||
|
||||
|
||||
class App:
|
||||
def __init__(self, app_config: AppConfig, parser_config_path: str):
|
||||
config_mod = load_config_module(parser_config_path)
|
||||
self._parsers = config_mod.PARSERS
|
||||
self._config = app_config
|
||||
|
||||
async def submit_transaction(self, t: Transaction):
|
||||
cmd = (
|
||||
ACTUAL_PATH
|
||||
+ f' -a "{t.account}"'
|
||||
+ (f' -p "{t.payee}"' if t.payee else "")
|
||||
+ f' -m "{t.amount}"'
|
||||
+ f' -d "{t.date}"'
|
||||
+ (f' -n "{t.notes}"' if t.notes else "")
|
||||
)
|
||||
debug("Actual command: %s", cmd)
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
cmd=cmd,
|
||||
env=os.environ,
|
||||
stderr=asyncio.subprocess.STDOUT,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, _stderr = await proc.communicate()
|
||||
if proc.returncode != 0:
|
||||
error("Submitting to actual failed: %s", stdout)
|
||||
|
||||
async def process_message(self, msg_b: bytes):
|
||||
debug("parsing message")
|
||||
msg = email.message_from_bytes(msg_b, policy=email.policy.default)
|
||||
info(
|
||||
"Found message from %s to %s subject %s",
|
||||
msg.get("From", "<unknown>"),
|
||||
msg.get("To", "<unknown>"),
|
||||
msg.get("Subject", "<no subject>"),
|
||||
)
|
||||
|
||||
for parser in self._parsers:
|
||||
debug("Running parser %s", type(parser).__name__)
|
||||
try:
|
||||
if parser.match(msg):
|
||||
info("Parser %s claimed message", type(parser).__name__)
|
||||
trans = parser.extract(msg)
|
||||
if trans is not None:
|
||||
info("Submitting transaction to Actual: %s", trans)
|
||||
await self.submit_transaction(trans)
|
||||
else:
|
||||
warning("Parser %s returned None", type(parser).__name__)
|
||||
except TransactionParsingFailed as e:
|
||||
warning("Unable to parse message %s", e)
|
||||
except Exception as e:
|
||||
warning("Unexpected exception %s", e)
|
||||
|
||||
async def poll_imap(self):
|
||||
info("polling mailbox")
|
||||
with IMAP4(self._config.imap_server, self._config.imap_port, TIMEOUT) as M:
|
||||
if self._config.imap_starttls:
|
||||
context = ssl.create_default_context()
|
||||
M.starttls(context)
|
||||
M.login(self._config.imap_user, self._config.imap_pass.get_secret_value())
|
||||
M.select(self._config.imap_mailbox)
|
||||
_status, m_set = M.search(None, "UNSEEN")
|
||||
|
||||
for msg_id in m_set[0].split():
|
||||
debug("Retrieving msg id %s", msg_id)
|
||||
_status, msg = M.fetch(msg_id, "(RFC822)")
|
||||
if _status != "OK" or msg[0] is None:
|
||||
error("Unable to fetch message %s", msg_id)
|
||||
continue
|
||||
msg_body = msg[0][1]
|
||||
if isinstance(msg_body, int):
|
||||
error("Unable to fetch message %s", msg_id)
|
||||
continue
|
||||
debug("Processing message %s", msg_id)
|
||||
await self.process_message(msg_body)
|
||||
|
||||
async def run(self):
|
||||
async for _ in ticker(self._config.imap_interval):
|
||||
await self.poll_imap()
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=getenv("LOG_LEVEL", "INFO"))
|
||||
app_config = AppConfig()
|
||||
app = App(app_config, CONFIG_PATH)
|
||||
asyncio.run(app.run())
|
||||
Reference in New Issue
Block a user