summary history files

web/penny/resources/reports/monthly_breakdown.py
from sqlalchemy.sql import func
from penny import models
import calendar
import datetime
from dateutil.rrule import rrule, MONTHLY
from typing import List, TypeVar


class Month:
    def __init__(self, year, month, amount, **kwargs):
        self.year = year
        self.month = month
        self.amount = amount
        self.previousMonth = kwargs.get("previousMonth", 0)
        self.avg = kwargs.get("avg", 0)
        self.previousAvg = kwargs.get("previousAvg", 0)

    @property
    def daycount(self):
        return calendar.monthrange(int(self.year), int(self.month))[1]

    @property
    def date(self):
        return "{0}-{1}".format(self.year, self.month)

    @property
    def change(self):
        return self.amount - self.previousMonth

    def changeAvg(self):
        return self.avg - self.previousAvg


AccountOrTag = TypeVar("AccountOrTag", models.Account, models.Tag)


class ReportsMonthlyBreakdown:
    def __init__(self, account_or_tag: AccountOrTag):
        self.account_or_tag: AccountOrTag = account_or_tag
        self._allAmounts: List[int] = []

    @property
    def _firstTransaction(self):
        q = (
            models.db.session.query(models.Transaction)
            .filter(
                models.Transaction.is_deleted == False,
                models.Transaction.is_archived == False,
                models.Transaction.user_id == self.account_or_tag.user.id,
            )
            .order_by(models.Transaction.date)
        )

        if isinstance(self.account_or_tag, models.Account):
            q = q.filter(models.Transaction.account_id == self.account_or_tag.id)
        else:
            q = q.filter(
                models.Transaction.tags.any(models.Tag.id == self.account_or_tag.id)
            )

        return q.first()

    @property
    def _totalDays(self):
        # Only go back 5 years for now on this report. Maybe this report could accept start and end date parameters.
        delta = datetime.datetime.now() - self._firstTransaction.date
        days = delta.days
        if days > 1825:
            days = 1825
        return days

    def generate(self):
        report = {"transactions": {}}
        data = {}

        if not self._firstTransaction:
            return report

        end_date = datetime.datetime.now()
        start_date = end_date - datetime.timedelta(days=self._totalDays)
        start_date = start_date.replace(day=1)

        if models.db.engine.name == "sqlite":
            q = (
                models.db.session.query(
                    func.strftime("%Y", models.Transaction.date).label("year"),
                    func.strftime("%m", models.Transaction.date).label("month"),
                    func.sum(models.Transaction.credit).label("credit"),
                    func.sum(models.Transaction.debit).label("debit"),
                )
                .filter(
                    models.Transaction.is_deleted == False,  # noqa[W0612]
                    models.Transaction.is_archived == False,
                    models.Transaction.user_id == self.account_or_tag.user.id,
                    models.Transaction.date >= start_date,
                    models.Transaction.date <= end_date,
                )
                .group_by(func.strftime("%Y-%m-01", models.Transaction.date))
                .order_by(models.Transaction.date)
            )
        else:
            q = (
                models.db.session.query(
                    func.date_format(models.Transaction.date, "%Y").label("year"),
                    func.date_format(models.Transaction.date, "%m").label("month"),
                    func.sum(models.Transaction.credit).label("credit"),
                    func.sum(models.Transaction.debit).label("debit"),
                )
                .filter(
                    models.Transaction.is_deleted == False,  # noqa[W0612]
                    models.Transaction.is_archived == False,
                    models.Transaction.user_id == self.account_or_tag.user.id,
                    models.Transaction.date >= start_date,
                    models.Transaction.date <= end_date,
                )
                .group_by(func.date_format(models.Transaction.date, "%Y-%m-01"))
                .order_by(models.Transaction.date)
            )

        if isinstance(self.account_or_tag, models.Account):
            q = q.filter(models.Transaction.account_id == self.account_or_tag.id)
        else:
            q = q.filter(
                models.Transaction.tags.any(models.Tag.id == self.account_or_tag.id)
            )

        for transaction in q.all():
            month = Month(
                year=transaction.year,
                month=transaction.month,
                amount=int(transaction.credit + transaction.debit),
            )
            data[month.date] = month

        count = 0
        for d in rrule(freq=MONTHLY, dtstart=start_date, until=end_date):
            month = Month(
                year=d.strftime("%Y"),
                month=d.strftime("%m"),
                amount=0,
                previousMonth=0,
                avg=0,
            )

            exists = data.get(month.date)
            if exists:
                month.amount = exists.amount

            self._allAmounts.append(month.amount)
            if count >= 1:
                month.previousMonth = self._allAmounts[count - 1]

            lastAmounts = self._allAmounts[-12:]
            try:
                month.avg = sum(lastAmounts) / len(lastAmounts)
            except ZeroDivisionError:
                pass

            report["transactions"][month.date] = month
            count += 1

        return report