aboutsummaryrefslogtreecommitdiff
path: root/tests/incite/mergers/test_pop_ledger.py
diff options
context:
space:
mode:
authorMax Nanis2026-03-06 16:49:46 -0500
committerMax Nanis2026-03-06 16:49:46 -0500
commit91d040211a4ed6e4157896256a762d3854777b5e (patch)
treecd95922ea4257dc8d3f4e4cbe8534474709a20dc /tests/incite/mergers/test_pop_ledger.py
downloadgeneralresearch-91d040211a4ed6e4157896256a762d3854777b5e.tar.gz
generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.zip
Initial commitv3.3.4
Diffstat (limited to 'tests/incite/mergers/test_pop_ledger.py')
-rw-r--r--tests/incite/mergers/test_pop_ledger.py307
1 files changed, 307 insertions, 0 deletions
diff --git a/tests/incite/mergers/test_pop_ledger.py b/tests/incite/mergers/test_pop_ledger.py
new file mode 100644
index 0000000..6f96108
--- /dev/null
+++ b/tests/incite/mergers/test_pop_ledger.py
@@ -0,0 +1,307 @@
+from datetime import timedelta, datetime, timezone
+from itertools import product as iter_product
+from typing import Optional
+
+import pandas as pd
+import pytest
+from distributed.utils_test import client_no_amm
+
+from generalresearch.incite.schemas.mergers.pop_ledger import (
+ numerical_col_names,
+)
+from test_utils.incite.collections.conftest import ledger_collection
+from test_utils.incite.conftest import mnt_filepath, incite_item_factory
+from test_utils.incite.mergers.conftest import pop_ledger_merge
+from test_utils.managers.ledger.conftest import create_main_accounts
+
+
+@pytest.mark.parametrize(
+ argnames="offset, duration",
+ argvalues=list(
+ iter_product(
+ ["12h", "3D"],
+ [timedelta(days=4)],
+ )
+ ),
+)
+class TestMergePOPLedger:
+
+ @pytest.fixture
+ def start(self) -> "datetime":
+ return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)
+
+ @pytest.fixture
+ def duration(self) -> Optional["timedelta"]:
+ return timedelta(days=5)
+
+ def test_base(
+ self,
+ client_no_amm,
+ ledger_collection,
+ pop_ledger_merge,
+ product,
+ user_factory,
+ create_main_accounts,
+ thl_lm,
+ delete_df_collection,
+ incite_item_factory,
+ delete_ledger_db,
+ ):
+ from generalresearch.models.thl.ledger import LedgerAccount
+
+ u = user_factory(product=product, created=ledger_collection.start)
+
+ # -- Build & Setup
+ delete_ledger_db()
+ create_main_accounts()
+ delete_df_collection(coll=ledger_collection)
+ # assert ledger_collection.start is None
+ # assert ledger_collection.offset is None
+
+ for item in ledger_collection.items:
+ incite_item_factory(item=item, user=u)
+ item.initial_load()
+
+ # Confirm any of the items are archived
+ assert ledger_collection.progress.has_archive.eq(True).all()
+
+ pop_ledger_merge.build(
+ client=client_no_amm,
+ ledger_coll=ledger_collection,
+ )
+ # assert pop_ledger_merge.progress.has_archive.eq(True).all()
+
+ ddf = pop_ledger_merge.ddf()
+ df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+ assert isinstance(df, pd.DataFrame)
+ assert not df.empty
+
+ # --
+
+ user_wallet_account: LedgerAccount = thl_lm.get_account_or_create_user_wallet(
+ user=u
+ )
+ cash_account: LedgerAccount = thl_lm.get_account_cash()
+ rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue()
+
+ item_finishes = [i.finish for i in ledger_collection.items]
+ item_finishes.sort(reverse=True)
+ last_item_finish = item_finishes[0]
+
+ # Pure SQL based lookups
+ cash_balance: int = thl_lm.get_account_balance(account=cash_account)
+ rev_balance: int = thl_lm.get_account_balance(account=rev_account)
+ assert cash_balance > rev_balance
+
+ # (1) Test Cash Account
+ ddf = pop_ledger_merge.ddf(
+ columns=numerical_col_names,
+ filters=[
+ ("account_id", "==", cash_account.uuid),
+ ("time_idx", ">=", ledger_collection.start),
+ ("time_idx", "<", last_item_finish),
+ ],
+ )
+ df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+ assert df["mp_payment.CREDIT"].sum() == 0
+ assert cash_balance > 0
+ assert df["mp_payment.DEBIT"].sum() == cash_balance
+
+ # (2) Test Revenue Account
+ ddf = pop_ledger_merge.ddf(
+ columns=numerical_col_names,
+ filters=[
+ ("account_id", "==", rev_account.uuid),
+ ("time_idx", ">=", ledger_collection.start),
+ ("time_idx", "<", last_item_finish),
+ ],
+ )
+ df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+ assert rev_balance == 0
+ assert df["bp_payment.CREDIT"].sum() == 0
+ assert df["mp_payment.DEBIT"].sum() == 0
+ assert df["mp_payment.CREDIT"].sum() > 0
+
+ # -- Cleanup
+ delete_ledger_db()
+
+ def test_pydantic_init(
+ self,
+ client_no_amm,
+ ledger_collection,
+ pop_ledger_merge,
+ mnt_filepath,
+ product,
+ user_factory,
+ create_main_accounts,
+ offset,
+ duration,
+ start,
+ thl_lm,
+ incite_item_factory,
+ delete_df_collection,
+ delete_ledger_db,
+ session_collection,
+ ):
+ from generalresearch.models.thl.ledger import LedgerAccount
+ from generalresearch.models.thl.product import Product
+ from generalresearch.models.thl.finance import ProductBalances
+
+ u = user_factory(product=product, created=session_collection.start)
+
+ assert ledger_collection.finished is not None
+ assert isinstance(u.product, Product)
+ delete_ledger_db()
+ create_main_accounts(),
+ delete_df_collection(coll=ledger_collection)
+
+ bp_account: LedgerAccount = thl_lm.get_account_or_create_bp_wallet(
+ product=u.product
+ )
+ cash_account: LedgerAccount = thl_lm.get_account_cash()
+ rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue()
+
+ for item in ledger_collection.items:
+ incite_item_factory(item=item, user=u)
+ item.initial_load(overwrite=True)
+
+ pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection)
+
+ item_finishes = [i.finish for i in ledger_collection.items]
+ item_finishes.sort(reverse=True)
+ last_item_finish = item_finishes[0]
+
+ # (1) Filter by the Product Account, this means no cash_account, or
+ # rev_account transactions will be present in here...
+ ddf = pop_ledger_merge.ddf(
+ columns=numerical_col_names + ["time_idx"],
+ filters=[
+ ("account_id", "==", bp_account.uuid),
+ ("time_idx", ">=", ledger_collection.start),
+ ("time_idx", "<", last_item_finish),
+ ],
+ )
+ df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+ df = df.set_index("time_idx")
+ assert not df.empty
+
+ instance = ProductBalances.from_pandas(input_data=df.sum())
+ assert instance.payout == instance.net == instance.bp_payment_credit
+ assert instance.available_balance < instance.net
+ assert instance.available_balance + instance.retainer == instance.net
+ assert instance.balance == thl_lm.get_account_balance(bp_account)
+ assert df["bp_payment.CREDIT"].sum() == thl_lm.get_account_balance(bp_account)
+
+ # (2) Filter by the Cash Account
+ ddf = pop_ledger_merge.ddf(
+ columns=numerical_col_names + ["time_idx"],
+ filters=[
+ ("account_id", "==", cash_account.uuid),
+ ("time_idx", ">=", ledger_collection.start),
+ ("time_idx", "<", last_item_finish),
+ ],
+ )
+ df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+ cash_balance: int = thl_lm.get_account_balance(account=cash_account)
+ assert df["bp_payment.CREDIT"].sum() == 0
+ assert cash_balance > 0
+ assert df["mp_payment.CREDIT"].sum() == 0
+ assert df["mp_payment.DEBIT"].sum() == cash_balance
+
+ # (2) Filter by the Revenue Account
+ ddf = pop_ledger_merge.ddf(
+ columns=numerical_col_names + ["time_idx"],
+ filters=[
+ ("account_id", "==", rev_account.uuid),
+ ("time_idx", ">=", ledger_collection.start),
+ ("time_idx", "<", last_item_finish),
+ ],
+ )
+ df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+ rev_balance: int = thl_lm.get_account_balance(account=rev_account)
+ assert rev_balance == 0
+ assert df["bp_payment.CREDIT"].sum() == 0
+ assert df["mp_payment.DEBIT"].sum() == 0
+ assert df["mp_payment.CREDIT"].sum() > 0
+
+ def test_resample(
+ self,
+ client_no_amm,
+ ledger_collection,
+ pop_ledger_merge,
+ mnt_filepath,
+ user_factory,
+ product,
+ create_main_accounts,
+ offset,
+ duration,
+ start,
+ thl_lm,
+ delete_df_collection,
+ incite_item_factory,
+ ):
+ from generalresearch.models.thl.user import User
+
+ assert ledger_collection.finished is not None
+ delete_df_collection(coll=ledger_collection)
+ u1: User = user_factory(product=product)
+
+ bp_account = thl_lm.get_account_or_create_bp_wallet(product=u1.product)
+
+ for item in ledger_collection.items:
+ incite_item_factory(user=u1, item=item)
+ item.initial_load(overwrite=True)
+
+ pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection)
+
+ item_finishes = [i.finish for i in ledger_collection.items]
+ item_finishes.sort(reverse=True)
+ last_item_finish = item_finishes[0]
+
+ ddf = pop_ledger_merge.ddf(
+ columns=numerical_col_names + ["time_idx"],
+ filters=[
+ ("account_id", "==", bp_account.uuid),
+ ("time_idx", ">=", ledger_collection.start),
+ ("time_idx", "<", last_item_finish),
+ ],
+ )
+ df = client_no_amm.compute(collections=ddf, sync=True)
+ assert isinstance(df, pd.DataFrame)
+ assert isinstance(df.index, pd.Index)
+ assert not isinstance(df.index, pd.RangeIndex)
+
+ # Now change the index so we can easily resample it
+ df = df.set_index("time_idx")
+ assert isinstance(df.index, pd.Index)
+ assert isinstance(df.index, pd.DatetimeIndex)
+
+ bp_account_balance = thl_lm.get_account_balance(account=bp_account)
+
+ # Initial sum
+ initial_sum = df.sum().sum()
+ # assert len(df) == 48 # msg="Original df should be 48 rows"
+
+ # Original (1min) to 5min
+ df_5min = df.resample("5min").sum()
+ # assert len(df_5min) == 12
+ assert initial_sum == df_5min.sum().sum()
+
+ # 30min
+ df_30min = df.resample("30min").sum()
+ # assert len(df_30min) == 2
+ assert initial_sum == df_30min.sum().sum()
+
+ # 1hr
+ df_1hr = df.resample("1h").sum()
+ # assert len(df_1hr) == 1
+ assert initial_sum == df_1hr.sum().sum()
+
+ # 1 day
+ df_1day = df.resample("1d").sum()
+ # assert len(df_1day) == 1
+ assert initial_sum == df_1day.sum().sum()