diff options
Diffstat (limited to 'tests/incite/mergers/foundations')
5 files changed, 523 insertions, 0 deletions
diff --git a/tests/incite/mergers/foundations/__init__.py b/tests/incite/mergers/foundations/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/incite/mergers/foundations/__init__.py diff --git a/tests/incite/mergers/foundations/test_enriched_session.py b/tests/incite/mergers/foundations/test_enriched_session.py new file mode 100644 index 0000000..ec15d38 --- /dev/null +++ b/tests/incite/mergers/foundations/test_enriched_session.py @@ -0,0 +1,138 @@ +from datetime import timedelta, timezone, datetime +from decimal import Decimal +from itertools import product +from typing import Optional + +from generalresearch.incite.schemas.admin_responses import ( + AdminPOPSessionSchema, +) + +import dask.dataframe as dd +import pandas as pd +import pytest + +from test_utils.incite.collections.conftest import ( + session_collection, + wall_collection, +) + + +@pytest.mark.parametrize( + argnames="offset, duration", + argvalues=list( + product( + ["12h", "3D"], + [timedelta(days=5)], + ) + ), +) +class TestEnrichedSession: + + def test_base( + self, + client_no_amm, + product, + user_factory, + wall_collection, + session_collection, + enriched_session_merge, + thl_web_rr, + delete_df_collection, + incite_item_factory, + ): + from generalresearch.models.thl.user import User + + delete_df_collection(coll=session_collection) + + u1: User = user_factory(product=product, created=session_collection.start) + + for item in session_collection.items: + incite_item_factory(item=item, user=u1) + item.initial_load() + + for item in wall_collection.items: + item.initial_load() + + enriched_session_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + # -- + + ddf = enriched_session_merge.ddf() + assert isinstance(ddf, dd.DataFrame) + + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + + assert not df.empty + + # -- Teardown + delete_df_collection(session_collection) + + +class TestEnrichedSessionAdmin: + + @pytest.fixture + def start(self) -> "datetime": + return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc) + + @pytest.fixture + def offset(self) -> str: + return "1d" + + @pytest.fixture + def duration(self) -> Optional["timedelta"]: + return timedelta(days=5) + + def test_to_admin_response( + self, + event_report_request, + enriched_session_merge, + client_no_amm, + wall_collection, + session_collection, + thl_web_rr, + session_report_request, + user_factory, + start, + session_factory, + product_factory, + delete_df_collection, + ): + delete_df_collection(coll=wall_collection) + delete_df_collection(coll=session_collection) + + p1 = product_factory() + p2 = product_factory() + + for p in [p1, p2]: + u = user_factory(product=p) + for i in range(50): + s = session_factory( + user=u, + wall_count=1, + wall_req_cpi=Decimal("1.00"), + started=start + timedelta(minutes=i, seconds=1), + ) + wall_collection.initial_load(client=None, sync=True) + session_collection.initial_load(client=None, sync=True) + + enriched_session_merge.build( + client=client_no_amm, + session_coll=session_collection, + wall_coll=wall_collection, + pg_config=thl_web_rr, + ) + + df = enriched_session_merge.to_admin_response( + rr=session_report_request, client=client_no_amm + ) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + assert isinstance(AdminPOPSessionSchema.validate(df), pd.DataFrame) + assert df.index.get_level_values(1).nunique() == 2 diff --git a/tests/incite/mergers/foundations/test_enriched_task_adjust.py b/tests/incite/mergers/foundations/test_enriched_task_adjust.py new file mode 100644 index 0000000..96c214f --- /dev/null +++ b/tests/incite/mergers/foundations/test_enriched_task_adjust.py @@ -0,0 +1,76 @@ +from datetime import timedelta +from itertools import product as iter_product + +import dask.dataframe as dd +import pandas as pd +import pytest + +from test_utils.incite.collections.conftest import ( + wall_collection, + task_adj_collection, + session_collection, +) +from test_utils.incite.mergers.conftest import enriched_wall_merge + + +@pytest.mark.parametrize( + argnames="offset, duration,", + argvalues=list( + iter_product( + ["12h", "3D"], + [timedelta(days=5)], + ) + ), +) +class TestEnrichedTaskAdjust: + + @pytest.mark.skip + def test_base( + self, + client_no_amm, + user_factory, + product, + task_adj_collection, + wall_collection, + session_collection, + enriched_wall_merge, + enriched_task_adjust_merge, + incite_item_factory, + delete_df_collection, + thl_web_rr, + ): + from generalresearch.models.thl.user import User + + # -- Build & Setup + delete_df_collection(coll=session_collection) + u1: User = user_factory(product=product) + + for item in session_collection.items: + incite_item_factory(user=u1, item=item) + item.initial_load() + for item in wall_collection.items: + item.initial_load() + + enriched_wall_merge.build( + client=client_no_amm, + session_coll=session_collection, + wall_coll=wall_collection, + pg_config=thl_web_rr, + ) + + enriched_task_adjust_merge.build( + client=client_no_amm, + task_adjust_coll=task_adj_collection, + enriched_wall=enriched_wall_merge, + pg_config=thl_web_rr, + ) + + # -- + + ddf = enriched_task_adjust_merge.ddf() + assert isinstance(ddf, dd.DataFrame) + + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + + assert not df.empty diff --git a/tests/incite/mergers/foundations/test_enriched_wall.py b/tests/incite/mergers/foundations/test_enriched_wall.py new file mode 100644 index 0000000..8f4995b --- /dev/null +++ b/tests/incite/mergers/foundations/test_enriched_wall.py @@ -0,0 +1,236 @@ +from datetime import timedelta, timezone, datetime +from decimal import Decimal +from itertools import product as iter_product +from typing import Optional + +import dask.dataframe as dd +import pandas as pd +import pytest + +# noinspection PyUnresolvedReferences +from distributed.utils_test import ( + gen_cluster, + client_no_amm, + loop, + loop_in_thread, + cleanup, + cluster_fixture, + client, +) + +from generalresearch.incite.mergers.foundations.enriched_wall import ( + EnrichedWallMergeItem, +) +from test_utils.incite.collections.conftest import ( + session_collection, + wall_collection, +) +from test_utils.incite.conftest import incite_item_factory +from test_utils.incite.mergers.conftest import ( + enriched_wall_merge, +) + + +@pytest.mark.parametrize( + argnames="offset, duration", + argvalues=list(iter_product(["48h", "3D"], [timedelta(days=5)])), +) +class TestEnrichedWall: + + def test_base( + self, + client_no_amm, + product, + user_factory, + wall_collection, + thl_web_rr, + session_collection, + enriched_wall_merge, + delete_df_collection, + incite_item_factory, + ): + from generalresearch.models.thl.user import User + + # -- Build & Setup + delete_df_collection(coll=session_collection) + delete_df_collection(coll=wall_collection) + u1: User = user_factory(product=product, created=session_collection.start) + + for item in session_collection.items: + incite_item_factory(item=item, user=u1) + item.initial_load() + + for item in wall_collection.items: + item.initial_load() + + enriched_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + # -- + + ddf = enriched_wall_merge.ddf() + assert isinstance(ddf, dd.DataFrame) + + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + + assert not df.empty + + def test_base_item( + self, + client_no_amm, + product, + user_factory, + wall_collection, + session_collection, + enriched_wall_merge, + delete_df_collection, + thl_web_rr, + incite_item_factory, + ): + # -- Build & Setup + delete_df_collection(coll=session_collection) + u = user_factory(product=product, created=session_collection.start) + + for item in session_collection.items: + incite_item_factory(item=item, user=u) + item.initial_load() + for item in wall_collection.items: + item.initial_load() + + enriched_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + # -- + + for item in enriched_wall_merge.items: + assert isinstance(item, EnrichedWallMergeItem) + + path = item.path + + try: + modified_time1 = path.stat().st_mtime + except (Exception,): + modified_time1 = 0 + + item.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + modified_time2 = path.stat().st_mtime + + # Merger Items can't be updated unless it's a partial, confirm + # that even after attempting to rebuild, it doesn't re-touch + # the file + assert modified_time2 == modified_time1 + + # def test_admin_pop_session_device_type(ew_merge_setup): + # self.build() + # + # rr = ReportRequest( + # report_type=ReportType.POP_EVENT, + # index0="started", + # index1="device_type", + # freq="min", + # start=start, + # ) + # + # df, categories, updated = self.instance.to_admin_response( + # rr=rr, product_ids=[self.product.id], client=client + # ) + # + # assert isinstance(df, pd.DataFrame) + # device_types_str = [str(e.value) for e in DeviceType] + # device_types = df.index.get_level_values(1).values + # assert all([dt in device_types_str for dt in device_types]) + + +class TestEnrichedWallToAdmin: + + @pytest.fixture + def start(self) -> "datetime": + return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc) + + @pytest.fixture + def offset(self) -> str: + return "1d" + + @pytest.fixture + def duration(self) -> Optional["timedelta"]: + return timedelta(days=5) + + def test_empty(self, enriched_wall_merge, client_no_amm, start): + from generalresearch.models.admin.request import ReportRequest + + rr = ReportRequest.model_validate({"interval": "5min", "start": start}) + + res = enriched_wall_merge.to_admin_response( + rr=rr, + client=client_no_amm, + ) + + assert isinstance(res, pd.DataFrame) + + assert res.empty + assert len(res.columns) > 5 + + def test_to_admin_response( + self, + event_report_request, + enriched_wall_merge, + client_no_amm, + wall_collection, + session_collection, + thl_web_rr, + user, + session_factory, + delete_df_collection, + product_factory, + user_factory, + start, + ): + delete_df_collection(coll=wall_collection) + delete_df_collection(coll=session_collection) + + p1 = product_factory() + p2 = product_factory() + + for p in [p1, p2]: + u = user_factory(product=p) + for i in range(50): + s = session_factory( + user=u, + wall_count=2, + wall_req_cpi=Decimal("1.00"), + started=start + timedelta(minutes=i, seconds=1), + ) + + wall_collection.initial_load(client=None, sync=True) + session_collection.initial_load(client=None, sync=True) + + enriched_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + df = enriched_wall_merge.to_admin_response( + rr=event_report_request, client=client_no_amm + ) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + # assert len(df) == 1 + # assert user.product_id == df.reset_index().loc[0, "index1"] + assert df.index.get_level_values(1).nunique() == 2 diff --git a/tests/incite/mergers/foundations/test_user_id_product.py b/tests/incite/mergers/foundations/test_user_id_product.py new file mode 100644 index 0000000..f96bfb4 --- /dev/null +++ b/tests/incite/mergers/foundations/test_user_id_product.py @@ -0,0 +1,73 @@ +from datetime import timedelta, datetime, timezone +from itertools import product + +import pandas as pd +import pytest + +# noinspection PyUnresolvedReferences +from distributed.utils_test import ( + gen_cluster, + client_no_amm, + loop, + loop_in_thread, + cleanup, + cluster_fixture, + client, +) + +from generalresearch.incite.mergers.foundations.user_id_product import ( + UserIdProductMergeItem, +) +from test_utils.incite.mergers.conftest import user_id_product_merge + + +@pytest.mark.parametrize( + argnames="offset, duration, start", + argvalues=list( + product( + ["12h", "3D"], + [timedelta(days=5)], + [ + (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace( + microsecond=0 + ) + ], + ) + ), +) +class TestUserIDProduct: + + @pytest.mark.skip + def test_base(self, client_no_amm, user_id_product_merge): + ddf = user_id_product_merge.ddf() + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + assert not df.empty + + @pytest.mark.skip + def test_base_item(self, client_no_amm, user_id_product_merge, user_collection): + assert len(user_id_product_merge.items) == 1 + + for item in user_id_product_merge.items: + assert isinstance(item, UserIdProductMergeItem) + + path = item.path + + try: + modified_time1 = path.stat().st_mtime + except (Exception,): + modified_time1 = 0 + + user_id_product_merge.build(client=client_no_amm, user_coll=user_collection) + modified_time2 = path.stat().st_mtime + + assert modified_time2 > modified_time1 + + @pytest.mark.skip + def test_read(self, client_no_amm, user_id_product_merge): + users_ddf = user_id_product_merge.ddf() + df = client_no_amm.compute(collections=users_ddf, sync=True) + + assert isinstance(df, pd.DataFrame) + assert len(df.columns) == 1 + assert str(df.product_id.dtype) == "category" |
