Diffstat (limited to 'tests/incite/mergers')
-rw-r--r--  tests/incite/mergers/__init__.py                              |   0
-rw-r--r--  tests/incite/mergers/foundations/__init__.py                  |   0
-rw-r--r--  tests/incite/mergers/foundations/test_enriched_session.py    | 138
-rw-r--r--  tests/incite/mergers/foundations/test_enriched_task_adjust.py|  76
-rw-r--r--  tests/incite/mergers/foundations/test_enriched_wall.py       | 236
-rw-r--r--  tests/incite/mergers/foundations/test_user_id_product.py     |  73
-rw-r--r--  tests/incite/mergers/test_merge_collection.py                | 102
-rw-r--r--  tests/incite/mergers/test_merge_collection_item.py           |  66
-rw-r--r--  tests/incite/mergers/test_pop_ledger.py                      | 307
-rw-r--r--  tests/incite/mergers/test_ym_survey_merge.py                 | 125
10 files changed, 1123 insertions(+), 0 deletions(-)
diff --git a/tests/incite/mergers/__init__.py b/tests/incite/mergers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/incite/mergers/__init__.py
diff --git a/tests/incite/mergers/foundations/__init__.py b/tests/incite/mergers/foundations/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/incite/mergers/foundations/__init__.py
diff --git a/tests/incite/mergers/foundations/test_enriched_session.py b/tests/incite/mergers/foundations/test_enriched_session.py
new file mode 100644
index 0000000..ec15d38
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_enriched_session.py
@@ -0,0 +1,138 @@
+from datetime import timedelta, timezone, datetime
+from decimal import Decimal
+from itertools import product
+from typing import Optional
+
+from generalresearch.incite.schemas.admin_responses import (
+    AdminPOPSessionSchema,
+)
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+
+from test_utils.incite.collections.conftest import (
+    session_collection,
+    wall_collection,
+)
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration",
+    argvalues=list(
+        product(
+            ["12h", "3D"],
+            [timedelta(days=5)],
+        )
+    ),
+)
+class TestEnrichedSession:
+
+    def test_base(
+        self,
+        client_no_amm,
+        product,
+        user_factory,
+        wall_collection,
+        session_collection,
+        enriched_session_merge,
+        thl_web_rr,
+        delete_df_collection,
+        incite_item_factory,
+    ):
+        from generalresearch.models.thl.user import User
+
+        delete_df_collection(coll=session_collection)
+
+        u1: User = user_factory(product=product, created=session_collection.start)
+
+        for item in session_collection.items:
+            incite_item_factory(item=item, user=u1)
+            item.initial_load()
+
+        for item in wall_collection.items:
+            item.initial_load()
+
+        enriched_session_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            session_coll=session_collection,
+            pg_config=thl_web_rr,
+        )
+
+        # --
+
+        ddf = enriched_session_merge.ddf()
+        assert isinstance(ddf, dd.DataFrame)
+
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+
+        assert not df.empty
+
+        # -- Teardown
+        delete_df_collection(session_collection)
+
+
+class TestEnrichedSessionAdmin:

