diff options
| author | Max Nanis | 2026-03-06 16:49:46 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-03-06 16:49:46 -0500 |
| commit | 91d040211a4ed6e4157896256a762d3854777b5e (patch) | |
| tree | cd95922ea4257dc8d3f4e4cbe8534474709a20dc /tests/incite/mergers/foundations/test_enriched_wall.py | |
| download | generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.tar.gz generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.zip | |
Initial commitv3.3.4
Diffstat (limited to 'tests/incite/mergers/foundations/test_enriched_wall.py')
| -rw-r--r-- | tests/incite/mergers/foundations/test_enriched_wall.py | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/tests/incite/mergers/foundations/test_enriched_wall.py b/tests/incite/mergers/foundations/test_enriched_wall.py new file mode 100644 index 0000000..8f4995b --- /dev/null +++ b/tests/incite/mergers/foundations/test_enriched_wall.py @@ -0,0 +1,236 @@ +from datetime import timedelta, timezone, datetime +from decimal import Decimal +from itertools import product as iter_product +from typing import Optional + +import dask.dataframe as dd +import pandas as pd +import pytest + +# noinspection PyUnresolvedReferences +from distributed.utils_test import ( + gen_cluster, + client_no_amm, + loop, + loop_in_thread, + cleanup, + cluster_fixture, + client, +) + +from generalresearch.incite.mergers.foundations.enriched_wall import ( + EnrichedWallMergeItem, +) +from test_utils.incite.collections.conftest import ( + session_collection, + wall_collection, +) +from test_utils.incite.conftest import incite_item_factory +from test_utils.incite.mergers.conftest import ( + enriched_wall_merge, +) + + +@pytest.mark.parametrize( + argnames="offset, duration", + argvalues=list(iter_product(["48h", "3D"], [timedelta(days=5)])), +) +class TestEnrichedWall: + + def test_base( + self, + client_no_amm, + product, + user_factory, + wall_collection, + thl_web_rr, + session_collection, + enriched_wall_merge, + delete_df_collection, + incite_item_factory, + ): + from generalresearch.models.thl.user import User + + # -- Build & Setup + delete_df_collection(coll=session_collection) + delete_df_collection(coll=wall_collection) + u1: User = user_factory(product=product, created=session_collection.start) + + for item in session_collection.items: + incite_item_factory(item=item, user=u1) + item.initial_load() + + for item in wall_collection.items: + item.initial_load() + + enriched_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + # -- + + ddf = enriched_wall_merge.ddf() + assert isinstance(ddf, dd.DataFrame) + + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + + assert not df.empty + + def test_base_item( + self, + client_no_amm, + product, + user_factory, + wall_collection, + session_collection, + enriched_wall_merge, + delete_df_collection, + thl_web_rr, + incite_item_factory, + ): + # -- Build & Setup + delete_df_collection(coll=session_collection) + u = user_factory(product=product, created=session_collection.start) + + for item in session_collection.items: + incite_item_factory(item=item, user=u) + item.initial_load() + for item in wall_collection.items: + item.initial_load() + + enriched_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + # -- + + for item in enriched_wall_merge.items: + assert isinstance(item, EnrichedWallMergeItem) + + path = item.path + + try: + modified_time1 = path.stat().st_mtime + except (Exception,): + modified_time1 = 0 + + item.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + modified_time2 = path.stat().st_mtime + + # Merger Items can't be updated unless it's a partial, confirm + # that even after attempting to rebuild, it doesn't re-touch + # the file + assert modified_time2 == modified_time1 + + # def test_admin_pop_session_device_type(ew_merge_setup): + # self.build() + # + # rr = ReportRequest( + # report_type=ReportType.POP_EVENT, + # index0="started", + # index1="device_type", + # freq="min", + # start=start, + # ) + # + # df, categories, updated = self.instance.to_admin_response( + # rr=rr, product_ids=[self.product.id], client=client + # ) + # + # assert isinstance(df, pd.DataFrame) + # device_types_str = [str(e.value) for e in DeviceType] + # device_types = df.index.get_level_values(1).values + # assert all([dt in device_types_str for dt in device_types]) + + +class TestEnrichedWallToAdmin: + + @pytest.fixture + def start(self) -> "datetime": + return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc) + + @pytest.fixture + def offset(self) -> str: + return "1d" + + @pytest.fixture + def duration(self) -> Optional["timedelta"]: + return timedelta(days=5) + + def test_empty(self, enriched_wall_merge, client_no_amm, start): + from generalresearch.models.admin.request import ReportRequest + + rr = ReportRequest.model_validate({"interval": "5min", "start": start}) + + res = enriched_wall_merge.to_admin_response( + rr=rr, + client=client_no_amm, + ) + + assert isinstance(res, pd.DataFrame) + + assert res.empty + assert len(res.columns) > 5 + + def test_to_admin_response( + self, + event_report_request, + enriched_wall_merge, + client_no_amm, + wall_collection, + session_collection, + thl_web_rr, + user, + session_factory, + delete_df_collection, + product_factory, + user_factory, + start, + ): + delete_df_collection(coll=wall_collection) + delete_df_collection(coll=session_collection) + + p1 = product_factory() + p2 = product_factory() + + for p in [p1, p2]: + u = user_factory(product=p) + for i in range(50): + s = session_factory( + user=u, + wall_count=2, + wall_req_cpi=Decimal("1.00"), + started=start + timedelta(minutes=i, seconds=1), + ) + + wall_collection.initial_load(client=None, sync=True) + session_collection.initial_load(client=None, sync=True) + + enriched_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + df = enriched_wall_merge.to_admin_response( + rr=event_report_request, client=client_no_amm + ) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + # assert len(df) == 1 + # assert user.product_id == df.reset_index().loc[0, "index1"] + assert df.index.get_level_values(1).nunique() == 2 |
