aboutsummaryrefslogtreecommitdiff
path: root/tests/incite/mergers
diff options
context:
space:
mode:
Diffstat (limited to 'tests/incite/mergers')
-rw-r--r--tests/incite/mergers/__init__.py0
-rw-r--r--tests/incite/mergers/foundations/__init__.py0
-rw-r--r--tests/incite/mergers/foundations/test_enriched_session.py138
-rw-r--r--tests/incite/mergers/foundations/test_enriched_task_adjust.py76
-rw-r--r--tests/incite/mergers/foundations/test_enriched_wall.py236
-rw-r--r--tests/incite/mergers/foundations/test_user_id_product.py73
-rw-r--r--tests/incite/mergers/test_merge_collection.py102
-rw-r--r--tests/incite/mergers/test_merge_collection_item.py66
-rw-r--r--tests/incite/mergers/test_pop_ledger.py307
-rw-r--r--tests/incite/mergers/test_ym_survey_merge.py125
10 files changed, 1123 insertions, 0 deletions
diff --git a/tests/incite/mergers/__init__.py b/tests/incite/mergers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/incite/mergers/__init__.py
diff --git a/tests/incite/mergers/foundations/__init__.py b/tests/incite/mergers/foundations/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/incite/mergers/foundations/__init__.py
diff --git a/tests/incite/mergers/foundations/test_enriched_session.py b/tests/incite/mergers/foundations/test_enriched_session.py
new file mode 100644
index 0000000..ec15d38
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_enriched_session.py
@@ -0,0 +1,138 @@
+from datetime import timedelta, timezone, datetime
+from decimal import Decimal
+from itertools import product
+from typing import Optional
+
+from generalresearch.incite.schemas.admin_responses import (
+ AdminPOPSessionSchema,
+)
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+
+from test_utils.incite.collections.conftest import (
+ session_collection,
+ wall_collection,
+)
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration",
+    argvalues=list(
+        product(
+            ["12h", "3D"],
+            [timedelta(days=5)],
+        )
+    ),
+)
+class TestEnrichedSession:
+    """Build the enriched-session merge from session + wall collections and
+    confirm it materializes a non-empty dask/pandas DataFrame.
+
+    Parametrized over collection offset and duration; the fixtures that
+    consume ``offset``/``duration`` presumably live in the shared conftest —
+    TODO confirm.
+    """
+
+    def test_base(
+        self,
+        client_no_amm,
+        product,  # NOTE: this fixture shadows itertools.product inside the test body
+        user_factory,
+        wall_collection,
+        session_collection,
+        enriched_session_merge,
+        thl_web_rr,
+        delete_df_collection,
+        incite_item_factory,
+    ):
+        from generalresearch.models.thl.user import User
+
+        # Start from a clean session collection so prior runs can't leak data in.
+        delete_df_collection(coll=session_collection)
+
+        u1: User = user_factory(product=product, created=session_collection.start)
+
+        # Seed every session item for the single user, then archive each item.
+        for item in session_collection.items:
+            incite_item_factory(item=item, user=u1)
+            item.initial_load()
+
+        for item in wall_collection.items:
+            item.initial_load()
+
+        enriched_session_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            session_coll=session_collection,
+            pg_config=thl_web_rr,
+        )
+
+        # --
+
+        ddf = enriched_session_merge.ddf()
+        assert isinstance(ddf, dd.DataFrame)
+
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+
+        assert not df.empty
+
+        # -- Teardown
+        delete_df_collection(session_collection)
+
+
+class TestEnrichedSessionAdmin:
+    """Exercise ``to_admin_response`` on the enriched-session merge.
+
+    Not parametrized; fixed ``start``/``offset``/``duration`` fixtures below
+    override whatever the shared conftest would otherwise provide — TODO
+    confirm they are consumed by the collection fixtures.
+    """
+
+    @pytest.fixture
+    def start(self) -> "datetime":
+        return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)
+
+    @pytest.fixture
+    def offset(self) -> str:
+        return "1d"
+
+    @pytest.fixture
+    def duration(self) -> Optional["timedelta"]:
+        return timedelta(days=5)
+
+    def test_to_admin_response(
+        self,
+        event_report_request,
+        enriched_session_merge,
+        client_no_amm,
+        wall_collection,
+        session_collection,
+        thl_web_rr,
+        session_report_request,
+        user_factory,
+        start,
+        session_factory,
+        product_factory,
+        delete_df_collection,
+    ):
+        delete_df_collection(coll=wall_collection)
+        delete_df_collection(coll=session_collection)
+
+        # Two products, one user each, 50 one-wall sessions per user.
+        p1 = product_factory()
+        p2 = product_factory()
+
+        for p in [p1, p2]:
+            u = user_factory(product=p)
+            for i in range(50):
+                # NOTE(review): ``s`` is unused — the factory's side effect is
+                # what matters here.
+                s = session_factory(
+                    user=u,
+                    wall_count=1,
+                    wall_req_cpi=Decimal("1.00"),
+                    started=start + timedelta(minutes=i, seconds=1),
+                )
+        wall_collection.initial_load(client=None, sync=True)
+        session_collection.initial_load(client=None, sync=True)
+
+        enriched_session_merge.build(
+            client=client_no_amm,
+            session_coll=session_collection,
+            wall_coll=wall_collection,
+            pg_config=thl_web_rr,
+        )
+
+        df = enriched_session_merge.to_admin_response(
+            rr=session_report_request, client=client_no_amm
+        )
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+        # Response must validate against the admin schema.
+        assert isinstance(AdminPOPSessionSchema.validate(df), pd.DataFrame)
+        # Index level 1 should hold exactly the two product ids created above.
+        assert df.index.get_level_values(1).nunique() == 2
diff --git a/tests/incite/mergers/foundations/test_enriched_task_adjust.py b/tests/incite/mergers/foundations/test_enriched_task_adjust.py
new file mode 100644
index 0000000..96c214f
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_enriched_task_adjust.py
@@ -0,0 +1,76 @@
+from datetime import timedelta
+from itertools import product as iter_product
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+
+from test_utils.incite.collections.conftest import (
+ wall_collection,
+ task_adj_collection,
+ session_collection,
+)
+from test_utils.incite.mergers.conftest import enriched_wall_merge
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration,",
+    argvalues=list(
+        iter_product(
+            ["12h", "3D"],
+            [timedelta(days=5)],
+        )
+    ),
+)
+class TestEnrichedTaskAdjust:
+    """Build the enriched-task-adjust merge on top of an enriched-wall merge.
+
+    The only test is currently skipped — TODO(review): unskip or document why.
+    """
+
+    @pytest.mark.skip
+    def test_base(
+        self,
+        client_no_amm,
+        user_factory,
+        product,
+        task_adj_collection,
+        wall_collection,
+        session_collection,
+        enriched_wall_merge,
+        enriched_task_adjust_merge,
+        incite_item_factory,
+        delete_df_collection,
+        thl_web_rr,
+    ):
+        from generalresearch.models.thl.user import User
+
+        # -- Build & Setup
+        delete_df_collection(coll=session_collection)
+        u1: User = user_factory(product=product)
+
+        for item in session_collection.items:
+            incite_item_factory(user=u1, item=item)
+            item.initial_load()
+        for item in wall_collection.items:
+            item.initial_load()
+
+        # The task-adjust merge depends on the enriched-wall merge, so build
+        # the wall merge first.
+        enriched_wall_merge.build(
+            client=client_no_amm,
+            session_coll=session_collection,
+            wall_coll=wall_collection,
+            pg_config=thl_web_rr,
+        )
+
+        enriched_task_adjust_merge.build(
+            client=client_no_amm,
+            task_adjust_coll=task_adj_collection,
+            enriched_wall=enriched_wall_merge,
+            pg_config=thl_web_rr,
+        )
+
+        # --
+
+        ddf = enriched_task_adjust_merge.ddf()
+        assert isinstance(ddf, dd.DataFrame)
+
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+
+        assert not df.empty
diff --git a/tests/incite/mergers/foundations/test_enriched_wall.py b/tests/incite/mergers/foundations/test_enriched_wall.py
new file mode 100644
index 0000000..8f4995b
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_enriched_wall.py
@@ -0,0 +1,236 @@
+from datetime import timedelta, timezone, datetime
+from decimal import Decimal
+from itertools import product as iter_product
+from typing import Optional
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+ gen_cluster,
+ client_no_amm,
+ loop,
+ loop_in_thread,
+ cleanup,
+ cluster_fixture,
+ client,
+)
+
+from generalresearch.incite.mergers.foundations.enriched_wall import (
+ EnrichedWallMergeItem,
+)
+from test_utils.incite.collections.conftest import (
+ session_collection,
+ wall_collection,
+)
+from test_utils.incite.conftest import incite_item_factory
+from test_utils.incite.mergers.conftest import (
+ enriched_wall_merge,
+)
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration",
+    argvalues=list(iter_product(["48h", "3D"], [timedelta(days=5)])),
+)
+class TestEnrichedWall:
+    """Build the enriched-wall merge and verify both the collection-level
+    build and the per-item (MergeCollectionItem) build semantics.
+    """
+
+    def test_base(
+        self,
+        client_no_amm,
+        product,
+        user_factory,
+        wall_collection,
+        thl_web_rr,
+        session_collection,
+        enriched_wall_merge,
+        delete_df_collection,
+        incite_item_factory,
+    ):
+        from generalresearch.models.thl.user import User
+
+        # -- Build & Setup
+        delete_df_collection(coll=session_collection)
+        delete_df_collection(coll=wall_collection)
+        u1: User = user_factory(product=product, created=session_collection.start)
+
+        for item in session_collection.items:
+            incite_item_factory(item=item, user=u1)
+            item.initial_load()
+
+        for item in wall_collection.items:
+            item.initial_load()
+
+        enriched_wall_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            session_coll=session_collection,
+            pg_config=thl_web_rr,
+        )
+
+        # --
+
+        ddf = enriched_wall_merge.ddf()
+        assert isinstance(ddf, dd.DataFrame)
+
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+
+        assert not df.empty
+
+    def test_base_item(
+        self,
+        client_no_amm,
+        product,
+        user_factory,
+        wall_collection,
+        session_collection,
+        enriched_wall_merge,
+        delete_df_collection,
+        thl_web_rr,
+        incite_item_factory,
+    ):
+        # -- Build & Setup
+        delete_df_collection(coll=session_collection)
+        u = user_factory(product=product, created=session_collection.start)
+
+        for item in session_collection.items:
+            incite_item_factory(item=item, user=u)
+            item.initial_load()
+        for item in wall_collection.items:
+            item.initial_load()
+
+        enriched_wall_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            session_coll=session_collection,
+            pg_config=thl_web_rr,
+        )
+
+        # --
+
+        for item in enriched_wall_merge.items:
+            assert isinstance(item, EnrichedWallMergeItem)
+
+            path = item.path
+
+            # The file may not exist yet for this item; fall back to epoch 0
+            # so the later equality check still holds.
+            try:
+                modified_time1 = path.stat().st_mtime
+            except (Exception,):
+                modified_time1 = 0
+
+            item.build(
+                client=client_no_amm,
+                wall_coll=wall_collection,
+                session_coll=session_collection,
+                pg_config=thl_web_rr,
+            )
+            modified_time2 = path.stat().st_mtime
+
+            # Merger Items can't be updated unless it's a partial, confirm
+            # that even after attempting to rebuild, it doesn't re-touch
+            # the file
+            assert modified_time2 == modified_time1
+
+    # TODO(review): dead commented-out test below — restore or delete.
+    # def test_admin_pop_session_device_type(ew_merge_setup):
+    #     self.build()
+    #
+    #     rr = ReportRequest(
+    #         report_type=ReportType.POP_EVENT,
+    #         index0="started",
+    #         index1="device_type",
+    #         freq="min",
+    #         start=start,
+    #     )
+    #
+    #     df, categories, updated = self.instance.to_admin_response(
+    #         rr=rr, product_ids=[self.product.id], client=client
+    #     )
+    #
+    #     assert isinstance(df, pd.DataFrame)
+    #     device_types_str = [str(e.value) for e in DeviceType]
+    #     device_types = df.index.get_level_values(1).values
+    #     assert all([dt in device_types_str for dt in device_types])
+
+
+class TestEnrichedWallToAdmin:
+    """Exercise ``to_admin_response`` on the enriched-wall merge, including
+    the empty (no data) case.
+
+    Fixed ``start``/``offset``/``duration`` fixtures override the shared
+    conftest defaults — TODO confirm the collection fixtures consume them.
+    """
+
+    @pytest.fixture
+    def start(self) -> "datetime":
+        return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)
+
+    @pytest.fixture
+    def offset(self) -> str:
+        return "1d"
+
+    @pytest.fixture
+    def duration(self) -> Optional["timedelta"]:
+        return timedelta(days=5)
+
+    def test_empty(self, enriched_wall_merge, client_no_amm, start):
+        from generalresearch.models.admin.request import ReportRequest
+
+        rr = ReportRequest.model_validate({"interval": "5min", "start": start})
+
+        res = enriched_wall_merge.to_admin_response(
+            rr=rr,
+            client=client_no_amm,
+        )
+
+        assert isinstance(res, pd.DataFrame)
+
+        # No data was loaded, so the frame is empty but still fully-schemed.
+        assert res.empty
+        assert len(res.columns) > 5
+
+    def test_to_admin_response(
+        self,
+        event_report_request,
+        enriched_wall_merge,
+        client_no_amm,
+        wall_collection,
+        session_collection,
+        thl_web_rr,
+        user,
+        session_factory,
+        delete_df_collection,
+        product_factory,
+        user_factory,
+        start,
+    ):
+        delete_df_collection(coll=wall_collection)
+        delete_df_collection(coll=session_collection)
+
+        # Two products, one user each, 50 two-wall sessions per user.
+        p1 = product_factory()
+        p2 = product_factory()
+
+        for p in [p1, p2]:
+            u = user_factory(product=p)
+            for i in range(50):
+                # NOTE(review): ``s`` is unused — the factory's side effect is
+                # what matters here.
+                s = session_factory(
+                    user=u,
+                    wall_count=2,
+                    wall_req_cpi=Decimal("1.00"),
+                    started=start + timedelta(minutes=i, seconds=1),
+                )
+
+        wall_collection.initial_load(client=None, sync=True)
+        session_collection.initial_load(client=None, sync=True)
+
+        enriched_wall_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            session_coll=session_collection,
+            pg_config=thl_web_rr,
+        )
+
+        df = enriched_wall_merge.to_admin_response(
+            rr=event_report_request, client=client_no_amm
+        )
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+        # assert len(df) == 1
+        # assert user.product_id == df.reset_index().loc[0, "index1"]
+        # Index level 1 should hold exactly the two product ids created above.
+        assert df.index.get_level_values(1).nunique() == 2
diff --git a/tests/incite/mergers/foundations/test_user_id_product.py b/tests/incite/mergers/foundations/test_user_id_product.py
new file mode 100644
index 0000000..f96bfb4
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_user_id_product.py
@@ -0,0 +1,73 @@
+from datetime import timedelta, datetime, timezone
+from itertools import product
+
+import pandas as pd
+import pytest
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+ gen_cluster,
+ client_no_amm,
+ loop,
+ loop_in_thread,
+ cleanup,
+ cluster_fixture,
+ client,
+)
+
+from generalresearch.incite.mergers.foundations.user_id_product import (
+ UserIdProductMergeItem,
+)
+from test_utils.incite.mergers.conftest import user_id_product_merge
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration, start",
+    argvalues=list(
+        product(
+            ["12h", "3D"],
+            [timedelta(days=5)],
+            [
+                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
+                    microsecond=0
+                )
+            ],
+        )
+    ),
+)
+class TestUserIDProduct:
+    """Tests for the user-id -> product merge.
+
+    All tests are currently skipped — TODO(review): unskip or document why.
+    """
+
+    @pytest.mark.skip
+    def test_base(self, client_no_amm, user_id_product_merge):
+        ddf = user_id_product_merge.ddf()
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+
+    @pytest.mark.skip
+    def test_base_item(self, client_no_amm, user_id_product_merge, user_collection):
+        assert len(user_id_product_merge.items) == 1
+
+        for item in user_id_product_merge.items:
+            assert isinstance(item, UserIdProductMergeItem)
+
+            path = item.path
+
+            # File may not exist before the first build; fall back to epoch 0.
+            try:
+                modified_time1 = path.stat().st_mtime
+            except (Exception,):
+                modified_time1 = 0
+
+            user_id_product_merge.build(client=client_no_amm, user_coll=user_collection)
+            modified_time2 = path.stat().st_mtime
+
+            # Unlike enriched-wall items, a rebuild here is expected to
+            # re-touch the archive file.
+            assert modified_time2 > modified_time1
+
+    @pytest.mark.skip
+    def test_read(self, client_no_amm, user_id_product_merge):
+        users_ddf = user_id_product_merge.ddf()
+        df = client_no_amm.compute(collections=users_ddf, sync=True)
+
+        assert isinstance(df, pd.DataFrame)
+        # Single categorical product_id column keyed by user id.
+        assert len(df.columns) == 1
+        assert str(df.product_id.dtype) == "category"
diff --git a/tests/incite/mergers/test_merge_collection.py b/tests/incite/mergers/test_merge_collection.py
new file mode 100644
index 0000000..692cac3
--- /dev/null
+++ b/tests/incite/mergers/test_merge_collection.py
@@ -0,0 +1,102 @@
+from datetime import datetime, timezone, timedelta
+from itertools import product
+
+import pandas as pd
+import pytest
+from pandera import DataFrameSchema
+
+from generalresearch.incite.mergers import (
+ MergeCollection,
+ MergeType,
+)
+from test_utils.incite.conftest import mnt_filepath
+
+merge_types = list(e for e in MergeType if e != MergeType.TEST)
+
+
+@pytest.mark.parametrize(
+    argnames="merge_type, offset, duration, start",
+    argvalues=list(
+        product(
+            merge_types,
+            ["5min", "6h", "14D"],
+            [timedelta(days=30)],
+            [
+                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
+                    microsecond=0
+                )
+            ],
+        )
+    ),
+)
+class TestMergeCollection:
+    """Construction/introspection tests for ``MergeCollection`` across every
+    non-TEST merge type, several offsets, and a fixed 30-day window.
+    """
+
+    def test_init(self, mnt_filepath, merge_type, offset, duration, start):
+        # A merge_type is mandatory; constructing without one must raise.
+        with pytest.raises(expected_exception=ValueError) as cm:
+            MergeCollection(archive_path=mnt_filepath.data_src)
+        assert "Must explicitly provide a merge_type" in str(cm.value)
+
+        instance = MergeCollection(
+            merge_type=merge_type,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+        assert instance.merge_type == merge_type
+
+    def test_items(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            merge_type=merge_type,
+            offset=offset,
+            start=start,
+            finished=start + duration,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        # One collection item per interval in the range.
+        assert len(instance.interval_range) == len(instance.items)
+
+    def test_progress(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            merge_type=merge_type,
+            offset=offset,
+            start=start,
+            finished=start + duration,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        # progress is a 7-column frame; group_by is unset for merges.
+        assert isinstance(instance.progress, pd.DataFrame)
+        assert instance.progress.shape[0] > 0
+        assert instance.progress.shape[1] == 7
+        assert instance.progress["group_by"].isnull().all()
+
+    def test_schema(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            merge_type=merge_type,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        # Every merge type must expose a pandera schema.
+        assert isinstance(instance._schema, DataFrameSchema)
+
+    def test_load(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            merge_type=merge_type,
+            start=start,
+            finished=start + duration,
+            offset=offset,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        # Confirm that there are no archives available yet
+        assert instance.progress.has_archive.eq(False).all()
+
+    def test_get_items(self, mnt_filepath, merge_type, offset, duration, start):
+        instance = MergeCollection(
+            start=start,
+            finished=start + duration,
+            offset=offset,
+            merge_type=merge_type,
+            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+        )
+
+        # TODO(review): missing-archive warning path disabled — restore or drop.
+        # with pytest.raises(expected_exception=ResourceWarning) as cm:
+        res = instance.get_items_last365()
+        # assert "has missing archives", str(cm.value)
+        assert len(res) == len(instance.items)
diff --git a/tests/incite/mergers/test_merge_collection_item.py b/tests/incite/mergers/test_merge_collection_item.py
new file mode 100644
index 0000000..96f8789
--- /dev/null
+++ b/tests/incite/mergers/test_merge_collection_item.py
@@ -0,0 +1,66 @@
+from datetime import datetime, timezone, timedelta
+from itertools import product
+from pathlib import PurePath
+
+import pytest
+
+from generalresearch.incite.mergers import MergeCollectionItem, MergeType
+from generalresearch.incite.mergers.foundations.enriched_session import (
+ EnrichedSessionMerge,
+)
+from generalresearch.incite.mergers.foundations.enriched_wall import (
+ EnrichedWallMerge,
+)
+from test_utils.incite.mergers.conftest import merge_collection
+
+
+@pytest.mark.parametrize(
+    argnames="merge_type, offset, duration",
+    argvalues=list(
+        product(
+            [MergeType.ENRICHED_SESSION, MergeType.ENRICHED_WALL],
+            ["1h"],
+            [timedelta(days=1)],
+        )
+    ),
+)
+class TestMergeCollectionItem:
+    """Per-item tests for ``MergeCollectionItem``: filename conventions and
+    archive-state predicates.
+
+    NOTE(review): ``start`` is requested by every test but is not in the
+    parametrize list — presumably supplied by a conftest fixture; confirm.
+    """
+
+    def test_file_naming(self, merge_collection, offset, duration, start):
+        # 1h offset over 1 day -> 25 items (endpoints appear inclusive — TODO confirm).
+        assert len(merge_collection.items) == 25
+
+        items: list[MergeCollectionItem] = merge_collection.items
+
+        for i in items:
+            i: MergeCollectionItem
+
+            assert isinstance(i.path, PurePath)
+            assert i.path.name == i.filename
+
+            # Filename encodes merge type, offset, and the item's start stamp.
+            assert i._collection.merge_type.name.lower() in i.filename
+            assert i._collection.offset in i.filename
+            assert i.start.strftime("%Y-%m-%d-%H-%M-%S") in i.filename
+
+    def test_archives(self, merge_collection, offset, duration, start):
+        assert len(merge_collection.items) == 25
+
+        # Fresh collection: no archives, no empties, no partials anywhere.
+        for i in merge_collection.items:
+            assert not i.has_archive()
+            assert not i.has_empty()
+            assert not i.is_empty()
+            assert not i.has_partial_archive()
+            assert i.has_archive() == i.path_exists(generic_path=i.path)
+
+        # should_archive() must answer uniformly across all items.
+        res = set([i.should_archive() for i in merge_collection.items])
+        assert len(res) == 1
+
+    def test_item_to_archive(self, merge_collection, offset, duration, start):
+        for item in merge_collection.items:
+            item: MergeCollectionItem
+            assert not item.has_archive()
+
+        # TODO: setup build methods
+        # ddf = self.build
+        # saved = instance.to_archive(ddf=ddf)
+        # self.assertTrue(saved)
+        # self.assertTrue(instance.has_archive())
diff --git a/tests/incite/mergers/test_pop_ledger.py b/tests/incite/mergers/test_pop_ledger.py
new file mode 100644
index 0000000..6f96108
--- /dev/null
+++ b/tests/incite/mergers/test_pop_ledger.py
@@ -0,0 +1,307 @@
+from datetime import timedelta, datetime, timezone
+from itertools import product as iter_product
+from typing import Optional
+
+import pandas as pd
+import pytest
+from distributed.utils_test import client_no_amm
+
+from generalresearch.incite.schemas.mergers.pop_ledger import (
+ numerical_col_names,
+)
+from test_utils.incite.collections.conftest import ledger_collection
+from test_utils.incite.conftest import mnt_filepath, incite_item_factory
+from test_utils.incite.mergers.conftest import pop_ledger_merge
+from test_utils.managers.ledger.conftest import create_main_accounts
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration",
+    argvalues=list(
+        iter_product(
+            ["12h", "3D"],
+            [timedelta(days=4)],
+        )
+    ),
+)
+class TestMergePOPLedger:
+    """Build the POP-ledger merge from a ledger collection and verify account
+    balances (cash / revenue / product wallet) via filtered ddf reads,
+    pydantic balance models, and pandas resampling.
+    """
+
+    @pytest.fixture
+    def start(self) -> "datetime":
+        return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)
+
+    # Overrides the parametrized duration for fixtures that consume it —
+    # TODO confirm precedence vs the class-level parametrize.
+    @pytest.fixture
+    def duration(self) -> Optional["timedelta"]:
+        return timedelta(days=5)
+
+    def test_base(
+        self,
+        client_no_amm,
+        ledger_collection,
+        pop_ledger_merge,
+        product,
+        user_factory,
+        create_main_accounts,
+        thl_lm,
+        delete_df_collection,
+        incite_item_factory,
+        delete_ledger_db,
+    ):
+        from generalresearch.models.thl.ledger import LedgerAccount
+
+        u = user_factory(product=product, created=ledger_collection.start)
+
+        # -- Build & Setup
+        delete_ledger_db()
+        create_main_accounts()
+        delete_df_collection(coll=ledger_collection)
+        # assert ledger_collection.start is None
+        # assert ledger_collection.offset is None
+
+        for item in ledger_collection.items:
+            incite_item_factory(item=item, user=u)
+            item.initial_load()
+
+        # Confirm all of the items are archived
+        assert ledger_collection.progress.has_archive.eq(True).all()
+
+        pop_ledger_merge.build(
+            client=client_no_amm,
+            ledger_coll=ledger_collection,
+        )
+        # assert pop_ledger_merge.progress.has_archive.eq(True).all()
+
+        ddf = pop_ledger_merge.ddf()
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+
+        # --
+
+        user_wallet_account: LedgerAccount = thl_lm.get_account_or_create_user_wallet(
+            user=u
+        )
+        cash_account: LedgerAccount = thl_lm.get_account_cash()
+        rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue()
+
+        # Latest item finish bounds the time_idx filters below.
+        item_finishes = [i.finish for i in ledger_collection.items]
+        item_finishes.sort(reverse=True)
+        last_item_finish = item_finishes[0]
+
+        # Pure SQL based lookups
+        cash_balance: int = thl_lm.get_account_balance(account=cash_account)
+        rev_balance: int = thl_lm.get_account_balance(account=rev_account)
+        assert cash_balance > rev_balance
+
+        # (1) Test Cash Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names,
+            filters=[
+                ("account_id", "==", cash_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+        # Cash is only ever debited by mp payments; merged sums must agree
+        # with the SQL balance.
+        assert df["mp_payment.CREDIT"].sum() == 0
+        assert cash_balance > 0
+        assert df["mp_payment.DEBIT"].sum() == cash_balance
+
+        # (2) Test Revenue Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names,
+            filters=[
+                ("account_id", "==", rev_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert rev_balance == 0
+        assert df["bp_payment.CREDIT"].sum() == 0
+        assert df["mp_payment.DEBIT"].sum() == 0
+        assert df["mp_payment.CREDIT"].sum() > 0
+
+        # -- Cleanup
+        delete_ledger_db()
+
+    def test_pydantic_init(
+        self,
+        client_no_amm,
+        ledger_collection,
+        pop_ledger_merge,
+        mnt_filepath,
+        product,
+        user_factory,
+        create_main_accounts,
+        offset,
+        duration,
+        start,
+        thl_lm,
+        incite_item_factory,
+        delete_df_collection,
+        delete_ledger_db,
+        session_collection,
+    ):
+        from generalresearch.models.thl.ledger import LedgerAccount
+        from generalresearch.models.thl.product import Product
+        from generalresearch.models.thl.finance import ProductBalances
+
+        u = user_factory(product=product, created=session_collection.start)
+
+        assert ledger_collection.finished is not None
+        assert isinstance(u.product, Product)
+        delete_ledger_db()
+        # NOTE(review): stray trailing comma makes this a 1-tuple expression;
+        # the call still executes — the comma should be dropped.
+        create_main_accounts(),
+        delete_df_collection(coll=ledger_collection)
+
+        bp_account: LedgerAccount = thl_lm.get_account_or_create_bp_wallet(
+            product=u.product
+        )
+        cash_account: LedgerAccount = thl_lm.get_account_cash()
+        rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue()
+
+        for item in ledger_collection.items:
+            incite_item_factory(item=item, user=u)
+            item.initial_load(overwrite=True)
+
+        pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection)
+
+        item_finishes = [i.finish for i in ledger_collection.items]
+        item_finishes.sort(reverse=True)
+        last_item_finish = item_finishes[0]
+
+        # (1) Filter by the Product Account, this means no cash_account, or
+        # rev_account transactions will be present in here...
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", bp_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+        df = df.set_index("time_idx")
+        assert not df.empty
+
+        # Summed numeric columns feed the pydantic balances model directly.
+        instance = ProductBalances.from_pandas(input_data=df.sum())
+        assert instance.payout == instance.net == instance.bp_payment_credit
+        assert instance.available_balance < instance.net
+        assert instance.available_balance + instance.retainer == instance.net
+        assert instance.balance == thl_lm.get_account_balance(bp_account)
+        assert df["bp_payment.CREDIT"].sum() == thl_lm.get_account_balance(bp_account)
+
+        # (2) Filter by the Cash Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", cash_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        cash_balance: int = thl_lm.get_account_balance(account=cash_account)
+        assert df["bp_payment.CREDIT"].sum() == 0
+        assert cash_balance > 0
+        assert df["mp_payment.CREDIT"].sum() == 0
+        assert df["mp_payment.DEBIT"].sum() == cash_balance
+
+        # (3) Filter by the Revenue Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", rev_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        rev_balance: int = thl_lm.get_account_balance(account=rev_account)
+        assert rev_balance == 0
+        assert df["bp_payment.CREDIT"].sum() == 0
+        assert df["mp_payment.DEBIT"].sum() == 0
+        assert df["mp_payment.CREDIT"].sum() > 0
+
+    def test_resample(
+        self,
+        client_no_amm,
+        ledger_collection,
+        pop_ledger_merge,
+        mnt_filepath,
+        user_factory,
+        product,
+        create_main_accounts,
+        offset,
+        duration,
+        start,
+        thl_lm,
+        delete_df_collection,
+        incite_item_factory,
+    ):
+        from generalresearch.models.thl.user import User
+
+        assert ledger_collection.finished is not None
+        delete_df_collection(coll=ledger_collection)
+        u1: User = user_factory(product=product)
+
+        bp_account = thl_lm.get_account_or_create_bp_wallet(product=u1.product)
+
+        for item in ledger_collection.items:
+            incite_item_factory(user=u1, item=item)
+            item.initial_load(overwrite=True)
+
+        pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection)
+
+        item_finishes = [i.finish for i in ledger_collection.items]
+        item_finishes.sort(reverse=True)
+        last_item_finish = item_finishes[0]
+
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", bp_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+        assert isinstance(df.index, pd.Index)
+        assert not isinstance(df.index, pd.RangeIndex)
+
+        # Now change the index so we can easily resample it
+        df = df.set_index("time_idx")
+        assert isinstance(df.index, pd.Index)
+        assert isinstance(df.index, pd.DatetimeIndex)
+
+        bp_account_balance = thl_lm.get_account_balance(account=bp_account)
+
+        # Initial sum — resampling at any frequency must preserve this total.
+        initial_sum = df.sum().sum()
+        # assert len(df) == 48  # msg="Original df should be 48 rows"
+
+        # Original (1min) to 5min
+        df_5min = df.resample("5min").sum()
+        # assert len(df_5min) == 12
+        assert initial_sum == df_5min.sum().sum()
+
+        # 30min
+        df_30min = df.resample("30min").sum()
+        # assert len(df_30min) == 2
+        assert initial_sum == df_30min.sum().sum()
+
+        # 1hr
+        df_1hr = df.resample("1h").sum()
+        # assert len(df_1hr) == 1
+        assert initial_sum == df_1hr.sum().sum()
+
+        # 1 day
+        df_1day = df.resample("1d").sum()
+        # assert len(df_1day) == 1
+        assert initial_sum == df_1day.sum().sum()
diff --git a/tests/incite/mergers/test_ym_survey_merge.py b/tests/incite/mergers/test_ym_survey_merge.py
new file mode 100644
index 0000000..4c2df6b
--- /dev/null
+++ b/tests/incite/mergers/test_ym_survey_merge.py
@@ -0,0 +1,125 @@
+from datetime import timedelta, timezone, datetime
+from itertools import product
+
+import pandas as pd
+import pytest
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+ gen_cluster,
+ client_no_amm,
+ loop,
+ loop_in_thread,
+ cleanup,
+ cluster_fixture,
+ client,
+)
+
+from test_utils.incite.collections.conftest import wall_collection, session_collection
+from test_utils.incite.mergers.conftest import (
+ enriched_session_merge,
+ ym_survey_wall_merge,
+)
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration, start",
+    argvalues=list(
+        product(
+            ["12h", "3D"],
+            [timedelta(days=30)],
+            [
+                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
+                    microsecond=0
+                )
+            ],
+        )
+    ),
+)
+class TestYMSurveyMerge:
+    """We override start, not because it's needed on the YMSurveyWall merge,
+    which operates on a rolling 10-day window, but because we don't want
+    to mock data in the wall collection and enriched_session_merge from
+    the 1800s and then wonder why there is no data available in the past
+    10 days in the database.
+    """
+
+    def test_base(
+        self,
+        client_no_amm,
+        user_factory,
+        product,
+        ym_survey_wall_merge,
+        wall_collection,
+        session_collection,
+        enriched_session_merge,
+        delete_df_collection,
+        incite_item_factory,
+        thl_web_rr,
+    ):
+        from generalresearch.models.thl.user import User
+
+        delete_df_collection(coll=session_collection)
+        user: User = user_factory(product=product, created=session_collection.start)
+
+        # -- Build & Setup
+        # The YM survey merge always runs on a rolling 10-day window.
+        assert ym_survey_wall_merge.start is None
+        assert ym_survey_wall_merge.offset == "10D"
+
+        for item in session_collection.items:
+            incite_item_factory(item=item, user=user)
+            item.initial_load()
+        for item in wall_collection.items:
+            item.initial_load()
+
+        # Confirm all of the items are archived
+        assert session_collection.progress.has_archive.eq(True).all()
+        assert wall_collection.progress.has_archive.eq(True).all()
+
+        enriched_session_merge.build(
+            client=client_no_amm,
+            session_coll=session_collection,
+            wall_coll=wall_collection,
+            pg_config=thl_web_rr,
+        )
+        assert enriched_session_merge.progress.has_archive.eq(True).all()
+
+        ddf = enriched_session_merge.ddf()
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+
+        # --
+
+        ym_survey_wall_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            enriched_session=enriched_session_merge,
+        )
+        assert ym_survey_wall_merge.progress.has_archive.eq(True).all()
+
+        # --
+
+        ddf = ym_survey_wall_merge.ddf()
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+
+        # -- One user/product/team, but walls from more than one source.
+        assert df.product_id.nunique() == 1
+        assert df.team_id.nunique() == 1
+        assert df.source.nunique() > 1
+
+        started_min_ts = df.started.min()
+        started_max_ts = df.started.max()
+
+        assert type(started_min_ts) is pd.Timestamp
+        assert type(started_max_ts) is pd.Timestamp
+
+        started_min: datetime = datetime.fromisoformat(str(started_min_ts))
+        started_max: datetime = datetime.fromisoformat(str(started_max_ts))
+
+        # The merged walls should span at least a few days of the window.
+        started_delta = started_max - started_min
+        assert started_delta >= timedelta(days=3)