diff options
| author | Max Nanis | 2026-03-06 16:49:46 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-03-06 16:49:46 -0500 |
| commit | 91d040211a4ed6e4157896256a762d3854777b5e (patch) | |
| tree | cd95922ea4257dc8d3f4e4cbe8534474709a20dc /tests/incite | |
| download | generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.tar.gz generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.zip | |
Initial commitv3.3.4
Diffstat (limited to 'tests/incite')
25 files changed, 3602 insertions, 0 deletions
diff --git a/tests/incite/__init__.py b/tests/incite/__init__.py new file mode 100644 index 0000000..2f736e8 --- /dev/null +++ b/tests/incite/__init__.py @@ -0,0 +1,137 @@ +# class TestParquetBehaviors(CleanTempDirectoryTestCls): +# wall_coll = WallDFCollection( +# start=GLOBAL_VARS["wall"].start, +# offset="49h", +# archive_path=f"{settings.incite_mount_dir}/raw/df-collections/{DFCollectionType.WALL.value}", +# ) +# +# def test_filters(self): +# # Using REAL data here +# start = datetime(year=2024, month=1, day=15, hour=12, tzinfo=timezone.utc) +# end = datetime(year=2024, month=1, day=15, hour=20, tzinfo=timezone.utc) +# end_max = datetime( +# year=2024, month=1, day=15, hour=20, tzinfo=timezone.utc +# ) + timedelta(hours=2) +# +# ir = pd.Interval(left=pd.Timestamp(start), right=pd.Timestamp(end)) +# wall_items = [w for w in self.wall_coll.items if w.interval.overlaps(ir)] +# ddf = self.wall_coll.ddf( +# items=wall_items, +# include_partial=True, +# force_rr_latest=False, +# columns=["started", "finished"], +# filters=[ +# ("started", ">=", start), +# ("started", "<", end), +# ], +# ) +# +# df = ddf.compute() +# self.assertIsInstance(df, pd.DataFrame) +# +# # No started=None, and they're all between the started and the end +# self.assertFalse(df.started.isna().any()) +# self.assertFalse((df.started < start).any()) +# self.assertFalse((df.started > end).any()) +# +# # Has finished=None and finished=time, so +# # the finished is all between the started and +# # the end_max +# self.assertTrue(df.finished.isna().any()) +# self.assertTrue((df.finished.dt.year == 2024).any()) +# +# self.assertFalse((df.finished > end_max).any()) +# self.assertFalse((df.finished < start).any()) +# +# # def test_user_id_list(self): +# # # Calling compute turns it into a np.ndarray +# # user_ids = self.instance.ddf( +# # columns=["user_id"] +# # ).user_id.unique().values.compute() +# # self.assertIsInstance(user_ids, np.ndarray) +# # +# # # If ddf filters work with ndarray +# # 
user_product_merge = <todo: assign> +# # +# # with self.assertRaises(TypeError) as cm: +# # user_product_merge.ddf( +# # filters=[("id", "in", user_ids)]) +# # self.assertIn("Value of 'in' filter must be a list, set or tuple.", str(cm.exception)) +# # +# # # No compute == dask array +# # user_ids = self.instance.ddf( +# # columns=["user_id"] +# # ).user_id.unique().values +# # self.assertIsInstance(user_ids, da.Array) +# # +# # with self.assertRaises(TypeError) as cm: +# # user_product_merge.ddf( +# # filters=[("id", "in", user_ids)]) +# # self.assertIn("Value of 'in' filter must be a list, set or tuple.", str(cm.exception)) +# # +# # # pick a product_id (most active one) +# # self.product_id = instance.df.product_id.value_counts().index[0] +# # self.expected_columns: int = len(instance._schema.columns) +# # self.instance = instance +# +# # def test_basic(self): +# # # now try to load up the data! +# # self.instance.grouped_key = self.product_id +# # +# # # Confirm any of the items are archived +# # self.assertTrue(self.instance.progress.has_archive.eq(True).any()) +# # +# # # Confirm it returns a df +# # df = self.instance.dd().compute() +# # +# # self.assertFalse(df.empty) +# # self.assertEqual(df.shape[1], self.expected_columns) +# # self.assertGreater(df.shape[0], 1) +# # +# # # Confirm that DF only contains this product_id +# # self.assertEqual(df[df.product_id == self.product_id].shape, df.shape) +# +# # def test_god_vs_product_id(self): +# # self.instance.grouped_key = self.product_id +# # df_product_origin = self.instance.dd(columns=None, filters=None).compute() +# # +# # self.instance.grouped_key = None +# # df_god_origin = self.instance.dd(columns=None, +# # filters=[("product_id", "==", self.product_id)]).compute() +# # +# # self.assertTrue(df_god_origin.equals(df_product_origin)) +# +# # +# # instance = POPSessionMerge( +# # start=START, +# # archive_path=self.PATH, +# # group_by="product_id" +# # ) +# # instance.build(U=GLOBAL_VARS["user"], 
S=GLOBAL_VARS["session"], W=GLOBAL_VARS["wall"]) +# # instance.save(god_only=False) +# # +# # # pick a product_id (most active one) +# # self.product_id = instance.df.product_id.value_counts().index[0] +# # self.expected_columns: int = len(instance._schema.columns) +# # self.instance = instance +# +# +# class TestValidItem(CleanTempDirectoryTestCls): +# +# def test_interval(self): +# for k in GLOBAL_VARS.keys(): +# coll = GLOBAL_VARS[k] +# item = coll.items[0] +# ir = item.interval +# +# self.assertIsInstance(ir, pd.Interval) +# self.assertLess(a=ir.left, b=ir.right) +# +# def test_str(self): +# for k in GLOBAL_VARS.keys(): +# coll = GLOBAL_VARS[k] +# item = coll.items[0] +# +# offset = coll.offset or "–" +# +# self.assertIn(offset, str(item)) diff --git a/tests/incite/collections/__init__.py b/tests/incite/collections/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/incite/collections/__init__.py diff --git a/tests/incite/collections/test_df_collection_base.py b/tests/incite/collections/test_df_collection_base.py new file mode 100644 index 0000000..5aaa729 --- /dev/null +++ b/tests/incite/collections/test_df_collection_base.py @@ -0,0 +1,113 @@ +from datetime import datetime, timezone + +import pandas as pd +import pytest +from pandera import DataFrameSchema + +from generalresearch.incite.collections import ( + DFCollectionType, + DFCollection, +) +from test_utils.incite.conftest import mnt_filepath + +df_collection_types = [e for e in DFCollectionType if e is not DFCollectionType.TEST] + + +@pytest.mark.parametrize("df_coll_type", df_collection_types) +class TestDFCollectionBase: + """None of these tests are about the DFCollection with any specific + data_type... 
that will be handled in other parameterized tests + + """ + + def test_init(self, mnt_filepath, df_coll_type): + """Try to initialize the DFCollection with various invalid parameters""" + with pytest.raises(expected_exception=ValueError) as cm: + DFCollection(archive_path=mnt_filepath.data_src) + assert "Must explicitly provide a data_type" in str(cm.value) + + # with pytest.raises(expected_exception=ValueError) as cm: + # DFCollection( + # data_type=DFCollectionType.TEST, archive_path=mnt_filepath.data_src + # ) + # assert "Must provide a supported data_type" in str(cm.value) + + instance = DFCollection( + data_type=DFCollectionType.WALL, archive_path=mnt_filepath.data_src + ) + assert instance.data_type == DFCollectionType.WALL + + +@pytest.mark.parametrize("df_coll_type", df_collection_types) +class TestDFCollectionBaseProperties: + + @pytest.mark.skip + def test_df_collection_items(self, mnt_filepath, df_coll_type): + instance = DFCollection( + data_type=df_coll_type, + start=datetime(year=1800, month=1, day=1, tzinfo=timezone.utc), + finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc), + offset="100d", + archive_path=mnt_filepath.archive_path(enum_type=df_coll_type), + ) + + assert len(instance.interval_range) == len(instance.items) + assert len(instance.items) == 366 + + def test_df_collection_progress(self, mnt_filepath, df_coll_type): + instance = DFCollection( + data_type=df_coll_type, + start=datetime(year=1800, month=1, day=1, tzinfo=timezone.utc), + finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc), + offset="100d", + archive_path=mnt_filepath.archive_path(enum_type=df_coll_type), + ) + + # Progress returns a dataframe with a row each Item + assert isinstance(instance.progress, pd.DataFrame) + assert instance.progress.shape == (366, 6) + + def test_df_collection_schema(self, mnt_filepath, df_coll_type): + instance1 = DFCollection( + data_type=DFCollectionType.WALL, archive_path=mnt_filepath.data_src + ) + + instance2 = 
DFCollection( + data_type=DFCollectionType.SESSION, archive_path=mnt_filepath.data_src + ) + + assert instance1._schema != instance2._schema + assert isinstance(instance1._schema, DataFrameSchema) + assert isinstance(instance2._schema, DataFrameSchema) + + +class TestDFCollectionBaseMethods: + + @pytest.mark.skip + def test_initial_load(self, mnt_filepath, thl_web_rr): + instance = DFCollection( + pg_config=thl_web_rr, + data_type=DFCollectionType.USER, + start=datetime(year=2022, month=1, day=1, minute=0, tzinfo=timezone.utc), + finished=datetime(year=2022, month=1, day=1, minute=5, tzinfo=timezone.utc), + offset="2min", + archive_path=mnt_filepath.data_src, + ) + + # Confirm that there are no archives available yet + assert instance.progress.has_archive.eq(False).all() + + instance.initial_load() + assert 47 == len(instance.ddf().index) + assert instance.progress.should_archive.eq(True).all() + + # A few archives should have been made + assert not instance.progress.has_archive.eq(False).all() + + @pytest.mark.skip + def test_fetch_force_rr_latest(self): + pass + + @pytest.mark.skip + def test_force_rr_latest(self): + pass diff --git a/tests/incite/collections/test_df_collection_item_base.py b/tests/incite/collections/test_df_collection_item_base.py new file mode 100644 index 0000000..a0c0b0b --- /dev/null +++ b/tests/incite/collections/test_df_collection_item_base.py @@ -0,0 +1,72 @@ +from datetime import datetime, timezone + +import pytest + +from generalresearch.incite.collections import ( + DFCollectionType, + DFCollectionItem, + DFCollection, +) +from test_utils.incite.conftest import mnt_filepath + +df_collection_types = [e for e in DFCollectionType if e is not DFCollectionType.TEST] + + +@pytest.mark.parametrize("df_coll_type", df_collection_types) +class TestDFCollectionItemBase: + + def test_init(self, mnt_filepath, df_coll_type): + collection = DFCollection( + data_type=df_coll_type, + offset="100d", + start=datetime(year=1800, month=1, day=1, 
tzinfo=timezone.utc), + finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc), + archive_path=mnt_filepath.archive_path(enum_type=df_coll_type), + ) + + item = DFCollectionItem() + item._collection = collection + + assert isinstance(item, DFCollectionItem) + + +@pytest.mark.parametrize("df_coll_type", df_collection_types) +class TestDFCollectionItemProperties: + + @pytest.mark.skip + def test_filename(self, df_coll_type): + pass + + +@pytest.mark.parametrize("df_coll_type", df_collection_types) +class TestDFCollectionItemMethods: + + def test_has_mysql_false(self, mnt_filepath, df_coll_type): + collection = DFCollection( + data_type=df_coll_type, + offset="100d", + start=datetime(year=1800, month=1, day=1, tzinfo=timezone.utc), + finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc), + archive_path=mnt_filepath.archive_path(enum_type=df_coll_type), + ) + + instance1: DFCollectionItem = collection.items[0] + assert not instance1.has_mysql() + + def test_has_mysql_true(self, thl_web_rr, mnt_filepath, df_coll_type): + collection = DFCollection( + data_type=df_coll_type, + offset="100d", + start=datetime(year=1800, month=1, day=1, tzinfo=timezone.utc), + finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc), + archive_path=mnt_filepath.archive_path(enum_type=df_coll_type), + pg_config=thl_web_rr, + ) + + # Has RR, assume unittest server is online + instance2: DFCollectionItem = collection.items[0] + assert instance2.has_mysql() + + @pytest.mark.skip + def test_update_partial_archive(self, df_coll_type): + pass diff --git a/tests/incite/collections/test_df_collection_item_thl_web.py b/tests/incite/collections/test_df_collection_item_thl_web.py new file mode 100644 index 0000000..9c3d67a --- /dev/null +++ b/tests/incite/collections/test_df_collection_item_thl_web.py @@ -0,0 +1,994 @@ +from datetime import datetime, timezone, timedelta +from itertools import product as iter_product +from os.path import join as pjoin +from pathlib import 
PurePath, Path +from uuid import uuid4 + +import dask.dataframe as dd +import pandas as pd +import pytest +from distributed import Client, Scheduler, Worker + +# noinspection PyUnresolvedReferences +from distributed.utils_test import ( + gen_cluster, + client_no_amm, + loop, + loop_in_thread, + cleanup, + cluster_fixture, + client, +) +from faker import Faker +from pandera import DataFrameSchema +from pydantic import FilePath + +from generalresearch.incite.base import CollectionItemBase +from generalresearch.incite.collections import ( + DFCollectionItem, + DFCollectionType, +) +from generalresearch.incite.schemas import ARCHIVE_AFTER +from generalresearch.models.thl.user import User +from generalresearch.pg_helper import PostgresConfig +from generalresearch.sql_helper import PostgresDsn +from test_utils.incite.conftest import mnt_filepath, incite_item_factory + +fake = Faker() + +df_collections = [ + DFCollectionType.WALL, + DFCollectionType.SESSION, + DFCollectionType.LEDGER, + DFCollectionType.TASK_ADJUSTMENT, +] + +unsupported_mock_types = { + DFCollectionType.IP_INFO, + DFCollectionType.IP_HISTORY, + DFCollectionType.IP_HISTORY_WS, + DFCollectionType.TASK_ADJUSTMENT, +} + + +def combo_object(): + for x in iter_product( + df_collections, + ["15min", "45min", "1H"], + ): + yield x + + +class TestDFCollectionItemBase: + def test_init(self): + instance = CollectionItemBase() + assert isinstance(instance, CollectionItemBase) + assert isinstance(instance.start, datetime) + + +@pytest.mark.parametrize( + argnames="df_collection_data_type, offset", argvalues=combo_object() +) +class TestDFCollectionItemProperties: + + def test_filename(self, df_collection_data_type, df_collection, offset): + for i in df_collection.items: + assert isinstance(i.filename, str) + + assert isinstance(i.path, PurePath) + assert i.path.name == i.filename + + assert i._collection.data_type.name.lower() in i.filename + assert i._collection.offset in i.filename + assert 
i.start.strftime("%Y-%m-%d-%H-%M-%S") in i.filename + + +@pytest.mark.parametrize( + argnames="df_collection_data_type, offset", argvalues=combo_object() +) +class TestDFCollectionItemPropertiesBase: + + def test_name(self, df_collection_data_type, offset, df_collection): + for i in df_collection.items: + assert isinstance(i.name, str) + + def test_finish(self, df_collection_data_type, offset, df_collection): + for i in df_collection.items: + assert isinstance(i.finish, datetime) + + def test_interval(self, df_collection_data_type, offset, df_collection): + for i in df_collection.items: + assert isinstance(i.interval, pd.Interval) + + def test_partial_filename(self, df_collection_data_type, offset, df_collection): + for i in df_collection.items: + assert isinstance(i.partial_filename, str) + + def test_empty_filename(self, df_collection_data_type, offset, df_collection): + for i in df_collection.items: + assert isinstance(i.empty_filename, str) + + def test_path(self, df_collection_data_type, offset, df_collection): + for i in df_collection.items: + assert isinstance(i.path, FilePath) + + def test_partial_path(self, df_collection_data_type, offset, df_collection): + for i in df_collection.items: + assert isinstance(i.partial_path, FilePath) + + def test_empty_path(self, df_collection_data_type, offset, df_collection): + for i in df_collection.items: + assert isinstance(i.empty_path, FilePath) + + +@pytest.mark.parametrize( + argnames="df_collection_data_type, offset, duration", + argvalues=list( + iter_product( + df_collections, + ["12h", "10D"], + [timedelta(days=10), timedelta(days=45)], + ) + ), +) +class TestDFCollectionItemMethod: + + def test_has_mysql( + self, + df_collection, + thl_web_rr, + offset, + duration, + df_collection_data_type, + delete_df_collection, + ): + delete_df_collection(coll=df_collection) + + df_collection.pg_config = None + for i in df_collection.items: + assert not i.has_mysql() + + # Confirm that the regular connection should work as 
expected + df_collection.pg_config = thl_web_rr + for i in df_collection.items: + assert i.has_mysql() + + # Make a fake connection and confirm it does NOT work + df_collection.pg_config = PostgresConfig( + dsn=PostgresDsn(f"postgres://root:@127.0.0.1/{uuid4().hex}"), + connect_timeout=5, + statement_timeout=1, + ) + for i in df_collection.items: + assert not i.has_mysql() + + @pytest.mark.skip + def test_update_partial_archive( + self, + df_collection, + offset, + duration, + thl_web_rw, + df_collection_data_type, + delete_df_collection, + ): + # for i in collection.items: + # assert i.update_partial_archive() + # assert df.created.max() < _last_time_block[1] + pass + + @pytest.mark.skip + def test_create_partial_archive( + self, + df_collection, + offset, + duration, + create_main_accounts, + thl_web_rw, + thl_lm, + df_collection_data_type, + user_factory, + product, + client_no_amm, + incite_item_factory, + delete_df_collection, + mnt_filepath, + ): + assert 1 + 1 == 2 + + def test_dict( + self, + df_collection_data_type, + offset, + duration, + df_collection, + delete_df_collection, + ): + delete_df_collection(coll=df_collection) + + for item in df_collection.items: + res = item.to_dict() + assert isinstance(res, dict) + assert len(res.keys()) == 6 + + assert isinstance(res["should_archive"], bool) + assert isinstance(res["has_archive"], bool) + assert isinstance(res["path"], Path) + assert isinstance(res["filename"], str) + + assert isinstance(res["start"], datetime) + assert isinstance(res["finish"], datetime) + assert res["start"] < res["finish"] + + def test_from_mysql( + self, + df_collection_data_type, + df_collection, + offset, + duration, + create_main_accounts, + thl_web_rw, + user_factory, + product, + incite_item_factory, + delete_df_collection, + ): + from generalresearch.models.thl.user import User + + if df_collection.data_type in unsupported_mock_types: + return + + delete_df_collection(coll=df_collection) + u1: User = 
user_factory(product=product) + + # No data has been loaded, but we can confirm the from_mysql returns + # back an empty data with the correct columns + for item in df_collection.items: + # Unlike .from_mysql_ledger(), .from_mysql_standard() will return + # back and empty df with the correct columns in place + delete_df_collection(coll=df_collection) + df = item.from_mysql() + if df_collection.data_type == DFCollectionType.LEDGER: + assert df is None + else: + assert df.empty + assert set(df.columns) == set(df_collection._schema.columns.keys()) + + incite_item_factory(user=u1, item=item) + + df = item.from_mysql() + assert not df.empty + assert set(df.columns) == set(df_collection._schema.columns.keys()) + if df_collection.data_type == DFCollectionType.LEDGER: + # The number of rows in this dataframe will change depending + # on the mocking of data. It's because if the account has + # user wallet on, then there will be more transactions for + # example. + assert df.shape[0] > 0 + + def test_from_mysql_standard( + self, + df_collection_data_type, + df_collection, + offset, + duration, + user_factory, + product, + incite_item_factory, + delete_df_collection, + ): + from generalresearch.models.thl.user import User + + if df_collection.data_type in unsupported_mock_types: + return + u1: User = user_factory(product=product) + + delete_df_collection(coll=df_collection) + + for item in df_collection.items: + item: DFCollectionItem + + if df_collection.data_type == DFCollectionType.LEDGER: + # We're using parametrize, so this If statement is just to + # confirm other Item Types will always raise an assertion + with pytest.raises(expected_exception=AssertionError) as cm: + res = item.from_mysql_standard() + assert ( + "Can't call from_mysql_standard for Ledger DFCollectionItem" + in str(cm.value) + ) + + continue + + # Unlike .from_mysql_ledger(), .from_mysql_standard() will return + # back and empty df with the correct columns in place + df = item.from_mysql_standard() + 
assert df.empty + assert set(df.columns) == set(df_collection._schema.columns.keys()) + + incite_item_factory(user=u1, item=item) + + df = item.from_mysql_standard() + assert not df.empty + assert set(df.columns) == set(df_collection._schema.columns.keys()) + assert df.shape[0] > 0 + + def test_from_mysql_ledger( + self, + df_collection, + user, + create_main_accounts, + offset, + duration, + thl_web_rw, + thl_lm, + df_collection_data_type, + user_factory, + product, + client_no_amm, + incite_item_factory, + delete_df_collection, + mnt_filepath, + ): + from generalresearch.models.thl.user import User + + if df_collection.data_type != DFCollectionType.LEDGER: + return + u1: User = user_factory(product=product) + + delete_df_collection(coll=df_collection) + + for item in df_collection.items: + item: DFCollectionItem + delete_df_collection(coll=df_collection) + + # Okay, now continue with the actual Ledger Item tests... we need + # to ensure that this item.start - item.finish range hasn't had + # any prior transactions created within that range. + assert item.from_mysql_ledger() is None + + # Create main accounts doesn't matter because it doesn't + # add any transactions to the db + assert item.from_mysql_ledger() is None + + incite_item_factory(user=u1, item=item) + df = item.from_mysql_ledger() + assert isinstance(df, pd.DataFrame) + + # Not only is this a np.int64 to int comparison, but I also know it + # isn't actually measuring anything meaningful. However, it's useful + # as it tells us if the DF contains all the correct TX Entries. I + # figured it's better to count the amount rather than just the + # number of rows. 
DF == transactions * 2 because there are two + # entries per transactions + # assert df.amount.sum() == total_amt + # assert total_entries == df.shape[0] + + assert not df.tx_id.is_unique + df["net"] = df.direction * df.amount + assert df.groupby("tx_id").net.sum().sum() == 0 + + def test_to_archive( + self, + df_collection, + user, + offset, + duration, + df_collection_data_type, + user_factory, + product, + client_no_amm, + incite_item_factory, + delete_df_collection, + mnt_filepath, + ): + from generalresearch.models.thl.user import User + + if df_collection.data_type in unsupported_mock_types: + return + u1: User = user_factory(product=product) + + delete_df_collection(coll=df_collection) + + for item in df_collection.items: + item: DFCollectionItem + + incite_item_factory(user=u1, item=item) + + # Load up the data that we'll be using for various to_archive + # methods. + df = item.from_mysql() + ddf = dd.from_pandas(df, npartitions=1) + + # (1) Write the basic archive, the issue is that because it's + # an empty pd.DataFrame, it never makes an actual parquet file + assert item.to_archive(ddf=ddf, is_partial=False, overwrite=False) + assert item.has_archive() + assert item.has_archive(include_empty=False) + + def test__to_archive( + self, + df_collection_data_type, + df_collection, + user_factory, + product, + offset, + duration, + client_no_amm, + user, + incite_item_factory, + delete_df_collection, + mnt_filepath, + ): + """We already have a test for the "non-private" version of this, + which primarily just uses the respective Client to determine if + the ddf is_empty or not. + + Therefore, use the private test to check the manual behavior of + passing in the is_empty or overwrite. 
+ """ + if df_collection.data_type in unsupported_mock_types: + return + + delete_df_collection(coll=df_collection) + u1: User = user_factory(product=product) + + for item in df_collection.items: + item: DFCollectionItem + + incite_item_factory(user=u1, item=item) + + # Load up the data that we'll be using for various to_archive + # methods. Will always be empty pd.DataFrames for now... + df = item.from_mysql() + ddf = dd.from_pandas(df, npartitions=1) + + # (1) Confirm a missing ddf (shouldn't bc of type hint) should + # immediately return back False + assert not item._to_archive(ddf=None, is_empty=True) + assert not item._to_archive(ddf=None, is_empty=False) + + # (2) Setting empty overrides any possible state of the ddf + for rand_val in [df, ddf, True, 1_000]: + assert not item.empty_path.exists() + item._to_archive(ddf=rand_val, is_empty=True) + assert item.empty_path.exists() + item.empty_path.unlink() + + # (3) Trigger a warning with overwrite. First write an empty, + # then write it again with override default to confirm it worked, + # then write it again with override=False to confirm it does + # not work. + assert item._to_archive(ddf=ddf, is_empty=True) + res1 = item.empty_path.stat() + + # Returns none because it knows the file (regular, empty, or + # partial) already exists + assert not item._to_archive(ddf=ddf, is_empty=True, overwrite=False) + + # Currently override=True doesn't actually work on empty files + # because it's checked again in .set_empty() and isn't + # aware of the override flag that may be passed in to + # item._to_archive() + with pytest.raises(expected_exception=AssertionError) as cm: + item._to_archive(ddf=rand_val, is_empty=True, overwrite=True) + assert "set_empty is already set; why are you doing this?" 
in str(cm.value) + + # We can assert the file stats are the same because we were never + # able to go ahead and rewrite or update it in anyway + res2 = item.empty_path.stat() + assert res1 == res2 + + @pytest.mark.skip + def test_to_archive_numbered_partial( + self, df_collection_data_type, df_collection, offset, duration + ): + pass + + @pytest.mark.skip + def test_initial_load( + self, df_collection_data_type, df_collection, offset, duration + ): + pass + + @pytest.mark.skip + def test_clear_corrupt_archive( + self, df_collection_data_type, df_collection, offset, duration + ): + pass + + +@pytest.mark.parametrize( + argnames="df_collection_data_type, offset, duration", + argvalues=list(iter_product(df_collections, ["12h", "10D"], [timedelta(days=15)])), +) +class TestDFCollectionItemMethodBase: + + @pytest.mark.skip + def test_path_exists(self, df_collection_data_type, offset, duration): + pass + + @pytest.mark.skip + def test_next_numbered_path(self, df_collection_data_type, offset, duration): + pass + + @pytest.mark.skip + def test_search_highest_numbered_path( + self, df_collection_data_type, offset, duration + ): + pass + + @pytest.mark.skip + def test_tmp_filename(self, df_collection_data_type, offset, duration): + pass + + @pytest.mark.skip + def test_tmp_path(self, df_collection_data_type, offset, duration): + pass + + def test_is_empty(self, df_collection_data_type, df_collection, offset, duration): + """ + test_has_empty was merged into this because item.has_empty is + an alias for is_empty.. 
or vis-versa + """ + + for item in df_collection.items: + assert not item.is_empty() + assert not item.has_empty() + + item.empty_path.touch() + + assert item.is_empty() + assert item.has_empty() + + def test_has_partial_archive( + self, df_collection_data_type, df_collection, offset, duration + ): + for item in df_collection.items: + assert not item.has_partial_archive() + item.partial_path.touch() + assert item.has_partial_archive() + + def test_has_archive( + self, df_collection_data_type, df_collection, offset, duration + ): + for item in df_collection.items: + # (1) Originally, nothing exists... so let's just make a file and + # confirm that it works if just touch that path (no validation + # occurs at all). + assert not item.has_archive(include_empty=False) + assert not item.has_archive(include_empty=True) + item.path.touch() + assert item.has_archive(include_empty=False) + assert item.has_archive(include_empty=True) + + item.path.unlink() + assert not item.has_archive(include_empty=False) + assert not item.has_archive(include_empty=True) + + # (2) Same as the above, except make an empty directory + # instead of a file + assert not item.has_archive(include_empty=False) + assert not item.has_archive(include_empty=True) + item.path.mkdir() + assert item.has_archive(include_empty=False) + assert item.has_archive(include_empty=True) + + item.path.rmdir() + assert not item.has_archive(include_empty=False) + assert not item.has_archive(include_empty=True) + + # (3) Rather than make a empty file or dir at the path, let's + # touch the empty_path and confirm the include_empty option + # works + + item.empty_path.touch() + assert not item.has_archive(include_empty=False) + assert item.has_archive(include_empty=True) + + def test_delete_archive( + self, df_collection_data_type, df_collection, offset, duration + ): + for item in df_collection.items: + item: DFCollectionItem + # (1) Confirm that it doesn't raise an error or anything if we + # try to delete files or 
folders that do not exist + CollectionItemBase.delete_archive(generic_path=item.path) + CollectionItemBase.delete_archive(generic_path=item.empty_path) + CollectionItemBase.delete_archive(generic_path=item.partial_path) + + item.path.touch() + item.empty_path.touch() + item.partial_path.touch() + + CollectionItemBase.delete_archive(generic_path=item.path) + CollectionItemBase.delete_archive(generic_path=item.empty_path) + CollectionItemBase.delete_archive(generic_path=item.partial_path) + + assert not item.path.exists() + assert not item.empty_path.exists() + assert not item.partial_path.exists() + + def test_should_archive( + self, df_collection_data_type, df_collection, offset, duration + ): + schema: DataFrameSchema = df_collection._schema + aa = schema.metadata[ARCHIVE_AFTER] + + # It shouldn't be None, it can be timedelta(seconds=0) + assert isinstance(aa, timedelta) + + for item in df_collection.items: + item: DFCollectionItem + + if datetime.now(tz=timezone.utc) > item.finish + aa: + assert item.should_archive() + else: + assert not item.should_archive() + + @pytest.mark.skip + def test_set_empty(self, df_collection_data_type, df_collection, offset, duration): + pass + + def test_valid_archive( + self, df_collection_data_type, df_collection, offset, duration + ): + # Originally, nothing has been saved or anything.. 
so confirm it + # always comes back as None + for item in df_collection.items: + assert not item.valid_archive(generic_path=None, sample=None) + + _path = Path(pjoin(df_collection.archive_path, uuid4().hex)) + + # (1) Fail if isfile, but doesn't exist and if we can't read + # it as valid ParquetFile + assert not item.valid_archive(generic_path=_path, sample=None) + _path.touch() + assert not item.valid_archive(generic_path=_path, sample=None) + _path.unlink() + + # (2) Fail if isdir and we can't read it as a valid ParquetFile + _path.mkdir() + assert _path.is_dir() + assert not item.valid_archive(generic_path=_path, sample=None) + _path.rmdir() + + @pytest.mark.skip + def test_validate_df( + self, df_collection_data_type, df_collection, offset, duration + ): + pass + + @pytest.mark.skip + def test_from_archive( + self, df_collection_data_type, df_collection, offset, duration + ): + pass + + def test__to_dict(self, df_collection_data_type, df_collection, offset, duration): + + for item in df_collection.items: + res = item._to_dict() + assert isinstance(res, dict) + assert len(res.keys()) == 6 + + assert isinstance(res["should_archive"], bool) + assert isinstance(res["has_archive"], bool) + assert isinstance(res["path"], Path) + assert isinstance(res["filename"], str) + + assert isinstance(res["start"], datetime) + assert isinstance(res["finish"], datetime) + assert res["start"] < res["finish"] + + @pytest.mark.skip + def test_delete_partial( + self, df_collection_data_type, df_collection, offset, duration + ): + pass + + @pytest.mark.skip + def test_cleanup_partials( + self, df_collection_data_type, df_collection, offset, duration + ): + pass + + @pytest.mark.skip + def test_delete_dangling_partials( + self, df_collection_data_type, df_collection, offset, duration + ): + pass + + +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) +async def test_client(client, s, worker): + """c,s,a are all required - the secondary Worker (b) is not required""" + + assert 
isinstance(client, Client) + assert isinstance(s, Scheduler) + assert isinstance(worker, Worker) + + +@pytest.mark.parametrize( + argnames="df_collection_data_type, offset", + argvalues=combo_object(), +) +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) +@pytest.mark.anyio +async def test_client_parametrize(c, s, w, df_collection_data_type, offset): + """c,s,a are all required - the secondary Worker (b) is not required""" + + assert isinstance(c, Client), f"c is not Client, it's {type(c)}" + assert isinstance(s, Scheduler), f"s is not Scheduler, it's {type(s)}" + assert isinstance(w, Worker), f"w is not Worker, it's {type(w)}" + + assert df_collection_data_type is not None + assert isinstance(offset, str) + + +# I cannot figure out how to define the parametrize on the Test, but then have +# sync or async methods within it, with some having or not having the +# gen_cluster decorator set. + + +@pytest.mark.parametrize( + argnames="df_collection_data_type, offset, duration", + argvalues=list(iter_product(df_collections, ["12h", "10D"], [timedelta(days=15)])), +) +class TestDFCollectionItemFunctionalTest: + + def test_to_archive_and_ddf( + self, + df_collection_data_type, + offset, + duration, + client_no_amm, + df_collection, + user, + user_factory, + product, + incite_item_factory, + delete_df_collection, + mnt_filepath, + ): + from generalresearch.models.thl.user import User + + if df_collection.data_type in unsupported_mock_types: + return + u1: User = user_factory(product=product) + + delete_df_collection(coll=df_collection) + df_collection._client = client_no_amm + + # Assert that there are no pre-existing archives + assert df_collection.progress.has_archive.eq(False).all() + res = df_collection.ddf() + assert res is None + + delete_df_collection(coll=df_collection) + for item in df_collection.items: + item: DFCollectionItem + + incite_item_factory(user=u1, item=item) + item.initial_load() + + # I know it seems weird to delete items from the database 
before we + # proceed with the test. However, the content should have already + # been saved out into an parquet at this point, and I am too lazy + # to write a separate teardown for a collection (and not a + # single Item) + + # Now that we went ahead with the initial_load, Assert that all + # items have archives files saved + assert isinstance(df_collection.progress, pd.DataFrame) + assert df_collection.progress.has_archive.eq(True).all() + + ddf = df_collection.ddf() + shape = df_collection._client.compute(collections=ddf.shape, sync=True) + assert shape[0] > 5 + + def test_filesize_estimate( + self, + df_collection, + user, + offset, + duration, + client_no_amm, + user_factory, + product, + df_collection_data_type, + incite_item_factory, + delete_df_collection, + mnt_filepath, + ): + """A functional test to write some Parquet files for the + DFCollection and then confirm that the files get written + correctly. + + Confirm the files are written correctly by: + (1) Validating their passing the pandera schema + (2) The file or dir has an expected size on disk + """ + import pyarrow.parquet as pq + from generalresearch.models.thl.user import User + import os + + if df_collection.data_type in unsupported_mock_types: + return + delete_df_collection(coll=df_collection) + u1: User = user_factory(product=product) + + # Pick 3 random items to sample for correct filesize + for item in df_collection.items: + item: DFCollectionItem + + incite_item_factory(user=u1, item=item) + item.initial_load(overwrite=True) + + total_bytes = 0 + for fp in pq.ParquetDataset(item.path).files: + total_bytes += os.stat(fp).st_size + + total_mb = total_bytes / 1_048_576 + + assert total_bytes > 1_000 + assert total_mb < 1 + + def test_to_archive_client( + self, + client_no_amm, + df_collection, + user_factory, + product, + offset, + duration, + df_collection_data_type, + incite_item_factory, + delete_df_collection, + mnt_filepath, + ): + from generalresearch.models.thl.user import User + + 
delete_df_collection(coll=df_collection) + df_collection._client = client_no_amm + u1: User = user_factory(product=product) + + for item in df_collection.items: + item: DFCollectionItem + + if df_collection.data_type in unsupported_mock_types: + continue + + incite_item_factory(user=u1, item=item) + + # Load up the data that we'll be using for various to_archive + # methods. Will always be empty pd.DataFrames for now... + df = item.from_mysql() + ddf = dd.from_pandas(df, npartitions=1) + assert isinstance(ddf, dd.DataFrame) + + # (1) Write the basic archive, the issue is that because it's + # an empty pd.DataFrame, it never makes an actual parquet file + assert not item.has_archive() + saved = item.to_archive(ddf=ddf, is_partial=False, overwrite=False) + assert saved + assert item.has_archive(include_empty=True) + + @pytest.mark.skip + def test_get_items(self, df_collection, product, offset, duration): + with pytest.warns(expected_warning=ResourceWarning) as cm: + df_collection.get_items_last365() + assert "DFCollectionItem has missing archives" in str( + [w.message for w in cm.list] + ) + + res = df_collection.get_items_last365() + assert len(res) == len(df_collection.items) + + def test_saving_protections( + self, + client_no_amm, + df_collection_data_type, + df_collection, + incite_item_factory, + delete_df_collection, + user_factory, + product, + offset, + duration, + mnt_filepath, + ): + """Don't allow creating an archive for data that will likely be + overwritten or updated + """ + from generalresearch.models.thl.user import User + + if df_collection.data_type in unsupported_mock_types: + return + u1: User = user_factory(product=product) + + schema: DataFrameSchema = df_collection._schema + aa = schema.metadata[ARCHIVE_AFTER] + assert isinstance(aa, timedelta) + + delete_df_collection(df_collection) + for item in df_collection.items: + item: DFCollectionItem + + incite_item_factory(user=u1, item=item) + + should_archive = item.should_archive() + res = 
item.initial_load() + + # self.assertIn("Cannot create archive for such new data", str(cm.records)) + + # .to_archive() will return back True or False depending on if it + # was successful. We want to compare that result to the + # .should_archive() method result + assert should_archive == res + + def test_empty_item( + self, + client_no_amm, + df_collection_data_type, + df_collection, + incite_item_factory, + delete_df_collection, + user, + offset, + duration, + mnt_filepath, + ): + delete_df_collection(coll=df_collection) + + for item in df_collection.items: + assert not item.has_empty() + df: pd.DataFrame = item.from_mysql() + + # We do this check b/c the Ledger returns back None and + # I don't want it to fail when we go to make a ddf + if df is None: + item.set_empty() + else: + ddf = dd.from_pandas(df, npartitions=1) + item.to_archive(ddf=ddf) + + assert item.has_empty() + + def test_file_touching( + self, + client_no_amm, + df_collection_data_type, + df_collection, + incite_item_factory, + delete_df_collection, + user_factory, + product, + offset, + duration, + mnt_filepath, + ): + from generalresearch.models.thl.user import User + + delete_df_collection(coll=df_collection) + df_collection._client = client_no_amm + u1: User = user_factory(product=product) + + for item in df_collection.items: + # Confirm none of the paths exist yet + assert not item.has_archive() + assert not item.path_exists(generic_path=item.path) + assert not item.has_empty() + assert not item.path_exists(generic_path=item.empty_path) + + if df_collection.data_type in unsupported_mock_types: + assert not item.has_archive(include_empty=False) + assert not item.has_empty() + assert not item.path_exists(generic_path=item.empty_path) + else: + incite_item_factory(user=u1, item=item) + item.initial_load() + + assert item.has_archive(include_empty=False) + assert item.path_exists(generic_path=item.path) + assert not item.has_empty() diff --git 
a/tests/incite/collections/test_df_collection_thl_marketplaces.py b/tests/incite/collections/test_df_collection_thl_marketplaces.py new file mode 100644 index 0000000..0a77938 --- /dev/null +++ b/tests/incite/collections/test_df_collection_thl_marketplaces.py @@ -0,0 +1,75 @@ +from datetime import datetime, timezone +from itertools import product + +import pytest +from pandera import Column, Index, DataFrameSchema + +from generalresearch.incite.collections import DFCollection +from generalresearch.incite.collections import DFCollectionType +from generalresearch.incite.collections.thl_marketplaces import ( + InnovateSurveyHistoryCollection, + MorningSurveyTimeseriesCollection, + SagoSurveyHistoryCollection, + SpectrumSurveyTimeseriesCollection, +) +from test_utils.incite.conftest import mnt_filepath + + +def combo_object(): + for x in product( + [ + InnovateSurveyHistoryCollection, + MorningSurveyTimeseriesCollection, + SagoSurveyHistoryCollection, + SpectrumSurveyTimeseriesCollection, + ], + ["5min", "6H", "30D"], + ): + yield x + + +@pytest.mark.parametrize("df_coll, offset", combo_object()) +class TestDFCollection_thl_marketplaces: + + def test_init(self, mnt_filepath, df_coll, offset, spectrum_rw): + assert issubclass(df_coll, DFCollection) + + # This is stupid, but we need to pull the default from the + # Pydantic field + data_type = df_coll.model_fields["data_type"].default + assert isinstance(data_type, DFCollectionType) + + # (1) Can't be totally empty, needs a path... 
+ with pytest.raises(expected_exception=Exception) as cm: + instance = df_coll() + + # (2) Confirm it only needs the archive_path + instance = df_coll( + archive_path=mnt_filepath.archive_path(enum_type=data_type), + ) + assert isinstance(instance, DFCollection) + + # (3) Confirm it loads with all + instance = df_coll( + archive_path=mnt_filepath.archive_path(enum_type=data_type), + sql_helper=spectrum_rw, + offset=offset, + start=datetime(year=2023, month=6, day=1, minute=0, tzinfo=timezone.utc), + finished=datetime(year=2023, month=6, day=1, minute=5, tzinfo=timezone.utc), + ) + assert isinstance(instance, DFCollection) + + # (4) Now that we initialize the Class, we can access the _schema + assert isinstance(instance._schema, DataFrameSchema) + assert isinstance(instance._schema.index, Index) + + for c in instance._schema.columns.keys(): + assert isinstance(c, str) + col = instance._schema.columns[c] + assert isinstance(col, Column) + + assert instance._schema.coerce, "coerce on all Schemas" + assert isinstance(instance._schema.checks, list) + assert len(instance._schema.checks) == 0 + assert isinstance(instance._schema.metadata, dict) + assert len(instance._schema.metadata.keys()) == 2 diff --git a/tests/incite/collections/test_df_collection_thl_web.py b/tests/incite/collections/test_df_collection_thl_web.py new file mode 100644 index 0000000..e6f464b --- /dev/null +++ b/tests/incite/collections/test_df_collection_thl_web.py @@ -0,0 +1,160 @@ +from datetime import datetime +from itertools import product + +import dask.dataframe as dd +import pandas as pd +import pytest +from pandera import DataFrameSchema + +from generalresearch.incite.collections import DFCollection, DFCollectionType + + +def combo_object(): + for x in product( + [ + DFCollectionType.USER, + DFCollectionType.WALL, + DFCollectionType.SESSION, + DFCollectionType.TASK_ADJUSTMENT, + DFCollectionType.AUDIT_LOG, + DFCollectionType.LEDGER, + ], + ["30min", "1H"], + ): + yield x + + 
@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web:
    """Smoke tests: the fixtures must yield a valid collection per type/offset combo."""

    def test_init(self, df_collection_data_type, offset, df_collection):
        """The parametrized fixtures resolve to a DFCollectionType and a DFCollection."""
        assert isinstance(df_collection_data_type, DFCollectionType)
        assert isinstance(df_collection, DFCollection)