+    @pytest.fixture
+    def start(self) -> "datetime":
+        return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)
+
+    @pytest.fixture
+    def offset(self) -> str:
+        return "1d"
+
+    @pytest.fixture
+    def duration(self) -> Optional["timedelta"]:
+        return timedelta(days=5)
+
+    def test_to_admin_response(
+        self,
+        event_report_request,
+        enriched_session_merge,
+        client_no_amm,
+        wall_collection,
+        session_collection,
+        thl_web_rr,
+        session_report_request,
+        user_factory,
+        start,
+        session_factory,
+        product_factory,
+        delete_df_collection,
+    ):
+        delete_df_collection(coll=wall_collection)
+        delete_df_collection(coll=session_collection)
+
+        p1 = product_factory()
+        p2 = product_factory()
+
+        for p in [p1, p2]:
+            u = user_factory(product=p)
+            for i in range(50):
+                s = session_factory(
+                    user=u,
+                    wall_count=1,
+                    wall_req_cpi=Decimal("1.00"),
+                    started=start + timedelta(minutes=i, seconds=1),
+                )
+        wall_collection.initial_load(client=None, sync=True)
+        session_collection.initial_load(client=None, sync=True)
+
+        enriched_session_merge.build(
+            client=client_no_amm,
+            session_coll=session_collection,
+            wall_coll=wall_collection,
+            pg_config=thl_web_rr,
+        )
+
+        df = enriched_session_merge.to_admin_response(
+            rr=session_report_request, client=client_no_amm
+        )
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+        assert isinstance(AdminPOPSessionSchema.validate(df), pd.DataFrame)
+        assert df.index.get_level_values(1).nunique() == 2
diff --git a/tests/incite/mergers/foundations/test_enriched_task_adjust.py b/tests/incite/mergers/foundations/test_enriched_task_adjust.py
new file mode 100644
index 0000000..96c214f
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_enriched_task_adjust.py
@@ -0,0 +1,76 @@
+from datetime import timedelta
+from itertools import product as iter_product
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+
+from test_utils.incite.collections.conftest import (
+    wall_collection,
+    task_adj_collection,
+    session_collection,
+)
+from test_utils.incite.mergers.conftest import enriched_wall_merge
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration",
+    argvalues=list(
+        iter_product(
+            ["12h", "3D"],
+            [timedelta(days=5)],
+        )
+    ),
+)
+class TestEnrichedTaskAdjust:
+
+    @pytest.mark.skip
+    def test_base(
+        self,
+        client_no_amm,
+        user_factory,
+        product,
+        task_adj_collection,
+        wall_collection,
+        session_collection,
+        enriched_wall_merge,
+        enriched_task_adjust_merge,
+        incite_item_factory,
+        delete_df_collection,
+        thl_web_rr,
+    ):
+        from generalresearch.models.thl.user import User
+
+        # -- Build & Setup
+        delete_df_collection(coll=session_collection)
+        u1: User = user_factory(product=product)
+
+        for item in session_collection.items:
+            incite_item_factory(user=u1, item=item)
+            item.initial_load()
+        for item in wall_collection.items:
+            item.initial_load()
+
+        enriched_wall_merge.build(
+            client=client_no_amm,
+            session_coll=session_collection,
+            wall_coll=wall_collection,
+            pg_config=thl_web_rr,
+        )
+
+        enriched_task_adjust_merge.build(
+            client=client_no_amm,
+            task_adjust_coll=task_adj_collection,
+            enriched_wall=enriched_wall_merge,
+            pg_config=thl_web_rr,
+        )
+
+        # --
+
+        ddf = enriched_task_adjust_merge.ddf()
+        assert isinstance(ddf, dd.DataFrame)
+
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+
+        assert not df.empty
diff --git a/tests/incite/mergers/foundations/test_enriched_wall.py b/tests/incite/mergers/foundations/test_enriched_wall.py
new file mode 100644
index 0000000..8f4995b
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_enriched_wall.py
@@ -0,0 +1,236 @@
+from datetime import timedelta, timezone, datetime
+from decimal import Decimal
+from itertools import product as iter_product
+from typing import Optional
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+    gen_cluster,
+    client_no_amm,
+    loop,
+    loop_in_thread,
+    cleanup,
+    cluster_fixture,
+    client,
+)
+
+from generalresearch.incite.mergers.foundations.enriched_wall import (
+    EnrichedWallMergeItem,
+)
+from test_utils.incite.collections.conftest import (
+    session_collection,
+    wall_collection,
+)
+from test_utils.incite.conftest import incite_item_factory
+from test_utils.incite.mergers.conftest import (
+    enriched_wall_merge,
+)
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration",
+    argvalues=list(iter_product(["48h", "3D"], [timedelta(days=5)])),
+)
+class TestEnrichedWall:
+
+    def test_base(
+        self,
+        client_no_amm,
+        product,
+        user_factory,
+        wall_collection,
+        thl_web_rr,
+        session_collection,
+        enriched_wall_merge,
+        delete_df_collection,
+        incite_item_factory,
+    ):
+        from generalresearch.models.thl.user import User
+
+        # -- Build & Setup
+        delete_df_collection(coll=session_collection)
+        delete_df_collection(coll=wall_collection)
+        u1: User = user_factory(product=product, created=session_collection.start)
+
+        for item in session_collection.items:
+            incite_item_factory(item=item, user=u1)
+            item.initial_load()
+
+        for item in wall_collection.items:
+            item.initial_load()
+
+        enriched_wall_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            session_coll=session_collection,
+            pg_config=thl_web_rr,
+        )
+
+        # --
+
+        ddf = enriched_wall_merge.ddf()
+        assert isinstance(ddf, dd.DataFrame)
+
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+
+        assert not df.empty
+
+    def test_base_item(
+        self,
+        client_no_amm,
+        product,
+        user_factory,
+        wall_collection,
+        session_collection,
+        enriched_wall_merge,
+        delete_df_collection,
+        thl_web_rr,
+        incite_item_factory,
+    ):
+        # -- Build & Setup
+        delete_df_collection(coll=session_collection)
+        u = user_factory(product=product, created=session_collection.start)
+
+        for item in session_collection.items:
+            incite_item_factory(item=item, user=u)
+            item.initial_load()
+        for item in wall_collection.items:
+            item.initial_load()
+
+        enriched_wall_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            session_coll=session_collection,
+            pg_config=thl_web_rr,
+        )
+
+        # --
+
+        for item in enriched_wall_merge.items:
+            assert isinstance(item, EnrichedWallMergeItem)
+
+            path = item.path
+
+            try:
+                modified_time1 = path.stat().st_mtime
+            except Exception:
+                modified_time1 = 0
+
+            item.build(
+                client=client_no_amm,
+                wall_coll=wall_collection,
+                session_coll=session_collection,
+                pg_config=thl_web_rr,
+            )
+            modified_time2 = path.stat().st_mtime
+
+            # Merge items can't be updated unless they are partials; confirm
+            # that even after attempting to rebuild, the file is not
+            # re-touched
+            assert modified_time2 == modified_time1
+
+    # def test_admin_pop_session_device_type(ew_merge_setup):
+    #     self.build()
+    #
+    #     rr = ReportRequest(
+    #         report_type=ReportType.POP_EVENT,
+    #         index0="started",
+    #         index1="device_type",
+    #         freq="min",
+    #         start=start,
+    #     )
+    #
+    #     df, categories, updated = self.instance.to_admin_response(
+    #         rr=rr, product_ids=[self.product.id], client=client
+    #     )
+    #
+    #     assert isinstance(df, pd.DataFrame)
+    #     device_types_str = [str(e.value) for e in DeviceType]
+    #     device_types = df.index.get_level_values(1).values
+    #     assert all([dt in device_types_str for dt in device_types])
+
+
+class TestEnrichedWallToAdmin:
+
+    @pytest.fixture
+    def start(self) -> "datetime":
+        return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)
+
+    @pytest.fixture
+    def offset(self) -> str:
+        return "1d"
+
+    @pytest.fixture
+    def duration(self) -> Optional["timedelta"]:
+        return timedelta(days=5)
+
+    def test_empty(self, enriched_wall_merge, client_no_amm, start):
+        from generalresearch.models.admin.request import ReportRequest
+
+        rr = ReportRequest.model_validate({"interval": "5min", "start": start})
+
+        res = enriched_wall_merge.to_admin_response(
+            rr=rr,
+            client=client_no_amm,
+        )
+
+        assert isinstance(res, pd.DataFrame)
+
+        assert res.empty
+        assert len(res.columns) > 5
+
+    def test_to_admin_response(
+        self,
+        event_report_request,
+        enriched_wall_merge,
+        client_no_amm,
+        wall_collection,
+        session_collection,
+        thl_web_rr,
+        user,
+        session_factory,
+        delete_df_collection,
+        product_factory,
+        user_factory,
+        start,
+    ):
+        delete_df_collection(coll=wall_collection)
+        delete_df_collection(coll=session_collection)
+
+        p1 = product_factory()
+        p2 = product_factory()
+
+        for p in [p1, p2]:
+            u = user_factory(product=p)
+            for i in range(50):
+                s = session_factory(
+                    user=u,
+                    wall_count=2,
+                    wall_req_cpi=Decimal("1.00"),
+                    started=start + timedelta(minutes=i, seconds=1),
+                )
+
+        wall_collection.initial_load(client=None, sync=True)
+        session_collection.initial_load(client=None, sync=True)
+
+        enriched_wall_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            session_coll=session_collection,
+            pg_config=thl_web_rr,
+        )
+
+        df = enriched_wall_merge.to_admin_response(
+            rr=event_report_request, client=client_no_amm
+        )
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+        # assert len(df) == 1
+        # assert user.product_id == df.reset_index().loc[0, "index1"]
+        assert df.index.get_level_values(1).nunique() == 2
diff --git a/tests/incite/mergers/foundations/test_user_id_product.py b/tests/incite/mergers/foundations/test_user_id_product.py
new file mode 100644
index 0000000..f96bfb4
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_user_id_product.py
@@ -0,0 +1,73 @@
+from datetime import timedelta, datetime, timezone
+from itertools import product
+
+import pandas as pd
+import pytest
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+    gen_cluster,
+    client_no_amm,
+    loop,
+    loop_in_thread,
+    cleanup,
+    cluster_fixture,
+    client,
+)
+
+from generalresearch.incite.mergers.foundations.user_id_product import (
+    UserIdProductMergeItem,
+)
+from test_utils.incite.mergers.conftest import user_id_product_merge
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration, start",
+    argvalues=list(
+        product(
+            ["12h", "3D"],
+            [timedelta(days=5)],
+            [
+                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
+                    microsecond=0
+                )
+            ],
+        )
+    ),
+)
+class TestUserIDProduct:
+
+    @pytest.mark.skip
+    def test_base(self, client_no_amm, user_id_product_merge):
+        ddf = user_id_product_merge.ddf()
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+
+    @pytest.mark.skip
+    def test_base_item(self, client_no_amm, user_id_product_merge, user_collection):
+        assert len(user_id_product_merge.items) == 1
+
+        for item in user_id_product_merge.items:
+            assert isinstance(item, UserIdProductMergeItem)
+
+            path = item.path
+
+            try:
+                modified_time1 = path.stat().st_mtime
+            except Exception:
+                modified_time1 = 0
+
+            user_id_product_merge.build(client=client_no_amm, user_coll=user_collection)
+            modified_time2 = path.stat().st_mtime
+
+            assert modified_time2 > modified_time1
+
+    @pytest.mark.skip
+    def test_read(self, client_no_amm, user_id_product_merge):
+        users_ddf = user_id_product_merge.ddf()
+        df = client_no_amm.compute(collections=users_ddf, sync=True)
+
+        assert isinstance(df, pd.DataFrame)
+        assert len(df.columns) == 1
+        assert str(df.product_id.dtype) == "category"
diff --git a/tests/incite/mergers/test_merge_collection.py b/tests/incite/mergers/test_merge_collection.py
new file mode 100644
index 0000000..692cac3
--- /dev/null
+++ b/tests/incite/mergers/test_merge_collection.py
@@ -0,0 +1,102 @@
+from datetime import datetime, timezone, timedelta
+from itertools import product
+
+import pandas as pd
+import pytest
+from pandera import DataFrameSchema
+
+from generalresearch.incite.mergers import (
+    MergeCollection,
+    MergeType,
+)
+from test_utils.incite.conftest import mnt_filepath
+
+merge_types = list(e for e in MergeType if e != MergeType.TEST)
+
+
+@pytest.mark.parametrize(
+    argnames="merge_type, offset, duration, start",
+    argvalues=list(
+        product(
+            merge_types,
+            ["5min", "6h", "14D"],
+            [timedelta(days=30)],
+            [
+                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
+                    microsecond=0
+                )
+            ],
+        )
+    ),
+)
+class TestMergeCollection:
+
+    def test_init(self, mnt_filepath, merge_type, offset, duration, start):
+        with pytest.raises(expected_exception=ValueError) as cm:
+            MergeCollection(archive_path=mnt_filepath.data_src)
+        assert "Must explicitly provide a merge_type" in str(cm.value)
+
+        instance = MergeCollection(
+            merge_type=merge_type,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+        assert instance.merge_type == merge_type
+
+    def test_items(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            merge_type=merge_type,
+            offset=offset,
+            start=start,
+            finished=start + duration,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        assert len(instance.interval_range) == len(instance.items)
+
+    def test_progress(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            merge_type=merge_type,
+            offset=offset,
+            start=start,
+            finished=start + duration,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        assert isinstance(instance.progress, pd.DataFrame)
+        assert instance.progress.shape[0] > 0
+        assert instance.progress.shape[1] == 7
+        assert instance.progress["group_by"].isnull().all()
+
+    def test_schema(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            merge_type=merge_type,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        assert isinstance(instance._schema, DataFrameSchema)
+
+    def test_load(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            merge_type=merge_type,
+            start=start,
+            finished=start + duration,
+            offset=offset,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        # Confirm that there are no archives available yet
+        assert instance.progress.has_archive.eq(False).all()
+
+    def test_get_items(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            start=start,
+            finished=start + duration,
+            offset=offset,
+            merge_type=merge_type,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        # with pytest.raises(expected_exception=ResourceWarning) as cm:
+        res = instance.get_items_last365()
+        # assert "has missing archives", str(cm.value)
+        assert len(res) == len(instance.items)
diff --git a/tests/incite/mergers/test_merge_collection_item.py b/tests/incite/mergers/test_merge_collection_item.py
new file mode 100644
index 0000000..96f8789
--- /dev/null
+++ b/tests/incite/mergers/test_merge_collection_item.py
@@ -0,0 +1,66 @@
+from datetime import datetime, timezone, timedelta
+from itertools import product
+from pathlib import PurePath
+
+import pytest
+
+from generalresearch.incite.mergers import MergeCollectionItem, MergeType
+from generalresearch.incite.mergers.foundations.enriched_session import (
+    EnrichedSessionMerge,
+)
+from generalresearch.incite.mergers.foundations.enriched_wall import (
+    EnrichedWallMerge,
+)
+from test_utils.incite.mergers.conftest import merge_collection
+
+
+@pytest.mark.parametrize(
+    argnames="merge_type, offset, duration",
+    argvalues=list(
+        product(
+            [MergeType.ENRICHED_SESSION, MergeType.ENRICHED_WALL],
+            ["1h"],
+            [timedelta(days=1)],
+        )
+    ),
+)
+class TestMergeCollectionItem:
+
+    def test_file_naming(self, merge_collection, offset, duration, start):
+        assert len(merge_collection.items) == 25
+
+        items: list[MergeCollectionItem] = merge_collection.items
+
+        for i in items:
+            i: MergeCollectionItem
+
+            assert isinstance(i.path, PurePath)
+            assert i.path.name == i.filename
+
+            assert i._collection.merge_type.name.lower() in i.filename
+            assert i._collection.offset in i.filename
+            assert i.start.strftime("%Y-%m-%d-%H-%M-%S") in i.filename
+
+    def test_archives(self, merge_collection, offset, duration, start):
+        assert len(merge_collection.items) == 25
+
+        for i in merge_collection.items:
+            assert not i.has_archive()
+            assert not i.has_empty()
+            assert not i.is_empty()
+            assert not i.has_partial_archive()
+            assert i.has_archive() == i.path_exists(generic_path=i.path)
+
+        res = set([i.should_archive() for i in merge_collection.items])
+        assert len(res) == 1
+
+    def test_item_to_archive(self, merge_collection, offset, duration, start):
+        for item in merge_collection.items:
+            item: MergeCollectionItem
+            assert not item.has_archive()
+
+        # TODO: set up build methods
+        # ddf = self.build
+        # saved = instance.to_archive(ddf=ddf)
+        # self.assertTrue(saved)
+        # self.assertTrue(instance.has_archive())
diff --git a/tests/incite/mergers/test_pop_ledger.py b/tests/incite/mergers/test_pop_ledger.py
new file mode 100644
index 0000000..6f96108
--- /dev/null
+++ b/tests/incite/mergers/test_pop_ledger.py
@@ -0,0 +1,307 @@
+from datetime import timedelta, datetime, timezone
+from itertools import product as iter_product
+from typing import Optional
+
+import pandas as pd
+import pytest
+from distributed.utils_test import client_no_amm
+
+from generalresearch.incite.schemas.mergers.pop_ledger import (
+    numerical_col_names,
+)
+from test_utils.incite.collections.conftest import ledger_collection
+from test_utils.incite.conftest import mnt_filepath, incite_item_factory
+from test_utils.incite.mergers.conftest import pop_ledger_merge
+from test_utils.managers.ledger.conftest import create_main_accounts
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration",
+    argvalues=list(
+        iter_product(
+            ["12h", "3D"],
+            [timedelta(days=4)],
+        )
+    ),
+)
+class TestMergePOPLedger:
+
+    @pytest.fixture
+    def start(self) -> "datetime":
+        return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)
+
+    @pytest.fixture
+    def duration(self) -> Optional["timedelta"]:
+        return timedelta(days=5)
+
+    def test_base(
+        self,
+        client_no_amm,
+        ledger_collection,
+        pop_ledger_merge,
+        product,
+        user_factory,
+        create_main_accounts,
+        thl_lm,
+        delete_df_collection,
+        incite_item_factory,
+        delete_ledger_db,
+    ):
+        from generalresearch.models.thl.ledger import LedgerAccount
+
+        u = user_factory(product=product, created=ledger_collection.start)
+
+        # -- Build & Setup
+        delete_ledger_db()
+        create_main_accounts()
+        delete_df_collection(coll=ledger_collection)
+        # assert ledger_collection.start is None
+        # assert ledger_collection.offset is None
+
+        for item in ledger_collection.items:
+            incite_item_factory(item=item, user=u)
+            item.initial_load()
+
+        # Confirm all of the items are archived
+        assert ledger_collection.progress.has_archive.eq(True).all()
+
+        pop_ledger_merge.build(
+            client=client_no_amm,
+            ledger_coll=ledger_collection,
+        )
+        # assert pop_ledger_merge.progress.has_archive.eq(True).all()
+
+        ddf = pop_ledger_merge.ddf()
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+
+        # --
+
+        user_wallet_account: LedgerAccount = thl_lm.get_account_or_create_user_wallet(
+            user=u
+        )
+        cash_account: LedgerAccount = thl_lm.get_account_cash()
+        rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue()
+
+        item_finishes = [i.finish for i in ledger_collection.items]
+        item_finishes.sort(reverse=True)
+        last_item_finish = item_finishes[0]
+
+        # Pure SQL-based lookups
+        cash_balance: int = thl_lm.get_account_balance(account=cash_account)
+        rev_balance: int = thl_lm.get_account_balance(account=rev_account)
+        assert cash_balance > rev_balance
+
+        # (1) Test Cash Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names,
+            filters=[
+                ("account_id", "==", cash_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+        assert df["mp_payment.CREDIT"].sum() == 0
+        assert cash_balance > 0
+        assert df["mp_payment.DEBIT"].sum() == cash_balance
+
+        # (2) Test Revenue Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names,
+            filters=[
+                ("account_id", "==", rev_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert rev_balance == 0
+        assert df["bp_payment.CREDIT"].sum() == 0
+        assert df["mp_payment.DEBIT"].sum() == 0
+        assert df["mp_payment.CREDIT"].sum() > 0
+
+        # -- Cleanup
+        delete_ledger_db()
+
+    def test_pydantic_init(
+        self,
+        client_no_amm,
+        ledger_collection,
+        pop_ledger_merge,
+        mnt_filepath,
+        product,
+        user_factory,
+        create_main_accounts,
+        offset,
+        duration,
+        start,
+        thl_lm,
+        incite_item_factory,
+        delete_df_collection,
+        delete_ledger_db,
+        session_collection,
+    ):
+        from generalresearch.models.thl.ledger import LedgerAccount
+        from generalresearch.models.thl.product import Product
+        from generalresearch.models.thl.finance import ProductBalances
+
+        u = user_factory(product=product, created=session_collection.start)
+
+        assert ledger_collection.finished is not None
+        assert isinstance(u.product, Product)
+        delete_ledger_db()
+        create_main_accounts()
+        delete_df_collection(coll=ledger_collection)
+
+        bp_account: LedgerAccount = thl_lm.get_account_or_create_bp_wallet(
+            product=u.product
+        )
+        cash_account: LedgerAccount = thl_lm.get_account_cash()
+        rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue()
+
+        for item in ledger_collection.items:
+            incite_item_factory(item=item, user=u)
+            item.initial_load(overwrite=True)
+
+        pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection)
+
+        item_finishes = [i.finish for i in ledger_collection.items]
+        item_finishes.sort(reverse=True)
+        last_item_finish = item_finishes[0]
+
+        # (1) Filter by the Product Account; this means no cash_account or
+        # rev_account transactions will be present here...
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", bp_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+        df = df.set_index("time_idx")
+        assert not df.empty
+
+        instance = ProductBalances.from_pandas(input_data=df.sum())
+        assert instance.payout == instance.net == instance.bp_payment_credit
+        assert instance.available_balance < instance.net
+        assert instance.available_balance + instance.retainer == instance.net
+        assert instance.balance == thl_lm.get_account_balance(bp_account)
+        assert df["bp_payment.CREDIT"].sum() == thl_lm.get_account_balance(bp_account)
+
+        # (2) Filter by the Cash Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", cash_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        cash_balance: int = thl_lm.get_account_balance(account=cash_account)
+        assert df["bp_payment.CREDIT"].sum() == 0
+        assert cash_balance > 0
+        assert df["mp_payment.CREDIT"].sum() == 0
+        assert df["mp_payment.DEBIT"].sum() == cash_balance
+
+        # (3) Filter by the Revenue Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", rev_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        rev_balance: int = thl_lm.get_account_balance(account=rev_account)
+        assert rev_balance == 0
+        assert df["bp_payment.CREDIT"].sum() == 0
+        assert df["mp_payment.DEBIT"].sum() == 0
+        assert df["mp_payment.CREDIT"].sum() > 0
+
+    def test_resample(
+        self,
+        client_no_amm,
+        ledger_collection,
+        pop_ledger_merge,
+        mnt_filepath,
+        user_factory,
+        product,
+        create_main_accounts,
+        offset,
+        duration,
+        start,
+        thl_lm,
+        delete_df_collection,
+        incite_item_factory,
+    ):
+        from generalresearch.models.thl.user import User
+
+        assert ledger_collection.finished is not None
+        delete_df_collection(coll=ledger_collection)
+        u1: User = user_factory(product=product)
+
+        bp_account = thl_lm.get_account_or_create_bp_wallet(product=u1.product)
+
+        for item in ledger_collection.items:
+            incite_item_factory(user=u1, item=item)
+            item.initial_load(overwrite=True)
+
+        pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection)
+
+        item_finishes = [i.finish for i in ledger_collection.items]
+        item_finishes.sort(reverse=True)
+        last_item_finish = item_finishes[0]
+
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", bp_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+        assert isinstance(df.index, pd.Index)
+        assert not isinstance(df.index, pd.RangeIndex)
+
+        # Now change the index so we can easily resample it
+        df = df.set_index("time_idx")
+        assert isinstance(df.index, pd.Index)
+        assert isinstance(df.index, pd.DatetimeIndex)
+
+        bp_account_balance = thl_lm.get_account_balance(account=bp_account)
+
+        # Initial sum
+        initial_sum = df.sum().sum()
+        # assert len(df) == 48  # msg="Original df should be 48 rows"
+
+        # Original (1min) to 5min
+        df_5min = df.resample("5min").sum()
+        # assert len(df_5min) == 12
+        assert initial_sum == df_5min.sum().sum()
+
+        # 30min
+        df_30min = df.resample("30min").sum()
+        # assert len(df_30min) == 2
+        assert initial_sum == df_30min.sum().sum()
+
+        # 1hr
+        df_1hr = df.resample("1h").sum()
+        # assert len(df_1hr) == 1
+        assert initial_sum == df_1hr.sum().sum()
+
+        # 1 day
+        df_1day = df.resample("1d").sum()
+        # assert len(df_1day) == 1
+        assert initial_sum == df_1day.sum().sum()
diff --git a/tests/incite/mergers/test_ym_survey_merge.py b/tests/incite/mergers/test_ym_survey_merge.py
new file mode 100644
index 0000000..4c2df6b
--- /dev/null
+++ b/tests/incite/mergers/test_ym_survey_merge.py
@@ -0,0 +1,125 @@
+from datetime import timedelta, timezone, datetime
+from itertools import product
+
+import pandas as pd
+import pytest
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+    gen_cluster,
+    client_no_amm,
+    loop,
+    loop_in_thread,
+    cleanup,
+    cluster_fixture,
+    client,
+)
+
+from test_utils.incite.collections.conftest import wall_collection, session_collection
+from test_utils.incite.mergers.conftest import (
+    enriched_session_merge,
+    ym_survey_wall_merge,
+)
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration, start",
+    argvalues=list(
+        product(
+            ["12h", "3D"],
+            [timedelta(days=30)],
+            [
+                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
+                    microsecond=0
+                )
+            ],
+        )
+    ),
+)
+class TestYMSurveyMerge:
+    """We override start, not because it's needed on the YMSurveyWall merge,
+    which operates on a rolling 10-day window, but because we don't want
+    to mock data in the wall collection and enriched_session_merge from
+    the 1800s and then wonder why there is no data available in the past
+    10 days in the database.
+ """ + + def test_base( + self, + client_no_amm, + user_factory, + product, + ym_survey_wall_merge, + wall_collection, + session_collection, + enriched_session_merge, + delete_df_collection, + incite_item_factory, + thl_web_rr, + ): + from generalresearch.models.thl.user import User + + delete_df_collection(coll=session_collection) + user: User = user_factory(product=product, created=session_collection.start) + + # -- Build & Setup + assert ym_survey_wall_merge.start is None + assert ym_survey_wall_merge.offset == "10D" + + for item in session_collection.items: + incite_item_factory(item=item, user=user) + item.initial_load() + for item in wall_collection.items: + item.initial_load() + + # Confirm any of the items are archived + assert session_collection.progress.has_archive.eq(True).all() + assert wall_collection.progress.has_archive.eq(True).all() + + enriched_session_merge.build( + client=client_no_amm, + session_coll=session_collection, + wall_coll=wall_collection, + pg_config=thl_web_rr, + ) + assert enriched_session_merge.progress.has_archive.eq(True).all() + + ddf = enriched_session_merge.ddf() + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + + # -- + + ym_survey_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + enriched_session=enriched_session_merge, + ) + assert ym_survey_wall_merge.progress.has_archive.eq(True).all() + + # -- + + ddf = ym_survey_wall_merge.ddf() + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + + # -- + assert df.product_id.nunique() == 1 + assert df.team_id.nunique() == 1 + assert df.source.nunique() > 1 + + started_min_ts = df.started.min() + started_max_ts = df.started.max() + + assert type(started_min_ts) is pd.Timestamp + assert type(started_max_ts) is pd.Timestamp + + started_min: datetime = datetime.fromisoformat(str(started_min_ts)) + started_max: datetime = datetime.fromisoformat(str(started_max_ts)) + + started_delta = started_max - started_min + assert started_delta >= timedelta(days=3) |