@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web_Properties:
    """Tests for the properties declared directly on DFCollection."""

    def test_items(self, df_collection_data_type, offset, df_collection):
        """Every item carries a back-reference to its owning collection."""
        assert isinstance(df_collection.items, list)
        for i in df_collection.items:
            # _collection is the item's back-pointer to the parent collection
            assert i._collection == df_collection

    def test__schema(self, df_collection_data_type, offset, df_collection):
        """The collection exposes a pandera DataFrameSchema via ``_schema``."""
        assert isinstance(df_collection._schema, DataFrameSchema)


@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web_BaseProperties:
    """Tests for properties DFCollection inherits from its collection base class."""

    @pytest.mark.skip
    def test__interval_range(self, df_collection_data_type, offset, df_collection):
        pass

    def test_interval_start(self, df_collection_data_type, offset, df_collection):
        """interval_start is a datetime (tz-awareness not asserted here)."""
        assert isinstance(df_collection.interval_start, datetime)

    def test_interval_range(self, df_collection_data_type, offset, df_collection):
        """interval_range materializes as a list of intervals."""
        assert isinstance(df_collection.interval_range, list)

    def test_progress(self, df_collection_data_type, offset, df_collection):
        """progress is reported as a pandas DataFrame."""
        assert isinstance(df_collection.progress, pd.DataFrame)


@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web_Methods:
    """Placeholders for DFCollection's own methods; all skipped pending fixtures."""

    @pytest.mark.skip
    def test_initial_loads(self, df_collection_data_type, df_collection, offset):
        pass

    @pytest.mark.skip
    def test_fetch_force_rr_latest(
        self, df_collection_data_type, df_collection, offset
    ):
        pass

    @pytest.mark.skip
    def test_force_rr_latest(self, df_collection_data_type, df_collection, offset):
        pass


@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web_BaseMethods:
    """Tests for the archive/path methods inherited from the collection base class.

    Most entries are skipped placeholders; only ``fetch_all_paths`` is
    exercised against a collection with no archives written yet.
    """

    def test_fetch_all_paths(self, df_collection_data_type, offset, df_collection):
        """fetch_all_paths returns a list even when no archives exist on disk."""
        res = df_collection.fetch_all_paths(
            items=None, force_rr_latest=False, include_partial=False
        )
        assert isinstance(res, list)

    @pytest.mark.skip
    def test_ddf(self, df_collection_data_type, offset, df_collection):
        # Skipped: ddf() returns None when no archives exist, so this
        # isinstance check needs archives pre-built to pass.
        res = df_collection.ddf()
        assert isinstance(res, dd.DataFrame)

    # -- cleanup --
    @pytest.mark.skip
    def test_schedule_cleanup(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_cleanup(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_cleanup_partials(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_clear_tmp_archives(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_clear_corrupt_archives(
        self, df_collection_data_type, offset, df_collection
    ):
        pass

    @pytest.mark.skip
    def test_rebuild_symlinks(self, df_collection_data_type, offset, df_collection):
        pass

    # -- Source timing --
    @pytest.mark.skip
    def test_get_item(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_get_item_start(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_get_items(self, df_collection_data_type, offset, df_collection):
        # If we get all the items from the start of the collection, it
        # should include all the items!
+ res1 = df_collection.items + res2 = df_collection.get_items(since=df_collection.start) + assert len(res1) == len(res2) + + @pytest.mark.skip + def test_get_items_from_year(self, df_collection_data_type, offset, df_collection): + pass + + @pytest.mark.skip + def test_get_items_last90(self, df_collection_data_type, offset, df_collection): + pass + + @pytest.mark.skip + def test_get_items_last365(self, df_collection_data_type, offset, df_collection): + pass diff --git a/tests/incite/collections/test_df_collection_thl_web_ledger.py b/tests/incite/collections/test_df_collection_thl_web_ledger.py new file mode 100644 index 0000000..599d979 --- /dev/null +++ b/tests/incite/collections/test_df_collection_thl_web_ledger.py @@ -0,0 +1,32 @@ +# def test_loaded(self, client_no_amm, collection, new_user_fixture, pop_ledger_merge): +# collection._client = client_no_amm +# +# teardown_events(collection) +# THL_LM.create_main_accounts() +# +# for item in collection.items: +# populate_events(item, user=new_user_fixture) +# item.initial_load() +# +# ddf = collection.ddf( +# force_rr_latest=False, +# include_partial=True, +# filters=[ +# ("created", ">=", collection.start), +# ("created", "<", collection.finished), +# ], +# ) +# +# assert isinstance(ddf, dd.DataFrame) +# df = client_no_amm.compute(collections=ddf, sync=True) +# assert isinstance(df, pd.DataFrame) +# +# # Simple validation check(s) +# assert not df.tx_id.is_unique +# df["net"] = df.direction * df.amount +# assert df.groupby("tx_id").net.sum().sum() == 0 +# +# teardown_events(collection) +# +# +# diff --git a/tests/incite/mergers/__init__.py b/tests/incite/mergers/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/incite/mergers/__init__.py diff --git a/tests/incite/mergers/foundations/__init__.py b/tests/incite/mergers/foundations/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/incite/mergers/foundations/__init__.py diff --git 
a/tests/incite/mergers/foundations/test_enriched_session.py b/tests/incite/mergers/foundations/test_enriched_session.py new file mode 100644 index 0000000..ec15d38 --- /dev/null +++ b/tests/incite/mergers/foundations/test_enriched_session.py @@ -0,0 +1,138 @@ +from datetime import timedelta, timezone, datetime +from decimal import Decimal +from itertools import product +from typing import Optional + +from generalresearch.incite.schemas.admin_responses import ( + AdminPOPSessionSchema, +) + +import dask.dataframe as dd +import pandas as pd +import pytest + +from test_utils.incite.collections.conftest import ( + session_collection, + wall_collection, +) + + +@pytest.mark.parametrize( + argnames="offset, duration", + argvalues=list( + product( + ["12h", "3D"], + [timedelta(days=5)], + ) + ), +) +class TestEnrichedSession: + + def test_base( + self, + client_no_amm, + product, + user_factory, + wall_collection, + session_collection, + enriched_session_merge, + thl_web_rr, + delete_df_collection, + incite_item_factory, + ): + from generalresearch.models.thl.user import User + + delete_df_collection(coll=session_collection) + + u1: User = user_factory(product=product, created=session_collection.start) + + for item in session_collection.items: + incite_item_factory(item=item, user=u1) + item.initial_load() + + for item in wall_collection.items: + item.initial_load() + + enriched_session_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + # -- + + ddf = enriched_session_merge.ddf() + assert isinstance(ddf, dd.DataFrame) + + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + + assert not df.empty + + # -- Teardown + delete_df_collection(session_collection) + + +class TestEnrichedSessionAdmin: + + @pytest.fixture + def start(self) -> "datetime": + return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc) + + @pytest.fixture + def offset(self) -> 
str: + return "1d" + + @pytest.fixture + def duration(self) -> Optional["timedelta"]: + return timedelta(days=5) + + def test_to_admin_response( + self, + event_report_request, + enriched_session_merge, + client_no_amm, + wall_collection, + session_collection, + thl_web_rr, + session_report_request, + user_factory, + start, + session_factory, + product_factory, + delete_df_collection, + ): + delete_df_collection(coll=wall_collection) + delete_df_collection(coll=session_collection) + + p1 = product_factory() + p2 = product_factory() + + for p in [p1, p2]: + u = user_factory(product=p) + for i in range(50): + s = session_factory( + user=u, + wall_count=1, + wall_req_cpi=Decimal("1.00"), + started=start + timedelta(minutes=i, seconds=1), + ) + wall_collection.initial_load(client=None, sync=True) + session_collection.initial_load(client=None, sync=True) + + enriched_session_merge.build( + client=client_no_amm, + session_coll=session_collection, + wall_coll=wall_collection, + pg_config=thl_web_rr, + ) + + df = enriched_session_merge.to_admin_response( + rr=session_report_request, client=client_no_amm + ) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + assert isinstance(AdminPOPSessionSchema.validate(df), pd.DataFrame) + assert df.index.get_level_values(1).nunique() == 2 diff --git a/tests/incite/mergers/foundations/test_enriched_task_adjust.py b/tests/incite/mergers/foundations/test_enriched_task_adjust.py new file mode 100644 index 0000000..96c214f --- /dev/null +++ b/tests/incite/mergers/foundations/test_enriched_task_adjust.py @@ -0,0 +1,76 @@ +from datetime import timedelta +from itertools import product as iter_product + +import dask.dataframe as dd +import pandas as pd +import pytest + +from test_utils.incite.collections.conftest import ( + wall_collection, + task_adj_collection, + session_collection, +) +from test_utils.incite.mergers.conftest import enriched_wall_merge + + +@pytest.mark.parametrize( + argnames="offset, duration,", + 
argvalues=list( + iter_product( + ["12h", "3D"], + [timedelta(days=5)], + ) + ), +) +class TestEnrichedTaskAdjust: + + @pytest.mark.skip + def test_base( + self, + client_no_amm, + user_factory, + product, + task_adj_collection, + wall_collection, + session_collection, + enriched_wall_merge, + enriched_task_adjust_merge, + incite_item_factory, + delete_df_collection, + thl_web_rr, + ): + from generalresearch.models.thl.user import User + + # -- Build & Setup + delete_df_collection(coll=session_collection) + u1: User = user_factory(product=product) + + for item in session_collection.items: + incite_item_factory(user=u1, item=item) + item.initial_load() + for item in wall_collection.items: + item.initial_load() + + enriched_wall_merge.build( + client=client_no_amm, + session_coll=session_collection, + wall_coll=wall_collection, + pg_config=thl_web_rr, + ) + + enriched_task_adjust_merge.build( + client=client_no_amm, + task_adjust_coll=task_adj_collection, + enriched_wall=enriched_wall_merge, + pg_config=thl_web_rr, + ) + + # -- + + ddf = enriched_task_adjust_merge.ddf() + assert isinstance(ddf, dd.DataFrame) + + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + + assert not df.empty diff --git a/tests/incite/mergers/foundations/test_enriched_wall.py b/tests/incite/mergers/foundations/test_enriched_wall.py new file mode 100644 index 0000000..8f4995b --- /dev/null +++ b/tests/incite/mergers/foundations/test_enriched_wall.py @@ -0,0 +1,236 @@ +from datetime import timedelta, timezone, datetime +from decimal import Decimal +from itertools import product as iter_product +from typing import Optional + +import dask.dataframe as dd +import pandas as pd +import pytest + +# noinspection PyUnresolvedReferences +from distributed.utils_test import ( + gen_cluster, + client_no_amm, + loop, + loop_in_thread, + cleanup, + cluster_fixture, + client, +) + +from generalresearch.incite.mergers.foundations.enriched_wall import ( + 
EnrichedWallMergeItem, +) +from test_utils.incite.collections.conftest import ( + session_collection, + wall_collection, +) +from test_utils.incite.conftest import incite_item_factory +from test_utils.incite.mergers.conftest import ( + enriched_wall_merge, +) + + +@pytest.mark.parametrize( + argnames="offset, duration", + argvalues=list(iter_product(["48h", "3D"], [timedelta(days=5)])), +) +class TestEnrichedWall: + + def test_base( + self, + client_no_amm, + product, + user_factory, + wall_collection, + thl_web_rr, + session_collection, + enriched_wall_merge, + delete_df_collection, + incite_item_factory, + ): + from generalresearch.models.thl.user import User + + # -- Build & Setup + delete_df_collection(coll=session_collection) + delete_df_collection(coll=wall_collection) + u1: User = user_factory(product=product, created=session_collection.start) + + for item in session_collection.items: + incite_item_factory(item=item, user=u1) + item.initial_load() + + for item in wall_collection.items: + item.initial_load() + + enriched_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + # -- + + ddf = enriched_wall_merge.ddf() + assert isinstance(ddf, dd.DataFrame) + + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + + assert not df.empty + + def test_base_item( + self, + client_no_amm, + product, + user_factory, + wall_collection, + session_collection, + enriched_wall_merge, + delete_df_collection, + thl_web_rr, + incite_item_factory, + ): + # -- Build & Setup + delete_df_collection(coll=session_collection) + u = user_factory(product=product, created=session_collection.start) + + for item in session_collection.items: + incite_item_factory(item=item, user=u) + item.initial_load() + for item in wall_collection.items: + item.initial_load() + + enriched_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + 
session_coll=session_collection, + pg_config=thl_web_rr, + ) + + # -- + + for item in enriched_wall_merge.items: + assert isinstance(item, EnrichedWallMergeItem) + + path = item.path + + try: + modified_time1 = path.stat().st_mtime + except (Exception,): + modified_time1 = 0 + + item.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + modified_time2 = path.stat().st_mtime + + # Merger Items can't be updated unless it's a partial, confirm + # that even after attempting to rebuild, it doesn't re-touch + # the file + assert modified_time2 == modified_time1 + + # def test_admin_pop_session_device_type(ew_merge_setup): + # self.build() + # + # rr = ReportRequest( + # report_type=ReportType.POP_EVENT, + # index0="started", + # index1="device_type", + # freq="min", + # start=start, + # ) + # + # df, categories, updated = self.instance.to_admin_response( + # rr=rr, product_ids=[self.product.id], client=client + # ) + # + # assert isinstance(df, pd.DataFrame) + # device_types_str = [str(e.value) for e in DeviceType] + # device_types = df.index.get_level_values(1).values + # assert all([dt in device_types_str for dt in device_types]) + + +class TestEnrichedWallToAdmin: + + @pytest.fixture + def start(self) -> "datetime": + return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc) + + @pytest.fixture + def offset(self) -> str: + return "1d" + + @pytest.fixture + def duration(self) -> Optional["timedelta"]: + return timedelta(days=5) + + def test_empty(self, enriched_wall_merge, client_no_amm, start): + from generalresearch.models.admin.request import ReportRequest + + rr = ReportRequest.model_validate({"interval": "5min", "start": start}) + + res = enriched_wall_merge.to_admin_response( + rr=rr, + client=client_no_amm, + ) + + assert isinstance(res, pd.DataFrame) + + assert res.empty + assert len(res.columns) > 5 + + def test_to_admin_response( + self, + event_report_request, + 
enriched_wall_merge, + client_no_amm, + wall_collection, + session_collection, + thl_web_rr, + user, + session_factory, + delete_df_collection, + product_factory, + user_factory, + start, + ): + delete_df_collection(coll=wall_collection) + delete_df_collection(coll=session_collection) + + p1 = product_factory() + p2 = product_factory() + + for p in [p1, p2]: + u = user_factory(product=p) + for i in range(50): + s = session_factory( + user=u, + wall_count=2, + wall_req_cpi=Decimal("1.00"), + started=start + timedelta(minutes=i, seconds=1), + ) + + wall_collection.initial_load(client=None, sync=True) + session_collection.initial_load(client=None, sync=True) + + enriched_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + session_coll=session_collection, + pg_config=thl_web_rr, + ) + + df = enriched_wall_merge.to_admin_response( + rr=event_report_request, client=client_no_amm + ) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + # assert len(df) == 1 + # assert user.product_id == df.reset_index().loc[0, "index1"] + assert df.index.get_level_values(1).nunique() == 2 diff --git a/tests/incite/mergers/foundations/test_user_id_product.py b/tests/incite/mergers/foundations/test_user_id_product.py new file mode 100644 index 0000000..f96bfb4 --- /dev/null +++ b/tests/incite/mergers/foundations/test_user_id_product.py @@ -0,0 +1,73 @@ +from datetime import timedelta, datetime, timezone +from itertools import product + +import pandas as pd +import pytest + +# noinspection PyUnresolvedReferences +from distributed.utils_test import ( + gen_cluster, + client_no_amm, + loop, + loop_in_thread, + cleanup, + cluster_fixture, + client, +) + +from generalresearch.incite.mergers.foundations.user_id_product import ( + UserIdProductMergeItem, +) +from test_utils.incite.mergers.conftest import user_id_product_merge + + +@pytest.mark.parametrize( + argnames="offset, duration, start", + argvalues=list( + product( + ["12h", "3D"], + [timedelta(days=5)], + [ 
+ (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace( + microsecond=0 + ) + ], + ) + ), +) +class TestUserIDProduct: + + @pytest.mark.skip + def test_base(self, client_no_amm, user_id_product_merge): + ddf = user_id_product_merge.ddf() + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + assert not df.empty + + @pytest.mark.skip + def test_base_item(self, client_no_amm, user_id_product_merge, user_collection): + assert len(user_id_product_merge.items) == 1 + + for item in user_id_product_merge.items: + assert isinstance(item, UserIdProductMergeItem) + + path = item.path + + try: + modified_time1 = path.stat().st_mtime + except (Exception,): + modified_time1 = 0 + + user_id_product_merge.build(client=client_no_amm, user_coll=user_collection) + modified_time2 = path.stat().st_mtime + + assert modified_time2 > modified_time1 + + @pytest.mark.skip + def test_read(self, client_no_amm, user_id_product_merge): + users_ddf = user_id_product_merge.ddf() + df = client_no_amm.compute(collections=users_ddf, sync=True) + + assert isinstance(df, pd.DataFrame) + assert len(df.columns) == 1 + assert str(df.product_id.dtype) == "category" diff --git a/tests/incite/mergers/test_merge_collection.py b/tests/incite/mergers/test_merge_collection.py new file mode 100644 index 0000000..692cac3 --- /dev/null +++ b/tests/incite/mergers/test_merge_collection.py @@ -0,0 +1,102 @@ +from datetime import datetime, timezone, timedelta +from itertools import product + +import pandas as pd +import pytest +from pandera import DataFrameSchema + +from generalresearch.incite.mergers import ( + MergeCollection, + MergeType, +) +from test_utils.incite.conftest import mnt_filepath + +merge_types = list(e for e in MergeType if e != MergeType.TEST) + + +@pytest.mark.parametrize( + argnames="merge_type, offset, duration, start", + argvalues=list( + product( + merge_types, + ["5min", "6h", "14D"], + [timedelta(days=30)], + [ + 
                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
                    microsecond=0
                )
            ],
        )
    ),
)
class TestMergeCollection:
    """Construction and bookkeeping tests for MergeCollection across merge types."""

    def test_init(self, mnt_filepath, merge_type, offset, duration, start):
        """A merge_type is mandatory; with one, construction succeeds."""
        with pytest.raises(expected_exception=ValueError) as cm:
            MergeCollection(archive_path=mnt_filepath.data_src)
        assert "Must explicitly provide a merge_type" in str(cm.value)

        instance = MergeCollection(
            merge_type=merge_type,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )
        assert instance.merge_type == merge_type

    def test_items(self, mnt_filepath, merge_type, offset, duration, start):
        """One item is generated per interval in the collection's range."""
        instance = MergeCollection(
            merge_type=merge_type,
            offset=offset,
            start=start,
            finished=start + duration,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

        assert len(instance.interval_range) == len(instance.items)

    def test_progress(self, mnt_filepath, merge_type, offset, duration, start):
        """progress is a non-empty 7-column DataFrame; group_by starts unset."""
        instance = MergeCollection(
            merge_type=merge_type,
            offset=offset,
            start=start,
            finished=start + duration,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

        assert isinstance(instance.progress, pd.DataFrame)
        assert instance.progress.shape[0] > 0
        # NOTE(review): 7 columns appears to be the fixed progress-report width —
        # confirm against MergeCollection.progress if the schema changes
        assert instance.progress.shape[1] == 7
        assert instance.progress["group_by"].isnull().all()

    def test_schema(self, mnt_filepath, merge_type, offset, duration, start):
        """Every merge type resolves to a pandera DataFrameSchema."""
        instance = MergeCollection(
            merge_type=merge_type,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

        assert isinstance(instance._schema, DataFrameSchema)

    def test_load(self, mnt_filepath, merge_type, offset, duration, start):
        """A freshly built collection reports no archives on disk."""
        instance = MergeCollection(
            merge_type=merge_type,
            start=start,
            finished=start + duration,
            offset=offset,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

        # Confirm that there are no archives available yet
        assert instance.progress.has_archive.eq(False).all()

    def test_get_items(self, mnt_filepath,
merge_type, offset, duration, start): + instance = MergeCollection( + start=start, + finished=start + duration, + offset=offset, + merge_type=merge_type, + archive_path=mnt_filepath.archive_path(enum_type=merge_type), + ) + + # with pytest.raises(expected_exception=ResourceWarning) as cm: + res = instance.get_items_last365() + # assert "has missing archives", str(cm.value) + assert len(res) == len(instance.items) diff --git a/tests/incite/mergers/test_merge_collection_item.py b/tests/incite/mergers/test_merge_collection_item.py new file mode 100644 index 0000000..96f8789 --- /dev/null +++ b/tests/incite/mergers/test_merge_collection_item.py @@ -0,0 +1,66 @@ +from datetime import datetime, timezone, timedelta +from itertools import product +from pathlib import PurePath + +import pytest + +from generalresearch.incite.mergers import MergeCollectionItem, MergeType +from generalresearch.incite.mergers.foundations.enriched_session import ( + EnrichedSessionMerge, +) +from generalresearch.incite.mergers.foundations.enriched_wall import ( + EnrichedWallMerge, +) +from test_utils.incite.mergers.conftest import merge_collection + + +@pytest.mark.parametrize( + argnames="merge_type, offset, duration", + argvalues=list( + product( + [MergeType.ENRICHED_SESSION, MergeType.ENRICHED_WALL], + ["1h"], + [timedelta(days=1)], + ) + ), +) +class TestMergeCollectionItem: + + def test_file_naming(self, merge_collection, offset, duration, start): + assert len(merge_collection.items) == 25 + + items: list[MergeCollectionItem] = merge_collection.items + + for i in items: + i: MergeCollectionItem + + assert isinstance(i.path, PurePath) + assert i.path.name == i.filename + + assert i._collection.merge_type.name.lower() in i.filename + assert i._collection.offset in i.filename + assert i.start.strftime("%Y-%m-%d-%H-%M-%S") in i.filename + + def test_archives(self, merge_collection, offset, duration, start): + assert len(merge_collection.items) == 25 + + for i in merge_collection.items: + 
assert not i.has_archive() + assert not i.has_empty() + assert not i.is_empty() + assert not i.has_partial_archive() + assert i.has_archive() == i.path_exists(generic_path=i.path) + + res = set([i.should_archive() for i in merge_collection.items]) + assert len(res) == 1 + + def test_item_to_archive(self, merge_collection, offset, duration, start): + for item in merge_collection.items: + item: MergeCollectionItem + assert not item.has_archive() + + # TODO: setup build methods + # ddf = self.build + # saved = instance.to_archive(ddf=ddf) + # self.assertTrue(saved) + # self.assertTrue(instance.has_archive()) diff --git a/tests/incite/mergers/test_pop_ledger.py b/tests/incite/mergers/test_pop_ledger.py new file mode 100644 index 0000000..6f96108 --- /dev/null +++ b/tests/incite/mergers/test_pop_ledger.py @@ -0,0 +1,307 @@ +from datetime import timedelta, datetime, timezone +from itertools import product as iter_product +from typing import Optional + +import pandas as pd +import pytest +from distributed.utils_test import client_no_amm + +from generalresearch.incite.schemas.mergers.pop_ledger import ( + numerical_col_names, +) +from test_utils.incite.collections.conftest import ledger_collection +from test_utils.incite.conftest import mnt_filepath, incite_item_factory +from test_utils.incite.mergers.conftest import pop_ledger_merge +from test_utils.managers.ledger.conftest import create_main_accounts + + +@pytest.mark.parametrize( + argnames="offset, duration", + argvalues=list( + iter_product( + ["12h", "3D"], + [timedelta(days=4)], + ) + ), +) +class TestMergePOPLedger: + + @pytest.fixture + def start(self) -> "datetime": + return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc) + + @pytest.fixture + def duration(self) -> Optional["timedelta"]: + return timedelta(days=5) + + def test_base( + self, + client_no_amm, + ledger_collection, + pop_ledger_merge, + product, + user_factory, + create_main_accounts, + thl_lm, + delete_df_collection, + incite_item_factory, 
+ delete_ledger_db, + ): + from generalresearch.models.thl.ledger import LedgerAccount + + u = user_factory(product=product, created=ledger_collection.start) + + # -- Build & Setup + delete_ledger_db() + create_main_accounts() + delete_df_collection(coll=ledger_collection) + # assert ledger_collection.start is None + # assert ledger_collection.offset is None + + for item in ledger_collection.items: + incite_item_factory(item=item, user=u) + item.initial_load() + + # Confirm any of the items are archived + assert ledger_collection.progress.has_archive.eq(True).all() + + pop_ledger_merge.build( + client=client_no_amm, + ledger_coll=ledger_collection, + ) + # assert pop_ledger_merge.progress.has_archive.eq(True).all() + + ddf = pop_ledger_merge.ddf() + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + + # -- + + user_wallet_account: LedgerAccount = thl_lm.get_account_or_create_user_wallet( + user=u + ) + cash_account: LedgerAccount = thl_lm.get_account_cash() + rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue() + + item_finishes = [i.finish for i in ledger_collection.items] + item_finishes.sort(reverse=True) + last_item_finish = item_finishes[0] + + # Pure SQL based lookups + cash_balance: int = thl_lm.get_account_balance(account=cash_account) + rev_balance: int = thl_lm.get_account_balance(account=rev_account) + assert cash_balance > rev_balance + + # (1) Test Cash Account + ddf = pop_ledger_merge.ddf( + columns=numerical_col_names, + filters=[ + ("account_id", "==", cash_account.uuid), + ("time_idx", ">=", ledger_collection.start), + ("time_idx", "<", last_item_finish), + ], + ) + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + assert df["mp_payment.CREDIT"].sum() == 0 + assert cash_balance > 0 + assert df["mp_payment.DEBIT"].sum() == cash_balance + + # (2) Test Revenue Account + ddf = pop_ledger_merge.ddf( + columns=numerical_col_names, + 
filters=[ + ("account_id", "==", rev_account.uuid), + ("time_idx", ">=", ledger_collection.start), + ("time_idx", "<", last_item_finish), + ], + ) + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + + assert rev_balance == 0 + assert df["bp_payment.CREDIT"].sum() == 0 + assert df["mp_payment.DEBIT"].sum() == 0 + assert df["mp_payment.CREDIT"].sum() > 0 + + # -- Cleanup + delete_ledger_db() + + def test_pydantic_init( + self, + client_no_amm, + ledger_collection, + pop_ledger_merge, + mnt_filepath, + product, + user_factory, + create_main_accounts, + offset, + duration, + start, + thl_lm, + incite_item_factory, + delete_df_collection, + delete_ledger_db, + session_collection, + ): + from generalresearch.models.thl.ledger import LedgerAccount + from generalresearch.models.thl.product import Product + from generalresearch.models.thl.finance import ProductBalances + + u = user_factory(product=product, created=session_collection.start) + + assert ledger_collection.finished is not None + assert isinstance(u.product, Product) + delete_ledger_db() + create_main_accounts(), + delete_df_collection(coll=ledger_collection) + + bp_account: LedgerAccount = thl_lm.get_account_or_create_bp_wallet( + product=u.product + ) + cash_account: LedgerAccount = thl_lm.get_account_cash() + rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue() + + for item in ledger_collection.items: + incite_item_factory(item=item, user=u) + item.initial_load(overwrite=True) + + pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection) + + item_finishes = [i.finish for i in ledger_collection.items] + item_finishes.sort(reverse=True) + last_item_finish = item_finishes[0] + + # (1) Filter by the Product Account, this means no cash_account, or + # rev_account transactions will be present in here... 
+ ddf = pop_ledger_merge.ddf( + columns=numerical_col_names + ["time_idx"], + filters=[ + ("account_id", "==", bp_account.uuid), + ("time_idx", ">=", ledger_collection.start), + ("time_idx", "<", last_item_finish), + ], + ) + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + df = df.set_index("time_idx") + assert not df.empty + + instance = ProductBalances.from_pandas(input_data=df.sum()) + assert instance.payout == instance.net == instance.bp_payment_credit + assert instance.available_balance < instance.net + assert instance.available_balance + instance.retainer == instance.net + assert instance.balance == thl_lm.get_account_balance(bp_account) + assert df["bp_payment.CREDIT"].sum() == thl_lm.get_account_balance(bp_account) + + # (2) Filter by the Cash Account + ddf = pop_ledger_merge.ddf( + columns=numerical_col_names + ["time_idx"], + filters=[ + ("account_id", "==", cash_account.uuid), + ("time_idx", ">=", ledger_collection.start), + ("time_idx", "<", last_item_finish), + ], + ) + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + + cash_balance: int = thl_lm.get_account_balance(account=cash_account) + assert df["bp_payment.CREDIT"].sum() == 0 + assert cash_balance > 0 + assert df["mp_payment.CREDIT"].sum() == 0 + assert df["mp_payment.DEBIT"].sum() == cash_balance + + # (2) Filter by the Revenue Account + ddf = pop_ledger_merge.ddf( + columns=numerical_col_names + ["time_idx"], + filters=[ + ("account_id", "==", rev_account.uuid), + ("time_idx", ">=", ledger_collection.start), + ("time_idx", "<", last_item_finish), + ], + ) + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + + rev_balance: int = thl_lm.get_account_balance(account=rev_account) + assert rev_balance == 0 + assert df["bp_payment.CREDIT"].sum() == 0 + assert df["mp_payment.DEBIT"].sum() == 0 + assert df["mp_payment.CREDIT"].sum() > 0 + + def test_resample( + self, + client_no_amm, + ledger_collection, + pop_ledger_merge, + mnt_filepath, 
+ user_factory, + product, + create_main_accounts, + offset, + duration, + start, + thl_lm, + delete_df_collection, + incite_item_factory, + ): + from generalresearch.models.thl.user import User + + assert ledger_collection.finished is not None + delete_df_collection(coll=ledger_collection) + u1: User = user_factory(product=product) + + bp_account = thl_lm.get_account_or_create_bp_wallet(product=u1.product) + + for item in ledger_collection.items: + incite_item_factory(user=u1, item=item) + item.initial_load(overwrite=True) + + pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection) + + item_finishes = [i.finish for i in ledger_collection.items] + item_finishes.sort(reverse=True) + last_item_finish = item_finishes[0] + + ddf = pop_ledger_merge.ddf( + columns=numerical_col_names + ["time_idx"], + filters=[ + ("account_id", "==", bp_account.uuid), + ("time_idx", ">=", ledger_collection.start), + ("time_idx", "<", last_item_finish), + ], + ) + df = client_no_amm.compute(collections=ddf, sync=True) + assert isinstance(df, pd.DataFrame) + assert isinstance(df.index, pd.Index) + assert not isinstance(df.index, pd.RangeIndex) + + # Now change the index so we can easily resample it + df = df.set_index("time_idx") + assert isinstance(df.index, pd.Index) + assert isinstance(df.index, pd.DatetimeIndex) + + bp_account_balance = thl_lm.get_account_balance(account=bp_account) + + # Initial sum + initial_sum = df.sum().sum() + # assert len(df) == 48 # msg="Original df should be 48 rows" + + # Original (1min) to 5min + df_5min = df.resample("5min").sum() + # assert len(df_5min) == 12 + assert initial_sum == df_5min.sum().sum() + + # 30min + df_30min = df.resample("30min").sum() + # assert len(df_30min) == 2 + assert initial_sum == df_30min.sum().sum() + + # 1hr + df_1hr = df.resample("1h").sum() + # assert len(df_1hr) == 1 + assert initial_sum == df_1hr.sum().sum() + + # 1 day + df_1day = df.resample("1d").sum() + # assert len(df_1day) == 1 + assert initial_sum 
== df_1day.sum().sum() diff --git a/tests/incite/mergers/test_ym_survey_merge.py b/tests/incite/mergers/test_ym_survey_merge.py new file mode 100644 index 0000000..4c2df6b --- /dev/null +++ b/tests/incite/mergers/test_ym_survey_merge.py @@ -0,0 +1,125 @@ +from datetime import timedelta, timezone, datetime +from itertools import product + +import pandas as pd +import pytest + +# noinspection PyUnresolvedReferences +from distributed.utils_test import ( + gen_cluster, + client_no_amm, + loop, + loop_in_thread, + cleanup, + cluster_fixture, + client, +) + +from test_utils.incite.collections.conftest import wall_collection, session_collection +from test_utils.incite.mergers.conftest import ( + enriched_session_merge, + ym_survey_wall_merge, +) + + +@pytest.mark.parametrize( + argnames="offset, duration, start", + argvalues=list( + product( + ["12h", "3D"], + [timedelta(days=30)], + [ + (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace( + microsecond=0 + ) + ], + ) + ), +) +class TestYMSurveyMerge: + """We override start, not because it's needed on the YMSurveyWall merge, + which operates on a rolling 10-day window, but because we don't want + to mock data in the wall collection and enriched_session_merge from + the 1800s and then wonder why there is no data available in the past + 10 days in the database. 
+ """ + + def test_base( + self, + client_no_amm, + user_factory, + product, + ym_survey_wall_merge, + wall_collection, + session_collection, + enriched_session_merge, + delete_df_collection, + incite_item_factory, + thl_web_rr, + ): + from generalresearch.models.thl.user import User + + delete_df_collection(coll=session_collection) + user: User = user_factory(product=product, created=session_collection.start) + + # -- Build & Setup + assert ym_survey_wall_merge.start is None + assert ym_survey_wall_merge.offset == "10D" + + for item in session_collection.items: + incite_item_factory(item=item, user=user) + item.initial_load() + for item in wall_collection.items: + item.initial_load() + + # Confirm any of the items are archived + assert session_collection.progress.has_archive.eq(True).all() + assert wall_collection.progress.has_archive.eq(True).all() + + enriched_session_merge.build( + client=client_no_amm, + session_coll=session_collection, + wall_coll=wall_collection, + pg_config=thl_web_rr, + ) + assert enriched_session_merge.progress.has_archive.eq(True).all() + + ddf = enriched_session_merge.ddf() + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + + # -- + + ym_survey_wall_merge.build( + client=client_no_amm, + wall_coll=wall_collection, + enriched_session=enriched_session_merge, + ) + assert ym_survey_wall_merge.progress.has_archive.eq(True).all() + + # -- + + ddf = ym_survey_wall_merge.ddf() + df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True) + + assert isinstance(df, pd.DataFrame) + assert not df.empty + + # -- + assert df.product_id.nunique() == 1 + assert df.team_id.nunique() == 1 + assert df.source.nunique() > 1 + + started_min_ts = df.started.min() + started_max_ts = df.started.max() + + assert type(started_min_ts) is pd.Timestamp + assert type(started_max_ts) is pd.Timestamp + + started_min: datetime = datetime.fromisoformat(str(started_min_ts)) + 
started_max: datetime = datetime.fromisoformat(str(started_max_ts)) + + started_delta = started_max - started_min + assert started_delta >= timedelta(days=3) diff --git a/tests/incite/schemas/__init__.py b/tests/incite/schemas/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/incite/schemas/__init__.py diff --git a/tests/incite/schemas/test_admin_responses.py b/tests/incite/schemas/test_admin_responses.py new file mode 100644 index 0000000..43aa399 --- /dev/null +++ b/tests/incite/schemas/test_admin_responses.py @@ -0,0 +1,239 @@ +from datetime import datetime, timezone, timedelta +from random import sample +from typing import List + +import numpy as np +import pandas as pd +import pytest + +from generalresearch.incite.schemas import empty_dataframe_from_schema +from generalresearch.incite.schemas.admin_responses import ( + AdminPOPSchema, + SIX_HOUR_SECONDS, +) +from generalresearch.locales import Localelator + + +class TestAdminPOPSchema: + schema_df = empty_dataframe_from_schema(AdminPOPSchema) + countries = list(Localelator().get_all_countries())[:5] + dates = [datetime(year=2024, month=1, day=i, tzinfo=None) for i in range(1, 10)] + + @classmethod + def assign_valid_vals(cls, df: pd.DataFrame) -> pd.DataFrame: + for c in df.columns: + check_attrs: dict = AdminPOPSchema.columns[c].checks[0].statistics + df[c] = np.random.randint( + check_attrs["min_value"], check_attrs["max_value"], df.shape[0] + ) + + return df + + def test_empty(self): + with pytest.raises(Exception): + AdminPOPSchema.validate(pd.DataFrame()) + + def test_new_empty_df(self): + df = empty_dataframe_from_schema(AdminPOPSchema) + + assert isinstance(df, pd.DataFrame) + assert isinstance(df.index, pd.MultiIndex) + assert df.columns.size == len(AdminPOPSchema.columns) + + def test_valid(self): + # (1) Works with raw naive datetime + dates = [ + datetime(year=2024, month=1, day=i, tzinfo=None).isoformat() + for i in range(1, 10) + ] + df = pd.DataFrame( + 
index=pd.MultiIndex.from_product( + iterables=[dates, self.countries], names=["index0", "index1"] + ), + columns=self.schema_df.columns, + ) + df = self.assign_valid_vals(df) + + df = AdminPOPSchema.validate(df) + assert isinstance(df, pd.DataFrame) + + # (2) Works with isoformat naive datetime + dates = [datetime(year=2024, month=1, day=i, tzinfo=None) for i in range(1, 10)] + df = pd.DataFrame( + index=pd.MultiIndex.from_product( + iterables=[dates, self.countries], names=["index0", "index1"] + ), + columns=self.schema_df.columns, + ) + df = self.assign_valid_vals(df) + + df = AdminPOPSchema.validate(df) + assert isinstance(df, pd.DataFrame) + + def test_index_tz_parser(self): + tz_dates = [ + datetime(year=2024, month=1, day=i, tzinfo=timezone.utc) + for i in range(1, 10) + ] + + df = pd.DataFrame( + index=pd.MultiIndex.from_product( + iterables=[tz_dates, self.countries], names=["index0", "index1"] + ), + columns=self.schema_df.columns, + ) + df = self.assign_valid_vals(df) + + # Initially, they're all set with a timezone + timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)] + assert all([ts.tz == timezone.utc for ts in timestmaps]) + + # After validation, the timezone is removed + df = AdminPOPSchema.validate(df) + timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)] + assert all([ts.tz is None for ts in timestmaps]) + + def test_index_tz_no_future_beyond_one_year(self): + now = datetime.now(tz=timezone.utc) + tz_dates = [now + timedelta(days=i * 365) for i in range(1, 10)] + + df = pd.DataFrame( + index=pd.MultiIndex.from_product( + iterables=[tz_dates, self.countries], names=["index0", "index1"] + ), + columns=self.schema_df.columns, + ) + df = self.assign_valid_vals(df) + + with pytest.raises(Exception) as cm: + AdminPOPSchema.validate(df) + + assert ( + "Index 'index0' failed element-wise validator " + "number 0: less_than(" in str(cm.value) + ) + + def test_index_only_str(self): + # --- float64 to str! 
--- + df = pd.DataFrame( + index=pd.MultiIndex.from_product( + iterables=[self.dates, np.random.rand(1, 10)[0]], + names=["index0", "index1"], + ), + columns=self.schema_df.columns, + ) + df = self.assign_valid_vals(df) + + vals = [i for i in df.index.get_level_values(1)] + assert all([isinstance(v, float) for v in vals]) + + df = AdminPOPSchema.validate(df, lazy=True) + + vals = [i for i in df.index.get_level_values(1)] + assert all([isinstance(v, str) for v in vals]) + + # --- int to str --- + + df = pd.DataFrame( + index=pd.MultiIndex.from_product( + iterables=[self.dates, sample(range(100), 20)], + names=["index0", "index1"], + ), + columns=self.schema_df.columns, + ) + df = self.assign_valid_vals(df) + + vals = [i for i in df.index.get_level_values(1)] + assert all([isinstance(v, int) for v in vals]) + + df = AdminPOPSchema.validate(df, lazy=True) + + vals = [i for i in df.index.get_level_values(1)] + assert all([isinstance(v, str) for v in vals]) + + # a = 1 + assert isinstance(df, pd.DataFrame) + + def test_invalid_parsing(self): + # (1) Timezones AND as strings will still parse correctly + tz_str_dates = [ + datetime( + year=2024, month=1, day=1, minute=i, tzinfo=timezone.utc + ).isoformat() + for i in range(1, 10) + ] + df = pd.DataFrame( + index=pd.MultiIndex.from_product( + iterables=[tz_str_dates, self.countries], + names=["index0", "index1"], + ), + columns=self.schema_df.columns, + ) + df = self.assign_valid_vals(df) + df = AdminPOPSchema.validate(df, lazy=True) + + assert isinstance(df, pd.DataFrame) + timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)] + assert all([ts.tz is None for ts in timestmaps]) + + # (2) Timezones are removed + dates = [ + datetime(year=2024, month=1, day=1, minute=i, tzinfo=timezone.utc) + for i in range(1, 10) + ] + df = pd.DataFrame( + index=pd.MultiIndex.from_product( + iterables=[dates, self.countries], names=["index0", "index1"] + ), + columns=self.schema_df.columns, + ) + df = 
self.assign_valid_vals(df) + + # Has tz before validation, and none after + timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)] + assert all([ts.tz is timezone.utc for ts in timestmaps]) + + df = AdminPOPSchema.validate(df, lazy=True) + + timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)] + assert all([ts.tz is None for ts in timestmaps]) + + def test_clipping(self): + df = pd.DataFrame( + index=pd.MultiIndex.from_product( + iterables=[self.dates, self.countries], + names=["index0", "index1"], + ), + columns=self.schema_df.columns, + ) + df = self.assign_valid_vals(df) + df = AdminPOPSchema.validate(df) + assert df.elapsed_avg.max() < SIX_HOUR_SECONDS + + # Now that we know it's valid, break the elapsed avg + df["elapsed_avg"] = np.random.randint( + SIX_HOUR_SECONDS, SIX_HOUR_SECONDS + 10_000, df.shape[0] + ) + assert df.elapsed_avg.max() > SIX_HOUR_SECONDS + + # Confirm it doesn't fail if the values are greater, and that + # all the values are clipped to the max + df = AdminPOPSchema.validate(df) + assert df.elapsed_avg.eq(SIX_HOUR_SECONDS).all() + + def test_rounding(self): + df = pd.DataFrame( + index=pd.MultiIndex.from_product( + iterables=[self.dates, self.countries], + names=["index0", "index1"], + ), + columns=self.schema_df.columns, + ) + df = self.assign_valid_vals(df) + + df["payout_avg"] = 2.123456789900002 + + assert df.payout_avg.sum() == 95.5555555455001 + + df = AdminPOPSchema.validate(df) + assert df.payout_avg.sum() == 95.40000000000003 diff --git a/tests/incite/schemas/test_thl_web.py b/tests/incite/schemas/test_thl_web.py new file mode 100644 index 0000000..7f4434b --- /dev/null +++ b/tests/incite/schemas/test_thl_web.py @@ -0,0 +1,70 @@ +import pandas as pd +import pytest +from pandera.errors import SchemaError + + +class TestWallSchema: + + def test_empty(self): + from generalresearch.incite.schemas.thl_web import THLWallSchema + + with pytest.raises(SchemaError): + 
THLWallSchema.validate(pd.DataFrame()) + + def test_index_missing(self): + from generalresearch.incite.schemas.thl_web import THLWallSchema + + df = pd.DataFrame(columns=THLWallSchema.columns.keys()) + + with pytest.raises(SchemaError) as cm: + THLWallSchema.validate(df) + + def test_no_rows(self): + from generalresearch.incite.schemas.thl_web import THLWallSchema + + df = pd.DataFrame(index=["uuid"], columns=THLWallSchema.columns.keys()) + + with pytest.raises(SchemaError) as cm: + THLWallSchema.validate(df) + + def test_new_empty_df(self): + from generalresearch.incite.schemas import empty_dataframe_from_schema + from generalresearch.incite.schemas.thl_web import THLWallSchema + + df = empty_dataframe_from_schema(THLWallSchema) + assert isinstance(df, pd.DataFrame) + assert df.columns.size == 20 + + +class TestSessionSchema: + + def test_empty(self): + from generalresearch.incite.schemas.thl_web import THLSessionSchema + + with pytest.raises(SchemaError): + THLSessionSchema.validate(pd.DataFrame()) + + def test_index_missing(self): + from generalresearch.incite.schemas.thl_web import THLSessionSchema + + df = pd.DataFrame(columns=THLSessionSchema.columns.keys()) + df.set_index("uuid", inplace=True) + + with pytest.raises(SchemaError) as cm: + THLSessionSchema.validate(df) + + def test_no_rows(self): + from generalresearch.incite.schemas.thl_web import THLSessionSchema + + df = pd.DataFrame(index=["id"], columns=THLSessionSchema.columns.keys()) + + with pytest.raises(SchemaError) as cm: + THLSessionSchema.validate(df) + + def test_new_empty_df(self): + from generalresearch.incite.schemas import empty_dataframe_from_schema + from generalresearch.incite.schemas.thl_web import THLSessionSchema + + df = empty_dataframe_from_schema(THLSessionSchema) + assert isinstance(df, pd.DataFrame) + assert df.columns.size == 21 diff --git a/tests/incite/test_collection_base.py b/tests/incite/test_collection_base.py new file mode 100644 index 0000000..497e5ab --- /dev/null +++ 
b/tests/incite/test_collection_base.py @@ -0,0 +1,318 @@ +from datetime import datetime, timezone, timedelta +from os.path import exists as pexists, join as pjoin +from pathlib import Path +from uuid import uuid4 + +import numpy as np +import pandas as pd +import pytest +from _pytest._code.code import ExceptionInfo + +from generalresearch.incite.base import CollectionBase +from test_utils.incite.conftest import mnt_filepath + +AGO_15min = (datetime.now(tz=timezone.utc) - timedelta(minutes=15)).replace( + microsecond=0 +) +AGO_1HR = (datetime.now(tz=timezone.utc) - timedelta(hours=1)).replace(microsecond=0) +AGO_2HR = (datetime.now(tz=timezone.utc) - timedelta(hours=2)).replace(microsecond=0) + + +class TestCollectionBase: + def test_init(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + assert instance.df.empty is True + + def test_init_df(self, mnt_filepath): + # Only an empty pd.DataFrame can ever be provided + instance = CollectionBase( + df=pd.DataFrame({}), archive_path=mnt_filepath.data_src + ) + assert isinstance(instance.df, pd.DataFrame) + + with pytest.raises(expected_exception=ValueError) as cm: + cm: ExceptionInfo + CollectionBase( + df=pd.DataFrame(columns=[0, 1, 2]), archive_path=mnt_filepath.data_src + ) + assert "Do not provide a pd.DataFrame" in str(cm.value) + + with pytest.raises(expected_exception=ValueError) as cm: + cm: ExceptionInfo + CollectionBase( + df=pd.DataFrame(np.random.randint(100, size=(1000, 1)), columns=["A"]), + archive_path=mnt_filepath.data_src, + ) + assert "Do not provide a pd.DataFrame" in str(cm.value) + + def test_init_start(self, mnt_filepath): + with pytest.raises(expected_exception=ValueError) as cm: + cm: ExceptionInfo + CollectionBase( + start=datetime.now(tz=timezone.utc) - timedelta(days=10), + archive_path=mnt_filepath.data_src, + ) + assert "Collection.start must not have microseconds" in str(cm.value) + + with pytest.raises(expected_exception=ValueError) as cm: + cm: 
ExceptionInfo + tz = timezone(timedelta(hours=-5), "EST") + + CollectionBase( + start=datetime(year=2000, month=1, day=1, tzinfo=tz), + archive_path=mnt_filepath.data_src, + ) + assert "Timezone is not UTC" in str(cm.value) + + instance = CollectionBase(archive_path=mnt_filepath.data_src) + assert instance.start == datetime( + year=2018, month=1, day=1, tzinfo=timezone.utc + ) + + with pytest.raises(expected_exception=ValueError) as cm: + cm: ExceptionInfo + CollectionBase( + start=AGO_2HR, offset="3h", archive_path=mnt_filepath.data_src + ) + assert "Offset must be equal to, or smaller the start timestamp" in str( + cm.value + ) + + def test_init_archive_path(self, mnt_filepath): + """DirectoryPath is apparently smart enough to confirm that the + directory path exists. + """ + + # (1) Basic, confirm an existing path works + instance = CollectionBase(archive_path=mnt_filepath.data_src) + assert instance.archive_path == mnt_filepath.data_src + + # (2) It can't point to a file + file_path = Path(pjoin(mnt_filepath.data_src, f"{uuid4().hex}.zip")) + assert not pexists(file_path) + with pytest.raises(expected_exception=ValueError) as cm: + cm: ExceptionInfo + CollectionBase(archive_path=file_path) + assert "Path does not point to a directory" in str(cm.value) + + # (3) It doesn't create the directory if it doesn't exist + new_path = Path(pjoin(mnt_filepath.data_src, f"{uuid4().hex}/")) + assert not pexists(new_path) + with pytest.raises(expected_exception=ValueError) as cm: + cm: ExceptionInfo + CollectionBase(archive_path=new_path) + assert "Path does not point to a directory" in str(cm.value) + + def test_init_offset(self, mnt_filepath): + with pytest.raises(expected_exception=ValueError) as cm: + cm: ExceptionInfo + CollectionBase(offset="1:X", archive_path=mnt_filepath.data_src) + assert "Invalid offset alias provided. 
Please review:" in str(cm.value) + + with pytest.raises(expected_exception=ValueError) as cm: + cm: ExceptionInfo + CollectionBase(offset=f"59sec", archive_path=mnt_filepath.data_src) + assert "Must be equal to, or longer than 1 min" in str(cm.value) + + with pytest.raises(expected_exception=ValueError) as cm: + cm: ExceptionInfo + CollectionBase(offset=f"{365 * 101}d", archive_path=mnt_filepath.data_src) + assert "String should have at most 5 characters" in str(cm.value) + + +class TestCollectionBaseProperties: + + def test_items(self, mnt_filepath): + with pytest.raises(expected_exception=NotImplementedError) as cm: + cm: ExceptionInfo + instance = CollectionBase(archive_path=mnt_filepath.data_src) + x = instance.items + assert "Must override" in str(cm.value) + + def test_interval_range(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + # Private method requires the end parameter + with pytest.raises(expected_exception=AssertionError) as cm: + cm: ExceptionInfo + instance._interval_range(end=None) + assert "an end value must be provided" in str(cm.value) + + # End param must be same as started (which forces utc) + tz = timezone(timedelta(hours=-5), "EST") + with pytest.raises(expected_exception=AssertionError) as cm: + cm: ExceptionInfo + instance._interval_range(end=datetime.now(tz=tz)) + assert "Timezones must match" in str(cm.value) + + res = instance._interval_range(end=datetime.now(tz=timezone.utc)) + assert isinstance(res, pd.IntervalIndex) + assert res.closed_left + assert res.is_non_overlapping_monotonic + assert res.is_monotonic_increasing + assert res.is_unique + + def test_interval_range2(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + assert isinstance(instance.interval_range, list) + + # 1 hrs ago has 2 x 30min + the future 30min + OFFSET = "30min" + instance = CollectionBase( + start=AGO_1HR, offset=OFFSET, archive_path=mnt_filepath.data_src + ) + assert 
len(instance.interval_range) == 3 + assert instance.interval_range[0][0] == AGO_1HR + + # 1 hrs ago has 1 x 60min + the future 60min + OFFSET = "60min" + instance = CollectionBase( + start=AGO_1HR, offset=OFFSET, archive_path=mnt_filepath.data_src + ) + assert len(instance.interval_range) == 2 + + def test_progress(self, mnt_filepath): + with pytest.raises(expected_exception=NotImplementedError) as cm: + cm: ExceptionInfo + instance = CollectionBase( + start=AGO_15min, offset="3min", archive_path=mnt_filepath.data_src + ) + x = instance.progress + assert "Must override" in str(cm.value) + + def test_progress2(self, mnt_filepath): + instance = CollectionBase( + start=AGO_2HR, + offset="15min", + archive_path=mnt_filepath.data_src, + ) + assert instance.df.empty + + with pytest.raises(expected_exception=NotImplementedError) as cm: + df = instance.progress + assert "Must override" in str(cm.value) + + def test_items2(self, mnt_filepath): + """There can't be a test for this because the Items need a path whic + isn't possible in the generic form + """ + instance = CollectionBase( + start=AGO_1HR, offset="5min", archive_path=mnt_filepath.data_src + ) + + with pytest.raises(expected_exception=NotImplementedError) as cm: + cm: ExceptionInfo + items = instance.items + assert "Must override" in str(cm.value) + + # item = items[-3] + # ddf = instance.ddf(items=[item], include_partial=True, force_rr_latest=False) + # df = item.validate_ddf(ddf=ddf) + # assert isinstance(df, pd.DataFrame) + # assert len(df.columns) == 16 + # assert str(df.product_id.dtype) == "object" + # assert str(ddf.product_id.dtype) == "string" + + def test_items3(self, mnt_filepath): + instance = CollectionBase( + start=AGO_2HR, + offset="15min", + archive_path=mnt_filepath.data_src, + ) + with pytest.raises(expected_exception=NotImplementedError) as cm: + item = instance.items[0] + assert "Must override" in str(cm.value) + + +class TestCollectionBaseMethodsCleanup: + def test_fetch_force_rr_latest(self, 
mnt_filepath): + coll = CollectionBase(archive_path=mnt_filepath.data_src) + + with pytest.raises(expected_exception=Exception) as cm: + cm: ExceptionInfo + coll.fetch_force_rr_latest(sources=[]) + assert "Must override" in str(cm.value) + + def test_fetch_all_paths(self, mnt_filepath): + coll = CollectionBase(archive_path=mnt_filepath.data_src) + + with pytest.raises(expected_exception=NotImplementedError) as cm: + cm: ExceptionInfo + coll.fetch_all_paths( + items=None, force_rr_latest=False, include_partial=False + ) + assert "Must override" in str(cm.value) + + +class TestCollectionBaseMethodsCleanup: + @pytest.mark.skip + def test_cleanup_partials(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + assert instance.cleanup_partials() is None # it doesn't return anything + + def test_clear_tmp_archives(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + assert instance.clear_tmp_archives() is None # it doesn't return anything + + @pytest.mark.skip + def test_clear_corrupt_archives(self, mnt_filepath): + """TODO: expand this so it actually has corrupt archives that we + check to see if they're removed + """ + instance = CollectionBase(archive_path=mnt_filepath.data_src) + assert instance.clear_corrupt_archives() is None # it doesn't return anything + + @pytest.mark.skip + def test_rebuild_symlinks(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + assert instance.rebuild_symlinks() is None + + +class TestCollectionBaseMethodsSourceTiming: + + def test_get_item(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + i = pd.Interval(left=1, right=2, closed="left") + + with pytest.raises(expected_exception=NotImplementedError) as cm: + instance.get_item(interval=i) + assert "Must override" in str(cm.value) + + def test_get_item_start(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + + dt = 
datetime.now(tz=timezone.utc) + start = pd.Timestamp(dt) + + with pytest.raises(expected_exception=NotImplementedError) as cm: + instance.get_item_start(start=start) + assert "Must override" in str(cm.value) + + def test_get_items(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + + dt = datetime.now(tz=timezone.utc) + + with pytest.raises(expected_exception=NotImplementedError) as cm: + instance.get_items(since=dt) + assert "Must override" in str(cm.value) + + def test_get_items_from_year(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + + with pytest.raises(expected_exception=NotImplementedError) as cm: + instance.get_items_from_year(year=2020) + assert "Must override" in str(cm.value) + + def test_get_items_last90(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + + with pytest.raises(expected_exception=NotImplementedError) as cm: + instance.get_items_last90() + assert "Must override" in str(cm.value) + + def test_get_items_last365(self, mnt_filepath): + instance = CollectionBase(archive_path=mnt_filepath.data_src) + + with pytest.raises(expected_exception=NotImplementedError) as cm: + instance.get_items_last365() + assert "Must override" in str(cm.value) diff --git a/tests/incite/test_collection_base_item.py b/tests/incite/test_collection_base_item.py new file mode 100644 index 0000000..e5d1d02 --- /dev/null +++ b/tests/incite/test_collection_base_item.py @@ -0,0 +1,223 @@ +from datetime import datetime, timezone +from os.path import join as pjoin +from pathlib import Path +from uuid import uuid4 + +import dask.dataframe as dd +import pandas as pd +import pytest +from pydantic import ValidationError + +from generalresearch.incite.base import CollectionItemBase + + +class TestCollectionItemBase: + def test_init(self): + dt = datetime.now(tz=timezone.utc).replace(microsecond=0) + + instance = CollectionItemBase() + instance2 = CollectionItemBase(start=dt) 
class TestCollectionItemBaseProperties:
    """Property behaviour on the bare base class.

    Two families are visible in these tests: the filename properties raise
    NotImplementedError("Do not use CollectionItemBase directly"), while the
    other properties surface as AttributeError on the un-subclassed base.
    """

    def test_finish(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.finish

    def test_interval(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.interval

    def test_filename(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=NotImplementedError) as cm:
            res = instance.filename

        assert "Do not use CollectionItemBase directly" in str(cm.value)

    def test_partial_filename(self):
        # BUG FIX(review): this test previously accessed `instance.filename`
        # (copy-paste from test_filename) and never exercised
        # `partial_filename` at all.  Assumes partial_filename raises the same
        # NotImplementedError as filename — TODO confirm against the base class.
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=NotImplementedError) as cm:
            res = instance.partial_filename

        assert "Do not use CollectionItemBase directly" in str(cm.value)

    def test_empty_filename(self):
        # BUG FIX(review): previously accessed `instance.filename`; see
        # test_partial_filename for the rationale of this change.
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=NotImplementedError) as cm:
            res = instance.empty_filename

        assert "Do not use CollectionItemBase directly" in str(cm.value)

    def test_path(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.path

    def test_partial_path(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.partial_path

    def test_empty_path(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.empty_path


class TestCollectionItemBaseMethods:
    """Method behaviour on the bare base class, plus the static
    delete_archive() helper which is exercised against real temp paths."""

    @pytest.mark.skip
    def test_next_numbered_path(self):
        pass

    @pytest.mark.skip
    def test_search_highest_numbered_path(self):
        pass

    def test_tmp_filename(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=NotImplementedError) as cm:
            res = instance.tmp_filename()
        assert "Do not use CollectionItemBase directly" in str(cm.value)

    def test_tmp_path(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.tmp_path()

    def test_is_empty(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.is_empty()

    def test_has_empty(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.has_empty()

    def test_has_partial_archive(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.has_partial_archive()

    @pytest.mark.parametrize("include_empty", [True, False])
    def test_has_archive(self, include_empty):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.has_archive(include_empty=include_empty)

    def test_delete_archive_file(self, mnt_filepath):
        instance = CollectionItemBase()  # noqa: F841 (mirrors sibling tests)
        path1 = Path(pjoin(mnt_filepath.data_src, f"{uuid4().hex}.zip"))

        # Confirm it doesn't exist, and that delete_archive() doesn't throw
        # an error when trying to delete a non-existent file or folder
        assert not path1.exists()
        CollectionItemBase.delete_archive(generic_path=path1)
        # TODO: LOG.warning(f"tried removing non-existent file: {generic_path}")

        # Create it, confirm it exists, delete it, and confirm it doesn't exist
        path1.touch()
        assert path1.exists()
        CollectionItemBase.delete_archive(generic_path=path1)
        assert not path1.exists()

    def test_delete_archive_dir(self, mnt_filepath):
        path1 = Path(pjoin(mnt_filepath.data_src, f"{uuid4().hex}"))

        # Confirm it doesn't exist, and that delete_archive() doesn't throw
        # an error when trying to delete a non-existent file or folder
        assert not path1.exists()
        CollectionItemBase.delete_archive(generic_path=path1)
        # TODO: LOG.warning(f"tried removing non-existent file: {generic_path}")

        # Create it, confirm it exists, delete it, and confirm it doesn't exist
        path1.mkdir()
        assert path1.exists()
        assert path1.is_dir()
        CollectionItemBase.delete_archive(generic_path=path1)
        assert not path1.exists()

    def test_should_archive(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.should_archive()

    def test_set_empty(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.set_empty()

    def test_valid_archive(self):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=AttributeError) as cm:
            res = instance.valid_archive(generic_path=None, sample=None)


class TestCollectionItemBaseMethodsORM:
    """Serialization / archive round-trip hooks on the bare base class."""

    @pytest.mark.skip
    def test_from_archive(self):
        pass

    @pytest.mark.parametrize("is_partial", [True, False])
    def test_to_archive(self, is_partial):
        instance = CollectionItemBase()

        with pytest.raises(expected_exception=NotImplementedError) as cm:
            res = instance.to_archive(
                ddf=dd.from_pandas(data=pd.DataFrame()), is_partial=is_partial
            )
        assert "Must override" in str(cm.value)

    @pytest.mark.skip
    def test__to_dict(self):
        pass

    @pytest.mark.skip
    def test_delete_partial(self):
        pass

    @pytest.mark.skip
    def test_cleanup_partials(self):
        pass

    @pytest.mark.skip
    def test_delete_dangling_partials(self):
        pass
b/tests/incite/test_grl_flow.py @@ -0,0 +1,23 @@ +class TestGRLFlow: + + def test_init(self, mnt_filepath, thl_web_rr): + from generalresearch.incite.defaults import ( + ledger_df_collection, + task_df_collection, + pop_ledger as plm, + ) + + from generalresearch.incite.collections.thl_web import ( + LedgerDFCollection, + TaskAdjustmentDFCollection, + ) + from generalresearch.incite.mergers.pop_ledger import PopLedgerMerge + + ledger_df = ledger_df_collection(ds=mnt_filepath, pg_config=thl_web_rr) + assert isinstance(ledger_df, LedgerDFCollection) + + task_df = task_df_collection(ds=mnt_filepath, pg_config=thl_web_rr) + assert isinstance(task_df, TaskAdjustmentDFCollection) + + pop_ledger = plm(ds=mnt_filepath) + assert isinstance(pop_ledger, PopLedgerMerge) diff --git a/tests/incite/test_interval_idx.py b/tests/incite/test_interval_idx.py new file mode 100644 index 0000000..ea2bced --- /dev/null +++ b/tests/incite/test_interval_idx.py @@ -0,0 +1,23 @@ +import pandas as pd +from datetime import datetime, timezone, timedelta + + +class TestIntervalIndex: + + def test_init(self): + start = datetime(year=2000, month=1, day=1) + end = datetime(year=2000, month=1, day=10) + + iv_r: pd.IntervalIndex = pd.interval_range( + start=start, end=end, freq="1d", closed="left" + ) + assert isinstance(iv_r, pd.IntervalIndex) + assert len(iv_r.to_list()) == 9 + + # If the offset is longer than the end - start it will not + # error. It will simply have 0 rows. + iv_r: pd.IntervalIndex = pd.interval_range( + start=start, end=end, freq="30d", closed="left" + ) + assert isinstance(iv_r, pd.IntervalIndex) + assert len(iv_r.to_list()) == 0 |
