aboutsummaryrefslogtreecommitdiff
path: root/tests/incite
diff options
context:
space:
mode:
Diffstat (limited to 'tests/incite')
-rw-r--r--tests/incite/__init__.py137
-rw-r--r--tests/incite/collections/__init__.py0
-rw-r--r--tests/incite/collections/test_df_collection_base.py113
-rw-r--r--tests/incite/collections/test_df_collection_item_base.py72
-rw-r--r--tests/incite/collections/test_df_collection_item_thl_web.py994
-rw-r--r--tests/incite/collections/test_df_collection_thl_marketplaces.py75
-rw-r--r--tests/incite/collections/test_df_collection_thl_web.py160
-rw-r--r--tests/incite/collections/test_df_collection_thl_web_ledger.py32
-rw-r--r--tests/incite/mergers/__init__.py0
-rw-r--r--tests/incite/mergers/foundations/__init__.py0
-rw-r--r--tests/incite/mergers/foundations/test_enriched_session.py138
-rw-r--r--tests/incite/mergers/foundations/test_enriched_task_adjust.py76
-rw-r--r--tests/incite/mergers/foundations/test_enriched_wall.py236
-rw-r--r--tests/incite/mergers/foundations/test_user_id_product.py73
-rw-r--r--tests/incite/mergers/test_merge_collection.py102
-rw-r--r--tests/incite/mergers/test_merge_collection_item.py66
-rw-r--r--tests/incite/mergers/test_pop_ledger.py307
-rw-r--r--tests/incite/mergers/test_ym_survey_merge.py125
-rw-r--r--tests/incite/schemas/__init__.py0
-rw-r--r--tests/incite/schemas/test_admin_responses.py239
-rw-r--r--tests/incite/schemas/test_thl_web.py70
-rw-r--r--tests/incite/test_collection_base.py318
-rw-r--r--tests/incite/test_collection_base_item.py223
-rw-r--r--tests/incite/test_grl_flow.py23
-rw-r--r--tests/incite/test_interval_idx.py23
25 files changed, 3602 insertions, 0 deletions
diff --git a/tests/incite/__init__.py b/tests/incite/__init__.py
new file mode 100644
index 0000000..2f736e8
--- /dev/null
+++ b/tests/incite/__init__.py
@@ -0,0 +1,137 @@
+# class TestParquetBehaviors(CleanTempDirectoryTestCls):
+# wall_coll = WallDFCollection(
+# start=GLOBAL_VARS["wall"].start,
+# offset="49h",
+# archive_path=f"{settings.incite_mount_dir}/raw/df-collections/{DFCollectionType.WALL.value}",
+# )
+#
+# def test_filters(self):
+# # Using REAL data here
+# start = datetime(year=2024, month=1, day=15, hour=12, tzinfo=timezone.utc)
+# end = datetime(year=2024, month=1, day=15, hour=20, tzinfo=timezone.utc)
+# end_max = datetime(
+# year=2024, month=1, day=15, hour=20, tzinfo=timezone.utc
+# ) + timedelta(hours=2)
+#
+# ir = pd.Interval(left=pd.Timestamp(start), right=pd.Timestamp(end))
+# wall_items = [w for w in self.wall_coll.items if w.interval.overlaps(ir)]
+# ddf = self.wall_coll.ddf(
+# items=wall_items,
+# include_partial=True,
+# force_rr_latest=False,
+# columns=["started", "finished"],
+# filters=[
+# ("started", ">=", start),
+# ("started", "<", end),
+# ],
+# )
+#
+# df = ddf.compute()
+# self.assertIsInstance(df, pd.DataFrame)
+#
+# # No started=None, and they're all between the started and the end
+# self.assertFalse(df.started.isna().any())
+# self.assertFalse((df.started < start).any())
+# self.assertFalse((df.started > end).any())
+#
+# # Has finished=None and finished=time, so
+# # the finished is all between the started and
+# # the end_max
+# self.assertTrue(df.finished.isna().any())
+# self.assertTrue((df.finished.dt.year == 2024).any())
+#
+# self.assertFalse((df.finished > end_max).any())
+# self.assertFalse((df.finished < start).any())
+#
+# # def test_user_id_list(self):
+# # # Calling compute turns it into a np.ndarray
+# # user_ids = self.instance.ddf(
+# # columns=["user_id"]
+# # ).user_id.unique().values.compute()
+# # self.assertIsInstance(user_ids, np.ndarray)
+# #
+# # # If ddf filters work with ndarray
+# # user_product_merge = <todo: assign>
+# #
+# # with self.assertRaises(TypeError) as cm:
+# # user_product_merge.ddf(
+# # filters=[("id", "in", user_ids)])
+# # self.assertIn("Value of 'in' filter must be a list, set or tuple.", str(cm.exception))
+# #
+# # # No compute == dask array
+# # user_ids = self.instance.ddf(
+# # columns=["user_id"]
+# # ).user_id.unique().values
+# # self.assertIsInstance(user_ids, da.Array)
+# #
+# # with self.assertRaises(TypeError) as cm:
+# # user_product_merge.ddf(
+# # filters=[("id", "in", user_ids)])
+# # self.assertIn("Value of 'in' filter must be a list, set or tuple.", str(cm.exception))
+# #
+# # # pick a product_id (most active one)
+# # self.product_id = instance.df.product_id.value_counts().index[0]
+# # self.expected_columns: int = len(instance._schema.columns)
+# # self.instance = instance
+#
+# # def test_basic(self):
+# # # now try to load up the data!
+# # self.instance.grouped_key = self.product_id
+# #
+# # # Confirm any of the items are archived
+# # self.assertTrue(self.instance.progress.has_archive.eq(True).any())
+# #
+# # # Confirm it returns a df
+# # df = self.instance.dd().compute()
+# #
+# # self.assertFalse(df.empty)
+# # self.assertEqual(df.shape[1], self.expected_columns)
+# # self.assertGreater(df.shape[0], 1)
+# #
+# # # Confirm that DF only contains this product_id
+# # self.assertEqual(df[df.product_id == self.product_id].shape, df.shape)
+#
+# # def test_god_vs_product_id(self):
+# # self.instance.grouped_key = self.product_id
+# # df_product_origin = self.instance.dd(columns=None, filters=None).compute()
+# #
+# # self.instance.grouped_key = None
+# # df_god_origin = self.instance.dd(columns=None,
+# # filters=[("product_id", "==", self.product_id)]).compute()
+# #
+# # self.assertTrue(df_god_origin.equals(df_product_origin))
+#
+# #
+# # instance = POPSessionMerge(
+# # start=START,
+# # archive_path=self.PATH,
+# # group_by="product_id"
+# # )
+# # instance.build(U=GLOBAL_VARS["user"], S=GLOBAL_VARS["session"], W=GLOBAL_VARS["wall"])
+# # instance.save(god_only=False)
+# #
+# # # pick a product_id (most active one)
+# # self.product_id = instance.df.product_id.value_counts().index[0]
+# # self.expected_columns: int = len(instance._schema.columns)
+# # self.instance = instance
+#
+#
+# class TestValidItem(CleanTempDirectoryTestCls):
+#
+# def test_interval(self):
+# for k in GLOBAL_VARS.keys():
+# coll = GLOBAL_VARS[k]
+# item = coll.items[0]
+# ir = item.interval
+#
+# self.assertIsInstance(ir, pd.Interval)
+# self.assertLess(a=ir.left, b=ir.right)
+#
+# def test_str(self):
+# for k in GLOBAL_VARS.keys():
+# coll = GLOBAL_VARS[k]
+# item = coll.items[0]
+#
+# offset = coll.offset or "–"
+#
+# self.assertIn(offset, str(item))
diff --git a/tests/incite/collections/__init__.py b/tests/incite/collections/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/incite/collections/__init__.py
diff --git a/tests/incite/collections/test_df_collection_base.py b/tests/incite/collections/test_df_collection_base.py
new file mode 100644
index 0000000..5aaa729
--- /dev/null
+++ b/tests/incite/collections/test_df_collection_base.py
@@ -0,0 +1,113 @@
+from datetime import datetime, timezone
+
+import pandas as pd
+import pytest
+from pandera import DataFrameSchema
+
+from generalresearch.incite.collections import (
+ DFCollectionType,
+ DFCollection,
+)
+from test_utils.incite.conftest import mnt_filepath
+
+df_collection_types = [e for e in DFCollectionType if e is not DFCollectionType.TEST]
+
+
+@pytest.mark.parametrize("df_coll_type", df_collection_types)
+class TestDFCollectionBase:
+ """None of these tests are about the DFCollection with any specific
+ data_type... that will be handled in other parameterized tests
+
+ """
+
+ def test_init(self, mnt_filepath, df_coll_type):
+ """Try to initialize the DFCollection with various invalid parameters"""
+ with pytest.raises(expected_exception=ValueError) as cm:
+ DFCollection(archive_path=mnt_filepath.data_src)
+ assert "Must explicitly provide a data_type" in str(cm.value)
+
+ # with pytest.raises(expected_exception=ValueError) as cm:
+ # DFCollection(
+ # data_type=DFCollectionType.TEST, archive_path=mnt_filepath.data_src
+ # )
+ # assert "Must provide a supported data_type" in str(cm.value)
+
+ instance = DFCollection(
+ data_type=DFCollectionType.WALL, archive_path=mnt_filepath.data_src
+ )
+ assert instance.data_type == DFCollectionType.WALL
+
+
+@pytest.mark.parametrize("df_coll_type", df_collection_types)
+class TestDFCollectionBaseProperties:
+
+ @pytest.mark.skip
+ def test_df_collection_items(self, mnt_filepath, df_coll_type):
+ instance = DFCollection(
+ data_type=df_coll_type,
+ start=datetime(year=1800, month=1, day=1, tzinfo=timezone.utc),
+ finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc),
+ offset="100d",
+ archive_path=mnt_filepath.archive_path(enum_type=df_coll_type),
+ )
+
+ assert len(instance.interval_range) == len(instance.items)
+ assert len(instance.items) == 366
+
+ def test_df_collection_progress(self, mnt_filepath, df_coll_type):
+ instance = DFCollection(
+ data_type=df_coll_type,
+ start=datetime(year=1800, month=1, day=1, tzinfo=timezone.utc),
+ finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc),
+ offset="100d",
+ archive_path=mnt_filepath.archive_path(enum_type=df_coll_type),
+ )
+
+ # Progress returns a dataframe with a row each Item
+ assert isinstance(instance.progress, pd.DataFrame)
+ assert instance.progress.shape == (366, 6)
+
+ def test_df_collection_schema(self, mnt_filepath, df_coll_type):
+ instance1 = DFCollection(
+ data_type=DFCollectionType.WALL, archive_path=mnt_filepath.data_src
+ )
+
+ instance2 = DFCollection(
+ data_type=DFCollectionType.SESSION, archive_path=mnt_filepath.data_src
+ )
+
+ assert instance1._schema != instance2._schema
+ assert isinstance(instance1._schema, DataFrameSchema)
+ assert isinstance(instance2._schema, DataFrameSchema)
+
+
+class TestDFCollectionBaseMethods:
+
+ @pytest.mark.skip
+ def test_initial_load(self, mnt_filepath, thl_web_rr):
+ instance = DFCollection(
+ pg_config=thl_web_rr,
+ data_type=DFCollectionType.USER,
+ start=datetime(year=2022, month=1, day=1, minute=0, tzinfo=timezone.utc),
+ finished=datetime(year=2022, month=1, day=1, minute=5, tzinfo=timezone.utc),
+ offset="2min",
+ archive_path=mnt_filepath.data_src,
+ )
+
+ # Confirm that there are no archives available yet
+ assert instance.progress.has_archive.eq(False).all()
+
+ instance.initial_load()
+ assert 47 == len(instance.ddf().index)
+ assert instance.progress.should_archive.eq(True).all()
+
+ # A few archives should have been made
+ assert not instance.progress.has_archive.eq(False).all()
+
+ @pytest.mark.skip
+ def test_fetch_force_rr_latest(self):
+ pass
+
+ @pytest.mark.skip
+ def test_force_rr_latest(self):
+ pass
diff --git a/tests/incite/collections/test_df_collection_item_base.py b/tests/incite/collections/test_df_collection_item_base.py
new file mode 100644
index 0000000..a0c0b0b
--- /dev/null
+++ b/tests/incite/collections/test_df_collection_item_base.py
@@ -0,0 +1,72 @@
+from datetime import datetime, timezone
+
+import pytest
+
+from generalresearch.incite.collections import (
+ DFCollectionType,
+ DFCollectionItem,
+ DFCollection,
+)
+from test_utils.incite.conftest import mnt_filepath
+
+df_collection_types = [e for e in DFCollectionType if e is not DFCollectionType.TEST]
+
+
+@pytest.mark.parametrize("df_coll_type", df_collection_types)
+class TestDFCollectionItemBase:
+
+ def test_init(self, mnt_filepath, df_coll_type):
+ collection = DFCollection(
+ data_type=df_coll_type,
+ offset="100d",
+ start=datetime(year=1800, month=1, day=1, tzinfo=timezone.utc),
+ finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc),
+ archive_path=mnt_filepath.archive_path(enum_type=df_coll_type),
+ )
+
+ item = DFCollectionItem()
+ item._collection = collection
+
+ assert isinstance(item, DFCollectionItem)
+
+
+@pytest.mark.parametrize("df_coll_type", df_collection_types)
+class TestDFCollectionItemProperties:
+
+ @pytest.mark.skip
+ def test_filename(self, df_coll_type):
+ pass
+
+
+@pytest.mark.parametrize("df_coll_type", df_collection_types)
+class TestDFCollectionItemMethods:
+
+ def test_has_mysql_false(self, mnt_filepath, df_coll_type):
+ collection = DFCollection(
+ data_type=df_coll_type,
+ offset="100d",
+ start=datetime(year=1800, month=1, day=1, tzinfo=timezone.utc),
+ finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc),
+ archive_path=mnt_filepath.archive_path(enum_type=df_coll_type),
+ )
+
+ instance1: DFCollectionItem = collection.items[0]
+ assert not instance1.has_mysql()
+
+ def test_has_mysql_true(self, thl_web_rr, mnt_filepath, df_coll_type):
+ collection = DFCollection(
+ data_type=df_coll_type,
+ offset="100d",
+ start=datetime(year=1800, month=1, day=1, tzinfo=timezone.utc),
+ finished=datetime(year=1900, month=1, day=1, tzinfo=timezone.utc),
+ archive_path=mnt_filepath.archive_path(enum_type=df_coll_type),
+ pg_config=thl_web_rr,
+ )
+
+ # Has RR, assume unittest server is online
+ instance2: DFCollectionItem = collection.items[0]
+ assert instance2.has_mysql()
+
+ @pytest.mark.skip
+ def test_update_partial_archive(self, df_coll_type):
+ pass
diff --git a/tests/incite/collections/test_df_collection_item_thl_web.py b/tests/incite/collections/test_df_collection_item_thl_web.py
new file mode 100644
index 0000000..9c3d67a
--- /dev/null
+++ b/tests/incite/collections/test_df_collection_item_thl_web.py
@@ -0,0 +1,994 @@
+from datetime import datetime, timezone, timedelta
+from itertools import product as iter_product
+from os.path import join as pjoin
+from pathlib import PurePath, Path
+from uuid import uuid4
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+from distributed import Client, Scheduler, Worker
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+ gen_cluster,
+ client_no_amm,
+ loop,
+ loop_in_thread,
+ cleanup,
+ cluster_fixture,
+ client,
+)
+from faker import Faker
+from pandera import DataFrameSchema
+from pydantic import FilePath
+
+from generalresearch.incite.base import CollectionItemBase
+from generalresearch.incite.collections import (
+ DFCollectionItem,
+ DFCollectionType,
+)
+from generalresearch.incite.schemas import ARCHIVE_AFTER
+from generalresearch.models.thl.user import User
+from generalresearch.pg_helper import PostgresConfig
+from generalresearch.sql_helper import PostgresDsn
+from test_utils.incite.conftest import mnt_filepath, incite_item_factory
+
+fake = Faker()
+
+df_collections = [
+ DFCollectionType.WALL,
+ DFCollectionType.SESSION,
+ DFCollectionType.LEDGER,
+ DFCollectionType.TASK_ADJUSTMENT,
+]
+
+unsupported_mock_types = {
+ DFCollectionType.IP_INFO,
+ DFCollectionType.IP_HISTORY,
+ DFCollectionType.IP_HISTORY_WS,
+ DFCollectionType.TASK_ADJUSTMENT,
+}
+
+
+def combo_object():
+ for x in iter_product(
+ df_collections,
+ ["15min", "45min", "1H"],
+ ):
+ yield x
+
+
+class TestDFCollectionItemBase:
+ def test_init(self):
+ instance = CollectionItemBase()
+ assert isinstance(instance, CollectionItemBase)
+ assert isinstance(instance.start, datetime)
+
+
+@pytest.mark.parametrize(
+ argnames="df_collection_data_type, offset", argvalues=combo_object()
+)
+class TestDFCollectionItemProperties:
+
+ def test_filename(self, df_collection_data_type, df_collection, offset):
+ for i in df_collection.items:
+ assert isinstance(i.filename, str)
+
+ assert isinstance(i.path, PurePath)
+ assert i.path.name == i.filename
+
+ assert i._collection.data_type.name.lower() in i.filename
+ assert i._collection.offset in i.filename
+ assert i.start.strftime("%Y-%m-%d-%H-%M-%S") in i.filename
+
+
+@pytest.mark.parametrize(
+ argnames="df_collection_data_type, offset", argvalues=combo_object()
+)
+class TestDFCollectionItemPropertiesBase:
+
+ def test_name(self, df_collection_data_type, offset, df_collection):
+ for i in df_collection.items:
+ assert isinstance(i.name, str)
+
+ def test_finish(self, df_collection_data_type, offset, df_collection):
+ for i in df_collection.items:
+ assert isinstance(i.finish, datetime)
+
+ def test_interval(self, df_collection_data_type, offset, df_collection):
+ for i in df_collection.items:
+ assert isinstance(i.interval, pd.Interval)
+
+ def test_partial_filename(self, df_collection_data_type, offset, df_collection):
+ for i in df_collection.items:
+ assert isinstance(i.partial_filename, str)
+
+ def test_empty_filename(self, df_collection_data_type, offset, df_collection):
+ for i in df_collection.items:
+ assert isinstance(i.empty_filename, str)
+
+ def test_path(self, df_collection_data_type, offset, df_collection):
+ for i in df_collection.items:
+ assert isinstance(i.path, FilePath)
+
+ def test_partial_path(self, df_collection_data_type, offset, df_collection):
+ for i in df_collection.items:
+ assert isinstance(i.partial_path, FilePath)
+
+ def test_empty_path(self, df_collection_data_type, offset, df_collection):
+ for i in df_collection.items:
+ assert isinstance(i.empty_path, FilePath)
+
+
+@pytest.mark.parametrize(
+ argnames="df_collection_data_type, offset, duration",
+ argvalues=list(
+ iter_product(
+ df_collections,
+ ["12h", "10D"],
+ [timedelta(days=10), timedelta(days=45)],
+ )
+ ),
+)
+class TestDFCollectionItemMethod:
+
+ def test_has_mysql(
+ self,
+ df_collection,
+ thl_web_rr,
+ offset,
+ duration,
+ df_collection_data_type,
+ delete_df_collection,
+ ):
+ delete_df_collection(coll=df_collection)
+
+ df_collection.pg_config = None
+ for i in df_collection.items:
+ assert not i.has_mysql()
+
+ # Confirm that the regular connection should work as expected
+ df_collection.pg_config = thl_web_rr
+ for i in df_collection.items:
+ assert i.has_mysql()
+
+ # Make a fake connection and confirm it does NOT work
+ df_collection.pg_config = PostgresConfig(
+ dsn=PostgresDsn(f"postgres://root:@127.0.0.1/{uuid4().hex}"),
+ connect_timeout=5,
+ statement_timeout=1,
+ )
+ for i in df_collection.items:
+ assert not i.has_mysql()
+
+ @pytest.mark.skip
+ def test_update_partial_archive(
+ self,
+ df_collection,
+ offset,
+ duration,
+ thl_web_rw,
+ df_collection_data_type,
+ delete_df_collection,
+ ):
+ # for i in collection.items:
+ # assert i.update_partial_archive()
+ # assert df.created.max() < _last_time_block[1]
+ pass
+
+ @pytest.mark.skip
+ def test_create_partial_archive(
+ self,
+ df_collection,
+ offset,
+ duration,
+ create_main_accounts,
+ thl_web_rw,
+ thl_lm,
+ df_collection_data_type,
+ user_factory,
+ product,
+ client_no_amm,
+ incite_item_factory,
+ delete_df_collection,
+ mnt_filepath,
+ ):
+ assert 1 + 1 == 2
+
+ def test_dict(
+ self,
+ df_collection_data_type,
+ offset,
+ duration,
+ df_collection,
+ delete_df_collection,
+ ):
+ delete_df_collection(coll=df_collection)
+
+ for item in df_collection.items:
+ res = item.to_dict()
+ assert isinstance(res, dict)
+ assert len(res.keys()) == 6
+
+ assert isinstance(res["should_archive"], bool)
+ assert isinstance(res["has_archive"], bool)
+ assert isinstance(res["path"], Path)
+ assert isinstance(res["filename"], str)
+
+ assert isinstance(res["start"], datetime)
+ assert isinstance(res["finish"], datetime)
+ assert res["start"] < res["finish"]
+
+ def test_from_mysql(
+ self,
+ df_collection_data_type,
+ df_collection,
+ offset,
+ duration,
+ create_main_accounts,
+ thl_web_rw,
+ user_factory,
+ product,
+ incite_item_factory,
+ delete_df_collection,
+ ):
+ from generalresearch.models.thl.user import User
+
+ if df_collection.data_type in unsupported_mock_types:
+ return
+
+ delete_df_collection(coll=df_collection)
+ u1: User = user_factory(product=product)
+
+ # No data has been loaded, but we can confirm the from_mysql returns
+        # back an empty df with the correct columns
+ for item in df_collection.items:
+ # Unlike .from_mysql_ledger(), .from_mysql_standard() will return
+            # back an empty df with the correct columns in place
+ delete_df_collection(coll=df_collection)
+ df = item.from_mysql()
+ if df_collection.data_type == DFCollectionType.LEDGER:
+ assert df is None
+ else:
+ assert df.empty
+ assert set(df.columns) == set(df_collection._schema.columns.keys())
+
+ incite_item_factory(user=u1, item=item)
+
+ df = item.from_mysql()
+ assert not df.empty
+ assert set(df.columns) == set(df_collection._schema.columns.keys())
+ if df_collection.data_type == DFCollectionType.LEDGER:
+ # The number of rows in this dataframe will change depending
+ # on the mocking of data. It's because if the account has
+ # user wallet on, then there will be more transactions for
+ # example.
+ assert df.shape[0] > 0
+
+ def test_from_mysql_standard(
+ self,
+ df_collection_data_type,
+ df_collection,
+ offset,
+ duration,
+ user_factory,
+ product,
+ incite_item_factory,
+ delete_df_collection,
+ ):
+ from generalresearch.models.thl.user import User
+
+ if df_collection.data_type in unsupported_mock_types:
+ return
+ u1: User = user_factory(product=product)
+
+ delete_df_collection(coll=df_collection)
+
+ for item in df_collection.items:
+ item: DFCollectionItem
+
+ if df_collection.data_type == DFCollectionType.LEDGER:
+ # We're using parametrize, so this If statement is just to
+ # confirm other Item Types will always raise an assertion
+ with pytest.raises(expected_exception=AssertionError) as cm:
+ res = item.from_mysql_standard()
+ assert (
+ "Can't call from_mysql_standard for Ledger DFCollectionItem"
+ in str(cm.value)
+ )
+
+ continue
+
+ # Unlike .from_mysql_ledger(), .from_mysql_standard() will return
+            # back an empty df with the correct columns in place
+ df = item.from_mysql_standard()
+ assert df.empty
+ assert set(df.columns) == set(df_collection._schema.columns.keys())
+
+ incite_item_factory(user=u1, item=item)
+
+ df = item.from_mysql_standard()
+ assert not df.empty
+ assert set(df.columns) == set(df_collection._schema.columns.keys())
+ assert df.shape[0] > 0
+
+ def test_from_mysql_ledger(
+ self,
+ df_collection,
+ user,
+ create_main_accounts,
+ offset,
+ duration,
+ thl_web_rw,
+ thl_lm,
+ df_collection_data_type,
+ user_factory,
+ product,
+ client_no_amm,
+ incite_item_factory,
+ delete_df_collection,
+ mnt_filepath,
+ ):
+ from generalresearch.models.thl.user import User
+
+ if df_collection.data_type != DFCollectionType.LEDGER:
+ return
+ u1: User = user_factory(product=product)
+
+ delete_df_collection(coll=df_collection)
+
+ for item in df_collection.items:
+ item: DFCollectionItem
+ delete_df_collection(coll=df_collection)
+
+ # Okay, now continue with the actual Ledger Item tests... we need
+ # to ensure that this item.start - item.finish range hasn't had
+ # any prior transactions created within that range.
+ assert item.from_mysql_ledger() is None
+
+ # Create main accounts doesn't matter because it doesn't
+ # add any transactions to the db
+ assert item.from_mysql_ledger() is None
+
+ incite_item_factory(user=u1, item=item)
+ df = item.from_mysql_ledger()
+ assert isinstance(df, pd.DataFrame)
+
+ # Not only is this a np.int64 to int comparison, but I also know it
+ # isn't actually measuring anything meaningful. However, it's useful
+ # as it tells us if the DF contains all the correct TX Entries. I
+ # figured it's better to count the amount rather than just the
+ # number of rows. DF == transactions * 2 because there are two
+            # entries per transaction
+ # assert df.amount.sum() == total_amt
+ # assert total_entries == df.shape[0]
+
+ assert not df.tx_id.is_unique
+ df["net"] = df.direction * df.amount
+ assert df.groupby("tx_id").net.sum().sum() == 0
+
+ def test_to_archive(
+ self,
+ df_collection,
+ user,
+ offset,
+ duration,
+ df_collection_data_type,
+ user_factory,
+ product,
+ client_no_amm,
+ incite_item_factory,
+ delete_df_collection,
+ mnt_filepath,
+ ):
+ from generalresearch.models.thl.user import User
+
+ if df_collection.data_type in unsupported_mock_types:
+ return
+ u1: User = user_factory(product=product)
+
+ delete_df_collection(coll=df_collection)
+
+ for item in df_collection.items:
+ item: DFCollectionItem
+
+ incite_item_factory(user=u1, item=item)
+
+ # Load up the data that we'll be using for various to_archive
+ # methods.
+ df = item.from_mysql()
+ ddf = dd.from_pandas(df, npartitions=1)
+
+ # (1) Write the basic archive, the issue is that because it's
+ # an empty pd.DataFrame, it never makes an actual parquet file
+ assert item.to_archive(ddf=ddf, is_partial=False, overwrite=False)
+ assert item.has_archive()
+ assert item.has_archive(include_empty=False)
+
+ def test__to_archive(
+ self,
+ df_collection_data_type,
+ df_collection,
+ user_factory,
+ product,
+ offset,
+ duration,
+ client_no_amm,
+ user,
+ incite_item_factory,
+ delete_df_collection,
+ mnt_filepath,
+ ):
+ """We already have a test for the "non-private" version of this,
+ which primarily just uses the respective Client to determine if
+ the ddf is_empty or not.
+
+ Therefore, use the private test to check the manual behavior of
+ passing in the is_empty or overwrite.
+ """
+ if df_collection.data_type in unsupported_mock_types:
+ return
+
+ delete_df_collection(coll=df_collection)
+ u1: User = user_factory(product=product)
+
+ for item in df_collection.items:
+ item: DFCollectionItem
+
+ incite_item_factory(user=u1, item=item)
+
+ # Load up the data that we'll be using for various to_archive
+ # methods. Will always be empty pd.DataFrames for now...
+ df = item.from_mysql()
+ ddf = dd.from_pandas(df, npartitions=1)
+
+ # (1) Confirm a missing ddf (shouldn't bc of type hint) should
+ # immediately return back False
+ assert not item._to_archive(ddf=None, is_empty=True)
+ assert not item._to_archive(ddf=None, is_empty=False)
+
+ # (2) Setting empty overrides any possible state of the ddf
+ for rand_val in [df, ddf, True, 1_000]:
+ assert not item.empty_path.exists()
+ item._to_archive(ddf=rand_val, is_empty=True)
+ assert item.empty_path.exists()
+ item.empty_path.unlink()
+
+ # (3) Trigger a warning with overwrite. First write an empty,
+            # then write it again with overwrite default to confirm it worked,
+            # then write it again with overwrite=False to confirm it does
+ # not work.
+ assert item._to_archive(ddf=ddf, is_empty=True)
+ res1 = item.empty_path.stat()
+
+ # Returns none because it knows the file (regular, empty, or
+ # partial) already exists
+ assert not item._to_archive(ddf=ddf, is_empty=True, overwrite=False)
+
+            # Currently overwrite=True doesn't actually work on empty files
+            # because it's checked again in .set_empty() and isn't
+            # aware of the overwrite flag that may be passed in to
+ # item._to_archive()
+ with pytest.raises(expected_exception=AssertionError) as cm:
+ item._to_archive(ddf=rand_val, is_empty=True, overwrite=True)
+ assert "set_empty is already set; why are you doing this?" in str(cm.value)
+
+ # We can assert the file stats are the same because we were never
+ # able to go ahead and rewrite or update it in anyway
+ res2 = item.empty_path.stat()
+ assert res1 == res2
+
+ @pytest.mark.skip
+ def test_to_archive_numbered_partial(
+ self, df_collection_data_type, df_collection, offset, duration
+ ):
+ pass
+
+ @pytest.mark.skip
+ def test_initial_load(
+ self, df_collection_data_type, df_collection, offset, duration
+ ):
+ pass
+
+ @pytest.mark.skip
+ def test_clear_corrupt_archive(
+ self, df_collection_data_type, df_collection, offset, duration
+ ):
+ pass
+
+
+@pytest.mark.parametrize(
+ argnames="df_collection_data_type, offset, duration",
+ argvalues=list(iter_product(df_collections, ["12h", "10D"], [timedelta(days=15)])),
+)
+class TestDFCollectionItemMethodBase:
+
+ @pytest.mark.skip
+ def test_path_exists(self, df_collection_data_type, offset, duration):
+ pass
+
+ @pytest.mark.skip
+ def test_next_numbered_path(self, df_collection_data_type, offset, duration):
+ pass
+
+ @pytest.mark.skip
+ def test_search_highest_numbered_path(
+ self, df_collection_data_type, offset, duration
+ ):
+ pass
+
+ @pytest.mark.skip
+ def test_tmp_filename(self, df_collection_data_type, offset, duration):
+ pass
+
+ @pytest.mark.skip
+ def test_tmp_path(self, df_collection_data_type, offset, duration):
+ pass
+
+ def test_is_empty(self, df_collection_data_type, df_collection, offset, duration):
+ """
+        test_has_empty was merged into this because item.has_empty is
+        an alias for is_empty... or vice versa
+ """
+
+ for item in df_collection.items:
+ assert not item.is_empty()
+ assert not item.has_empty()
+
+ item.empty_path.touch()
+
+ assert item.is_empty()
+ assert item.has_empty()
+
+ def test_has_partial_archive(
+ self, df_collection_data_type, df_collection, offset, duration
+ ):
+ for item in df_collection.items:
+ assert not item.has_partial_archive()
+ item.partial_path.touch()
+ assert item.has_partial_archive()
+
+ def test_has_archive(
+ self, df_collection_data_type, df_collection, offset, duration
+ ):
+ for item in df_collection.items:
+ # (1) Originally, nothing exists... so let's just make a file and
+ # confirm that it works if just touch that path (no validation
+ # occurs at all).
+ assert not item.has_archive(include_empty=False)
+ assert not item.has_archive(include_empty=True)
+ item.path.touch()
+ assert item.has_archive(include_empty=False)
+ assert item.has_archive(include_empty=True)
+
+ item.path.unlink()
+ assert not item.has_archive(include_empty=False)
+ assert not item.has_archive(include_empty=True)
+
+ # (2) Same as the above, except make an empty directory
+ # instead of a file
+ assert not item.has_archive(include_empty=False)
+ assert not item.has_archive(include_empty=True)
+ item.path.mkdir()
+ assert item.has_archive(include_empty=False)
+ assert item.has_archive(include_empty=True)
+
+ item.path.rmdir()
+ assert not item.has_archive(include_empty=False)
+ assert not item.has_archive(include_empty=True)
+
+            # (3) Rather than make an empty file or dir at the path, let's
+ # touch the empty_path and confirm the include_empty option
+ # works
+
+ item.empty_path.touch()
+ assert not item.has_archive(include_empty=False)
+ assert item.has_archive(include_empty=True)
+
+ def test_delete_archive(
+ self, df_collection_data_type, df_collection, offset, duration
+ ):
+ for item in df_collection.items:
+ item: DFCollectionItem
+ # (1) Confirm that it doesn't raise an error or anything if we
+ # try to delete files or folders that do not exist
+ CollectionItemBase.delete_archive(generic_path=item.path)
+ CollectionItemBase.delete_archive(generic_path=item.empty_path)
+ CollectionItemBase.delete_archive(generic_path=item.partial_path)
+
+ item.path.touch()
+ item.empty_path.touch()
+ item.partial_path.touch()
+
+ CollectionItemBase.delete_archive(generic_path=item.path)
+ CollectionItemBase.delete_archive(generic_path=item.empty_path)
+ CollectionItemBase.delete_archive(generic_path=item.partial_path)
+
+ assert not item.path.exists()
+ assert not item.empty_path.exists()
+ assert not item.partial_path.exists()
+
+ def test_should_archive(
+ self, df_collection_data_type, df_collection, offset, duration
+ ):
+ schema: DataFrameSchema = df_collection._schema
+ aa = schema.metadata[ARCHIVE_AFTER]
+
+ # It shouldn't be None, it can be timedelta(seconds=0)
+ assert isinstance(aa, timedelta)
+
+ for item in df_collection.items:
+ item: DFCollectionItem
+
+ if datetime.now(tz=timezone.utc) > item.finish + aa:
+ assert item.should_archive()
+ else:
+ assert not item.should_archive()
+
+ @pytest.mark.skip
+ def test_set_empty(self, df_collection_data_type, df_collection, offset, duration):
+ pass
+
+ def test_valid_archive(
+ self, df_collection_data_type, df_collection, offset, duration
+ ):
+ # Originally, nothing has been saved or anything.. so confirm it
+ # always comes back as None
+ for item in df_collection.items:
+ assert not item.valid_archive(generic_path=None, sample=None)
+
+ _path = Path(pjoin(df_collection.archive_path, uuid4().hex))
+
+ # (1) Fail if isfile, but doesn't exist and if we can't read
+ # it as valid ParquetFile
+ assert not item.valid_archive(generic_path=_path, sample=None)
+ _path.touch()
+ assert not item.valid_archive(generic_path=_path, sample=None)
+ _path.unlink()
+
+ # (2) Fail if isdir and we can't read it as a valid ParquetFile
+ _path.mkdir()
+ assert _path.is_dir()
+ assert not item.valid_archive(generic_path=_path, sample=None)
+ _path.rmdir()
+
    @pytest.mark.skip
    def test_validate_df(
        self, df_collection_data_type, df_collection, offset, duration
    ):
        # TODO: cover schema validation of a loaded DataFrame.
        pass
+
    @pytest.mark.skip
    def test_from_archive(
        self, df_collection_data_type, df_collection, offset, duration
    ):
        # TODO: cover reading an item back from its Parquet archive.
        pass
+
+ def test__to_dict(self, df_collection_data_type, df_collection, offset, duration):
+
+ for item in df_collection.items:
+ res = item._to_dict()
+ assert isinstance(res, dict)
+ assert len(res.keys()) == 6
+
+ assert isinstance(res["should_archive"], bool)
+ assert isinstance(res["has_archive"], bool)
+ assert isinstance(res["path"], Path)
+ assert isinstance(res["filename"], str)
+
+ assert isinstance(res["start"], datetime)
+ assert isinstance(res["finish"], datetime)
+ assert res["start"] < res["finish"]
+
    @pytest.mark.skip
    def test_delete_partial(
        self, df_collection_data_type, df_collection, offset, duration
    ):
        # TODO: cover removal of a single partial archive.
        pass
+
    @pytest.mark.skip
    def test_cleanup_partials(
        self, df_collection_data_type, df_collection, offset, duration
    ):
        # TODO: cover bulk cleanup of partial archives.
        pass
+
    @pytest.mark.skip
    def test_delete_dangling_partials(
        self, df_collection_data_type, df_collection, offset, duration
    ):
        # TODO: cover removal of partials with no corresponding item.
        pass
+
+
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
async def test_client(client, s, worker):
    """gen_cluster must inject a Client, a Scheduler, and the single Worker;
    no secondary worker is configured."""
    expectations = ((client, Client), (s, Scheduler), (worker, Worker))
    for injected, expected_cls in expectations:
        assert isinstance(injected, expected_cls)
+
+
@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset",
    argvalues=combo_object(),
)
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)])
@pytest.mark.anyio
async def test_client_parametrize(c, s, w, df_collection_data_type, offset):
    """Confirm gen_cluster still injects c (Client), s (Scheduler), and
    w (Worker) when the test is also parametrized; a secondary worker is
    not required."""

    assert isinstance(c, Client), f"c is not Client, it's {type(c)}"
    assert isinstance(s, Scheduler), f"s is not Scheduler, it's {type(s)}"
    assert isinstance(w, Worker), f"w is not Worker, it's {type(w)}"

    # The parametrized arguments must arrive alongside the cluster objects.
    assert df_collection_data_type is not None
    assert isinstance(offset, str)
+
+
# NOTE: I could not find a way to parametrize at the class level while mixing
# sync and async test methods inside the class (some wrapped in gen_cluster,
# some not), so the gen_cluster-based tests above live at module level.
+
+
@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset, duration",
    argvalues=list(iter_product(df_collections, ["12h", "10D"], [timedelta(days=15)])),
)
class TestDFCollectionItemFunctionalTest:
    """Functional tests that write real Parquet archives to disk for every
    collection type / offset / duration combination, then read them back.

    These tests mutate the archive directory and the database via the
    factories; each test deletes the collection's archives up front.
    """

    def test_to_archive_and_ddf(
        self,
        df_collection_data_type,
        offset,
        duration,
        client_no_amm,
        df_collection,
        user,
        user_factory,
        product,
        incite_item_factory,
        delete_df_collection,
        mnt_filepath,
    ):
        # Archive every item, then read the whole collection back as a
        # dask DataFrame and sanity-check the row count.
        from generalresearch.models.thl.user import User

        if df_collection.data_type in unsupported_mock_types:
            return
        u1: User = user_factory(product=product)

        delete_df_collection(coll=df_collection)
        df_collection._client = client_no_amm

        # Assert that there are no pre-existing archives
        assert df_collection.progress.has_archive.eq(False).all()
        res = df_collection.ddf()
        assert res is None

        delete_df_collection(coll=df_collection)
        for item in df_collection.items:
            item: DFCollectionItem

            incite_item_factory(user=u1, item=item)
            item.initial_load()

        # I know it seems weird to delete items from the database before we
        # proceed with the test. However, the content should have already
        # been saved out into a parquet at this point, and I am too lazy
        # to write a separate teardown for a collection (and not a
        # single Item)

        # Now that we went ahead with the initial_load, Assert that all
        # items have archive files saved
        assert isinstance(df_collection.progress, pd.DataFrame)
        assert df_collection.progress.has_archive.eq(True).all()

        ddf = df_collection.ddf()
        shape = df_collection._client.compute(collections=ddf.shape, sync=True)
        assert shape[0] > 5

    def test_filesize_estimate(
        self,
        df_collection,
        user,
        offset,
        duration,
        client_no_amm,
        user_factory,
        product,
        df_collection_data_type,
        incite_item_factory,
        delete_df_collection,
        mnt_filepath,
    ):
        """A functional test to write some Parquet files for the
        DFCollection and then confirm that the files get written
        correctly.

        Confirm the files are written correctly by:
        (1) Validating their passing the pandera schema
        (2) The file or dir has an expected size on disk
        """
        import pyarrow.parquet as pq
        from generalresearch.models.thl.user import User
        import os

        if df_collection.data_type in unsupported_mock_types:
            return
        delete_df_collection(coll=df_collection)
        u1: User = user_factory(product=product)

        # NOTE(review): despite the old "pick 3 random items" wording, this
        # walks EVERY item, archives it, and samples its on-disk size.
        for item in df_collection.items:
            item: DFCollectionItem

            incite_item_factory(user=u1, item=item)
            item.initial_load(overwrite=True)

            total_bytes = 0
            for fp in pq.ParquetDataset(item.path).files:
                total_bytes += os.stat(fp).st_size

            total_mb = total_bytes / 1_048_576

            # Non-trivial content, but each item stays under a megabyte.
            assert total_bytes > 1_000
            assert total_mb < 1

    def test_to_archive_client(
        self,
        client_no_amm,
        df_collection,
        user_factory,
        product,
        offset,
        duration,
        df_collection_data_type,
        incite_item_factory,
        delete_df_collection,
        mnt_filepath,
    ):
        # Exercise to_archive() directly with a 1-partition dask frame.
        from generalresearch.models.thl.user import User

        delete_df_collection(coll=df_collection)
        df_collection._client = client_no_amm
        u1: User = user_factory(product=product)

        for item in df_collection.items:
            item: DFCollectionItem

            if df_collection.data_type in unsupported_mock_types:
                continue

            incite_item_factory(user=u1, item=item)

            # Load up the data that we'll be using for various to_archive
            # methods. Will always be empty pd.DataFrames for now...
            df = item.from_mysql()
            ddf = dd.from_pandas(df, npartitions=1)
            assert isinstance(ddf, dd.DataFrame)

            # (1) Write the basic archive, the issue is that because it's
            # an empty pd.DataFrame, it never makes an actual parquet file
            assert not item.has_archive()
            saved = item.to_archive(ddf=ddf, is_partial=False, overwrite=False)
            assert saved
            assert item.has_archive(include_empty=True)

    @pytest.mark.skip
    def test_get_items(self, df_collection, product, offset, duration):
        # Skipped: expects a ResourceWarning about missing archives on the
        # first call, then a full item list on the second.
        with pytest.warns(expected_warning=ResourceWarning) as cm:
            df_collection.get_items_last365()
        assert "DFCollectionItem has missing archives" in str(
            [w.message for w in cm.list]
        )

        res = df_collection.get_items_last365()
        assert len(res) == len(df_collection.items)

    def test_saving_protections(
        self,
        client_no_amm,
        df_collection_data_type,
        df_collection,
        incite_item_factory,
        delete_df_collection,
        user_factory,
        product,
        offset,
        duration,
        mnt_filepath,
    ):
        """Don't allow creating an archive for data that will likely be
        overwritten or updated
        """
        from generalresearch.models.thl.user import User

        if df_collection.data_type in unsupported_mock_types:
            return
        u1: User = user_factory(product=product)

        schema: DataFrameSchema = df_collection._schema
        aa = schema.metadata[ARCHIVE_AFTER]
        assert isinstance(aa, timedelta)

        delete_df_collection(df_collection)
        for item in df_collection.items:
            item: DFCollectionItem

            incite_item_factory(user=u1, item=item)

            should_archive = item.should_archive()
            res = item.initial_load()

            # self.assertIn("Cannot create archive for such new data", str(cm.records))

            # .to_archive() will return back True or False depending on if it
            # was successful. We want to compare that result to the
            # .should_archive() method result
            assert should_archive == res

    def test_empty_item(
        self,
        client_no_amm,
        df_collection_data_type,
        df_collection,
        incite_item_factory,
        delete_df_collection,
        user,
        offset,
        duration,
        mnt_filepath,
    ):
        # Every item with no real data must end up flagged empty.
        delete_df_collection(coll=df_collection)

        for item in df_collection.items:
            assert not item.has_empty()
            df: pd.DataFrame = item.from_mysql()

            # We do this check b/c the Ledger returns back None and
            # I don't want it to fail when we go to make a ddf
            if df is None:
                item.set_empty()
            else:
                ddf = dd.from_pandas(df, npartitions=1)
                item.to_archive(ddf=ddf)

            assert item.has_empty()

    def test_file_touching(
        self,
        client_no_amm,
        df_collection_data_type,
        df_collection,
        incite_item_factory,
        delete_df_collection,
        user_factory,
        product,
        offset,
        duration,
        mnt_filepath,
    ):
        # Verify which of path / empty_path exists before and after loading.
        from generalresearch.models.thl.user import User

        delete_df_collection(coll=df_collection)
        df_collection._client = client_no_amm
        u1: User = user_factory(product=product)

        for item in df_collection.items:
            # Confirm none of the paths exist yet
            assert not item.has_archive()
            assert not item.path_exists(generic_path=item.path)
            assert not item.has_empty()
            assert not item.path_exists(generic_path=item.empty_path)

            if df_collection.data_type in unsupported_mock_types:
                assert not item.has_archive(include_empty=False)
                assert not item.has_empty()
                assert not item.path_exists(generic_path=item.empty_path)
            else:
                incite_item_factory(user=u1, item=item)
                item.initial_load()

                assert item.has_archive(include_empty=False)
                assert item.path_exists(generic_path=item.path)
                assert not item.has_empty()
diff --git a/tests/incite/collections/test_df_collection_thl_marketplaces.py b/tests/incite/collections/test_df_collection_thl_marketplaces.py
new file mode 100644
index 0000000..0a77938
--- /dev/null
+++ b/tests/incite/collections/test_df_collection_thl_marketplaces.py
@@ -0,0 +1,75 @@
+from datetime import datetime, timezone
+from itertools import product
+
+import pytest
+from pandera import Column, Index, DataFrameSchema
+
+from generalresearch.incite.collections import DFCollection
+from generalresearch.incite.collections import DFCollectionType
+from generalresearch.incite.collections.thl_marketplaces import (
+ InnovateSurveyHistoryCollection,
+ MorningSurveyTimeseriesCollection,
+ SagoSurveyHistoryCollection,
+ SpectrumSurveyTimeseriesCollection,
+)
+from test_utils.incite.conftest import mnt_filepath
+
+
def combo_object():
    """Yield every (collection class, offset) pair for parametrization."""
    collection_classes = [
        InnovateSurveyHistoryCollection,
        MorningSurveyTimeseriesCollection,
        SagoSurveyHistoryCollection,
        SpectrumSurveyTimeseriesCollection,
    ]
    yield from product(collection_classes, ["5min", "6H", "30D"])
+
+
+@pytest.mark.parametrize("df_coll, offset", combo_object())
+class TestDFCollection_thl_marketplaces:
+
+ def test_init(self, mnt_filepath, df_coll, offset, spectrum_rw):
+ assert issubclass(df_coll, DFCollection)
+
+ # This is stupid, but we need to pull the default from the
+ # Pydantic field
+ data_type = df_coll.model_fields["data_type"].default
+ assert isinstance(data_type, DFCollectionType)
+
+ # (1) Can't be totally empty, needs a path...
+ with pytest.raises(expected_exception=Exception) as cm:
+ instance = df_coll()
+
+ # (2) Confirm it only needs the archive_path
+ instance = df_coll(
+ archive_path=mnt_filepath.archive_path(enum_type=data_type),
+ )
+ assert isinstance(instance, DFCollection)
+
+ # (3) Confirm it loads with all
+ instance = df_coll(
+ archive_path=mnt_filepath.archive_path(enum_type=data_type),
+ sql_helper=spectrum_rw,
+ offset=offset,
+ start=datetime(year=2023, month=6, day=1, minute=0, tzinfo=timezone.utc),
+ finished=datetime(year=2023, month=6, day=1, minute=5, tzinfo=timezone.utc),
+ )
+ assert isinstance(instance, DFCollection)
+
+ # (4) Now that we initialize the Class, we can access the _schema
+ assert isinstance(instance._schema, DataFrameSchema)
+ assert isinstance(instance._schema.index, Index)
+
+ for c in instance._schema.columns.keys():
+ assert isinstance(c, str)
+ col = instance._schema.columns[c]
+ assert isinstance(col, Column)
+
+ assert instance._schema.coerce, "coerce on all Schemas"
+ assert isinstance(instance._schema.checks, list)
+ assert len(instance._schema.checks) == 0
+ assert isinstance(instance._schema.metadata, dict)
+ assert len(instance._schema.metadata.keys()) == 2
diff --git a/tests/incite/collections/test_df_collection_thl_web.py b/tests/incite/collections/test_df_collection_thl_web.py
new file mode 100644
index 0000000..e6f464b
--- /dev/null
+++ b/tests/incite/collections/test_df_collection_thl_web.py
@@ -0,0 +1,160 @@
+from datetime import datetime
+from itertools import product
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+from pandera import DataFrameSchema
+
+from generalresearch.incite.collections import DFCollection, DFCollectionType
+
+
def combo_object():
    """Yield every (DFCollectionType, offset) pair for parametrization."""
    data_types = [
        DFCollectionType.USER,
        DFCollectionType.WALL,
        DFCollectionType.SESSION,
        DFCollectionType.TASK_ADJUSTMENT,
        DFCollectionType.AUDIT_LOG,
        DFCollectionType.LEDGER,
    ]
    yield from product(data_types, ["30min", "1H"])
+
+
@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web:
    """Fixture wiring sanity for each thl_web collection type."""

    def test_init(self, df_collection_data_type, offset, df_collection):
        pairs = (
            (df_collection_data_type, DFCollectionType),
            (df_collection, DFCollection),
        )
        for obj, expected_cls in pairs:
            assert isinstance(obj, expected_cls)
+
+
@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web_Properties:
    """Simple property checks on the collection object."""

    def test_items(self, df_collection_data_type, offset, df_collection):
        """Every item must reference its parent collection."""
        items = df_collection.items
        assert isinstance(items, list)
        for member in items:
            assert member._collection == df_collection

    def test__schema(self, df_collection_data_type, offset, df_collection):
        """_schema resolves to a pandera DataFrameSchema."""
        schema = df_collection._schema
        assert isinstance(schema, DataFrameSchema)
+
+
@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web_BaseProperties:
    """Type checks on the base-class properties shared by all collections."""

    @pytest.mark.skip
    def test__interval_range(self, df_collection_data_type, offset, df_collection):
        # TODO: cover the private _interval_range helper.
        pass

    def test_interval_start(self, df_collection_data_type, offset, df_collection):
        """interval_start resolves to a datetime."""
        start = df_collection.interval_start
        assert isinstance(start, datetime)

    def test_interval_range(self, df_collection_data_type, offset, df_collection):
        """interval_range resolves to a list."""
        interval_range = df_collection.interval_range
        assert isinstance(interval_range, list)

    def test_progress(self, df_collection_data_type, offset, df_collection):
        """progress resolves to a pandas DataFrame."""
        report = df_collection.progress
        assert isinstance(report, pd.DataFrame)
+
+
@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web_Methods:
    """Placeholders for the collection's public methods; none implemented yet."""

    @pytest.mark.skip
    def test_initial_loads(self, df_collection_data_type, df_collection, offset):
        pass

    @pytest.mark.skip
    def test_fetch_force_rr_latest(
        self, df_collection_data_type, df_collection, offset
    ):
        pass

    @pytest.mark.skip
    def test_force_rr_latest(self, df_collection_data_type, df_collection, offset):
        pass
+
+
@pytest.mark.parametrize(
    argnames="df_collection_data_type, offset", argvalues=combo_object()
)
class TestDFCollection_thl_web_BaseMethods:
    """Base-class method coverage; most entries are still TODO stubs."""

    def test_fetch_all_paths(self, df_collection_data_type, offset, df_collection):
        # With no filters, fetch_all_paths must at least return a list.
        res = df_collection.fetch_all_paths(
            items=None, force_rr_latest=False, include_partial=False
        )
        assert isinstance(res, list)

    @pytest.mark.skip
    def test_ddf(self, df_collection_data_type, offset, df_collection):
        # Skipped: requires archives on disk before ddf() returns a frame.
        res = df_collection.ddf()
        assert isinstance(res, dd.DataFrame)

    # -- cleanup --
    @pytest.mark.skip
    def test_schedule_cleanup(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_cleanup(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_cleanup_partials(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_clear_tmp_archives(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_clear_corrupt_archives(
        self, df_collection_data_type, offset, df_collection
    ):
        pass

    @pytest.mark.skip
    def test_rebuild_symlinks(self, df_collection_data_type, offset, df_collection):
        pass

    # -- Source timing --
    @pytest.mark.skip
    def test_get_item(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_get_item_start(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_get_items(self, df_collection_data_type, offset, df_collection):
        # If we get all the items from the start of the collection, it
        # should include all the items!
        res1 = df_collection.items
        res2 = df_collection.get_items(since=df_collection.start)
        assert len(res1) == len(res2)

    @pytest.mark.skip
    def test_get_items_from_year(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_get_items_last90(self, df_collection_data_type, offset, df_collection):
        pass

    @pytest.mark.skip
    def test_get_items_last365(self, df_collection_data_type, offset, df_collection):
        pass
diff --git a/tests/incite/collections/test_df_collection_thl_web_ledger.py b/tests/incite/collections/test_df_collection_thl_web_ledger.py
new file mode 100644
index 0000000..599d979
--- /dev/null
+++ b/tests/incite/collections/test_df_collection_thl_web_ledger.py
@@ -0,0 +1,32 @@
+# def test_loaded(self, client_no_amm, collection, new_user_fixture, pop_ledger_merge):
+# collection._client = client_no_amm
+#
+# teardown_events(collection)
+# THL_LM.create_main_accounts()
+#
+# for item in collection.items:
+# populate_events(item, user=new_user_fixture)
+# item.initial_load()
+#
+# ddf = collection.ddf(
+# force_rr_latest=False,
+# include_partial=True,
+# filters=[
+# ("created", ">=", collection.start),
+# ("created", "<", collection.finished),
+# ],
+# )
+#
+# assert isinstance(ddf, dd.DataFrame)
+# df = client_no_amm.compute(collections=ddf, sync=True)
+# assert isinstance(df, pd.DataFrame)
+#
+# # Simple validation check(s)
+# assert not df.tx_id.is_unique
+# df["net"] = df.direction * df.amount
+# assert df.groupby("tx_id").net.sum().sum() == 0
+#
+# teardown_events(collection)
+#
+#
+#
diff --git a/tests/incite/mergers/__init__.py b/tests/incite/mergers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/incite/mergers/__init__.py
diff --git a/tests/incite/mergers/foundations/__init__.py b/tests/incite/mergers/foundations/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/incite/mergers/foundations/__init__.py
diff --git a/tests/incite/mergers/foundations/test_enriched_session.py b/tests/incite/mergers/foundations/test_enriched_session.py
new file mode 100644
index 0000000..ec15d38
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_enriched_session.py
@@ -0,0 +1,138 @@
+from datetime import timedelta, timezone, datetime
+from decimal import Decimal
+from itertools import product
+from typing import Optional
+
+from generalresearch.incite.schemas.admin_responses import (
+ AdminPOPSessionSchema,
+)
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+
+from test_utils.incite.collections.conftest import (
+ session_collection,
+ wall_collection,
+)
+
+
@pytest.mark.parametrize(
    argnames="offset, duration",
    argvalues=list(
        product(
            ["12h", "3D"],
            [timedelta(days=5)],
        )
    ),
)
class TestEnrichedSession:
    """Build the enriched-session merge from freshly loaded wall/session
    collections and confirm it yields a non-empty frame.

    NOTE: inside test_base, `product` is the pytest fixture, which shadows
    the module-level itertools import used only by the decorator above.
    """

    def test_base(
        self,
        client_no_amm,
        product,
        user_factory,
        wall_collection,
        session_collection,
        enriched_session_merge,
        thl_web_rr,
        delete_df_collection,
        incite_item_factory,
    ):
        from generalresearch.models.thl.user import User

        delete_df_collection(coll=session_collection)

        u1: User = user_factory(product=product, created=session_collection.start)

        # Populate and archive every session item, then load the walls.
        for item in session_collection.items:
            incite_item_factory(item=item, user=u1)
            item.initial_load()

        for item in wall_collection.items:
            item.initial_load()

        enriched_session_merge.build(
            client=client_no_amm,
            wall_coll=wall_collection,
            session_coll=session_collection,
            pg_config=thl_web_rr,
        )

        # --

        ddf = enriched_session_merge.ddf()
        assert isinstance(ddf, dd.DataFrame)

        df = client_no_amm.compute(collections=ddf, sync=True)
        assert isinstance(df, pd.DataFrame)

        assert not df.empty

        # -- Teardown
        delete_df_collection(session_collection)
+
+
class TestEnrichedSessionAdmin:
    """to_admin_response() over two products' worth of generated sessions.

    The start/offset/duration fixtures pin the collection window so the
    factory-generated sessions land inside it.
    """

    @pytest.fixture
    def start(self) -> "datetime":
        return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)

    @pytest.fixture
    def offset(self) -> str:
        return "1d"

    @pytest.fixture
    def duration(self) -> Optional["timedelta"]:
        return timedelta(days=5)

    def test_to_admin_response(
        self,
        event_report_request,
        enriched_session_merge,
        client_no_amm,
        wall_collection,
        session_collection,
        thl_web_rr,
        session_report_request,
        user_factory,
        start,
        session_factory,
        product_factory,
        delete_df_collection,
    ):
        delete_df_collection(coll=wall_collection)
        delete_df_collection(coll=session_collection)

        p1 = product_factory()
        p2 = product_factory()

        # 50 one-minute-spaced sessions per product, all inside the window.
        for p in [p1, p2]:
            u = user_factory(product=p)
            for i in range(50):
                s = session_factory(
                    user=u,
                    wall_count=1,
                    wall_req_cpi=Decimal("1.00"),
                    started=start + timedelta(minutes=i, seconds=1),
                )

        # FIX: load once, after ALL products' sessions exist (the calls used
        # to sit inside the product loop, so the archive could be built
        # before p2's data existed). This now matches TestEnrichedWallToAdmin.
        wall_collection.initial_load(client=None, sync=True)
        session_collection.initial_load(client=None, sync=True)

        enriched_session_merge.build(
            client=client_no_amm,
            session_coll=session_collection,
            wall_coll=wall_collection,
            pg_config=thl_web_rr,
        )

        df = enriched_session_merge.to_admin_response(
            rr=session_report_request, client=client_no_amm
        )

        assert isinstance(df, pd.DataFrame)
        assert not df.empty
        assert isinstance(AdminPOPSessionSchema.validate(df), pd.DataFrame)
        # Level 1 of the index carries the product dimension: both present.
        assert df.index.get_level_values(1).nunique() == 2
diff --git a/tests/incite/mergers/foundations/test_enriched_task_adjust.py b/tests/incite/mergers/foundations/test_enriched_task_adjust.py
new file mode 100644
index 0000000..96c214f
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_enriched_task_adjust.py
@@ -0,0 +1,76 @@
+from datetime import timedelta
+from itertools import product as iter_product
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+
+from test_utils.incite.collections.conftest import (
+ wall_collection,
+ task_adj_collection,
+ session_collection,
+)
+from test_utils.incite.mergers.conftest import enriched_wall_merge
+
+
@pytest.mark.parametrize(
    argnames="offset, duration,",
    argvalues=list(
        iter_product(
            ["12h", "3D"],
            [timedelta(days=5)],
        )
    ),
)
class TestEnrichedTaskAdjust:
    """Builds the task-adjust merge on top of the enriched-wall merge."""

    @pytest.mark.skip
    def test_base(
        self,
        client_no_amm,
        user_factory,
        product,
        task_adj_collection,
        wall_collection,
        session_collection,
        enriched_wall_merge,
        enriched_task_adjust_merge,
        incite_item_factory,
        delete_df_collection,
        thl_web_rr,
    ):
        from generalresearch.models.thl.user import User

        # -- Build & Setup
        delete_df_collection(coll=session_collection)
        u1: User = user_factory(product=product)

        for item in session_collection.items:
            incite_item_factory(user=u1, item=item)
            item.initial_load()
        for item in wall_collection.items:
            item.initial_load()

        enriched_wall_merge.build(
            client=client_no_amm,
            session_coll=session_collection,
            wall_coll=wall_collection,
            pg_config=thl_web_rr,
        )

        # The freshly built enriched-wall merge feeds the task-adjust merge.
        enriched_task_adjust_merge.build(
            client=client_no_amm,
            task_adjust_coll=task_adj_collection,
            enriched_wall=enriched_wall_merge,
            pg_config=thl_web_rr,
        )

        # --

        ddf = enriched_task_adjust_merge.ddf()
        assert isinstance(ddf, dd.DataFrame)

        df = client_no_amm.compute(collections=ddf, sync=True)
        assert isinstance(df, pd.DataFrame)

        assert not df.empty
diff --git a/tests/incite/mergers/foundations/test_enriched_wall.py b/tests/incite/mergers/foundations/test_enriched_wall.py
new file mode 100644
index 0000000..8f4995b
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_enriched_wall.py
@@ -0,0 +1,236 @@
+from datetime import timedelta, timezone, datetime
+from decimal import Decimal
+from itertools import product as iter_product
+from typing import Optional
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+ gen_cluster,
+ client_no_amm,
+ loop,
+ loop_in_thread,
+ cleanup,
+ cluster_fixture,
+ client,
+)
+
+from generalresearch.incite.mergers.foundations.enriched_wall import (
+ EnrichedWallMergeItem,
+)
+from test_utils.incite.collections.conftest import (
+ session_collection,
+ wall_collection,
+)
+from test_utils.incite.conftest import incite_item_factory
+from test_utils.incite.mergers.conftest import (
+ enriched_wall_merge,
+)
+
+
@pytest.mark.parametrize(
    argnames="offset, duration",
    argvalues=list(iter_product(["48h", "3D"], [timedelta(days=5)])),
)
class TestEnrichedWall:
    """Build the enriched-wall merge from loaded wall/session collections,
    then check both the combined frame and the per-item rebuild guard."""

    def test_base(
        self,
        client_no_amm,
        product,
        user_factory,
        wall_collection,
        thl_web_rr,
        session_collection,
        enriched_wall_merge,
        delete_df_collection,
        incite_item_factory,
    ):
        from generalresearch.models.thl.user import User

        # -- Build & Setup
        delete_df_collection(coll=session_collection)
        delete_df_collection(coll=wall_collection)
        u1: User = user_factory(product=product, created=session_collection.start)

        for item in session_collection.items:
            incite_item_factory(item=item, user=u1)
            item.initial_load()

        for item in wall_collection.items:
            item.initial_load()

        enriched_wall_merge.build(
            client=client_no_amm,
            wall_coll=wall_collection,
            session_coll=session_collection,
            pg_config=thl_web_rr,
        )

        # --

        ddf = enriched_wall_merge.ddf()
        assert isinstance(ddf, dd.DataFrame)

        df = client_no_amm.compute(collections=ddf, sync=True)
        assert isinstance(df, pd.DataFrame)

        assert not df.empty

    def test_base_item(
        self,
        client_no_amm,
        product,
        user_factory,
        wall_collection,
        session_collection,
        enriched_wall_merge,
        delete_df_collection,
        thl_web_rr,
        incite_item_factory,
    ):
        # -- Build & Setup
        delete_df_collection(coll=session_collection)
        u = user_factory(product=product, created=session_collection.start)

        for item in session_collection.items:
            incite_item_factory(item=item, user=u)
            item.initial_load()
        for item in wall_collection.items:
            item.initial_load()

        enriched_wall_merge.build(
            client=client_no_amm,
            wall_coll=wall_collection,
            session_coll=session_collection,
            pg_config=thl_web_rr,
        )

        # --

        for item in enriched_wall_merge.items:
            assert isinstance(item, EnrichedWallMergeItem)

            path = item.path

            # mtime of 0 stands in for "file did not exist yet".
            try:
                modified_time1 = path.stat().st_mtime
            except (Exception,):
                modified_time1 = 0

            item.build(
                client=client_no_amm,
                wall_coll=wall_collection,
                session_coll=session_collection,
                pg_config=thl_web_rr,
            )
            modified_time2 = path.stat().st_mtime

            # Merger Items can't be updated unless it's a partial, confirm
            # that even after attempting to rebuild, it doesn't re-touch
            # the file
            assert modified_time2 == modified_time1

    # def test_admin_pop_session_device_type(ew_merge_setup):
    #     self.build()
    #
    #     rr = ReportRequest(
    #         report_type=ReportType.POP_EVENT,
    #         index0="started",
    #         index1="device_type",
    #         freq="min",
    #         start=start,
    #     )
    #
    #     df, categories, updated = self.instance.to_admin_response(
    #         rr=rr, product_ids=[self.product.id], client=client
    #     )
    #
    #     assert isinstance(df, pd.DataFrame)
    #     device_types_str = [str(e.value) for e in DeviceType]
    #     device_types = df.index.get_level_values(1).values
    #     assert all([dt in device_types_str for dt in device_types])
+
+
class TestEnrichedWallToAdmin:
    """to_admin_response() on the enriched-wall merge: empty window and a
    two-product populated window."""

    @pytest.fixture
    def start(self) -> "datetime":
        return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)

    @pytest.fixture
    def offset(self) -> str:
        return "1d"

    @pytest.fixture
    def duration(self) -> Optional["timedelta"]:
        return timedelta(days=5)

    def test_empty(self, enriched_wall_merge, client_no_amm, start):
        # With nothing built, the response is an empty but fully-columned frame.
        from generalresearch.models.admin.request import ReportRequest

        rr = ReportRequest.model_validate({"interval": "5min", "start": start})

        res = enriched_wall_merge.to_admin_response(
            rr=rr,
            client=client_no_amm,
        )

        assert isinstance(res, pd.DataFrame)

        assert res.empty
        assert len(res.columns) > 5

    def test_to_admin_response(
        self,
        event_report_request,
        enriched_wall_merge,
        client_no_amm,
        wall_collection,
        session_collection,
        thl_web_rr,
        user,
        session_factory,
        delete_df_collection,
        product_factory,
        user_factory,
        start,
    ):
        delete_df_collection(coll=wall_collection)
        delete_df_collection(coll=session_collection)

        p1 = product_factory()
        p2 = product_factory()

        # 50 one-minute-spaced sessions per product, all inside the window.
        for p in [p1, p2]:
            u = user_factory(product=p)
            for i in range(50):
                s = session_factory(
                    user=u,
                    wall_count=2,
                    wall_req_cpi=Decimal("1.00"),
                    started=start + timedelta(minutes=i, seconds=1),
                )

        wall_collection.initial_load(client=None, sync=True)
        session_collection.initial_load(client=None, sync=True)

        enriched_wall_merge.build(
            client=client_no_amm,
            wall_coll=wall_collection,
            session_coll=session_collection,
            pg_config=thl_web_rr,
        )

        df = enriched_wall_merge.to_admin_response(
            rr=event_report_request, client=client_no_amm
        )

        assert isinstance(df, pd.DataFrame)
        assert not df.empty
        # assert len(df) == 1
        # assert user.product_id == df.reset_index().loc[0, "index1"]
        # Level 1 of the index carries the product dimension: both present.
        assert df.index.get_level_values(1).nunique() == 2
diff --git a/tests/incite/mergers/foundations/test_user_id_product.py b/tests/incite/mergers/foundations/test_user_id_product.py
new file mode 100644
index 0000000..f96bfb4
--- /dev/null
+++ b/tests/incite/mergers/foundations/test_user_id_product.py
@@ -0,0 +1,73 @@
+from datetime import timedelta, datetime, timezone
+from itertools import product
+
+import pandas as pd
+import pytest
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+ gen_cluster,
+ client_no_amm,
+ loop,
+ loop_in_thread,
+ cleanup,
+ cluster_fixture,
+ client,
+)
+
+from generalresearch.incite.mergers.foundations.user_id_product import (
+ UserIdProductMergeItem,
+)
+from test_utils.incite.mergers.conftest import user_id_product_merge
+
+
@pytest.mark.parametrize(
    argnames="offset, duration, start",
    argvalues=list(
        product(
            ["12h", "3D"],
            [timedelta(days=5)],
            [
                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
                    microsecond=0
                )
            ],
        )
    ),
)
class TestUserIDProduct:
    """user_id -> product merge checks (currently all skipped).

    NOTE(review): the parametrized offset/duration/start are not accepted by
    the test methods directly -- presumably they are consumed indirectly via
    the user_id_product_merge fixture; verify against the conftest.
    """

    @pytest.mark.skip
    def test_base(self, client_no_amm, user_id_product_merge):
        ddf = user_id_product_merge.ddf()
        df = client_no_amm.compute(collections=ddf, sync=True)
        assert isinstance(df, pd.DataFrame)
        assert not df.empty

    @pytest.mark.skip
    def test_base_item(self, client_no_amm, user_id_product_merge, user_collection):
        assert len(user_id_product_merge.items) == 1

        for item in user_id_product_merge.items:
            assert isinstance(item, UserIdProductMergeItem)

            path = item.path

            # mtime of 0 stands in for "file did not exist yet".
            try:
                modified_time1 = path.stat().st_mtime
            except (Exception,):
                modified_time1 = 0

            user_id_product_merge.build(client=client_no_amm, user_coll=user_collection)
            modified_time2 = path.stat().st_mtime

            # Unlike collection items, this merge rebuilds (touches) its file.
            assert modified_time2 > modified_time1

    @pytest.mark.skip
    def test_read(self, client_no_amm, user_id_product_merge):
        users_ddf = user_id_product_merge.ddf()
        df = client_no_amm.compute(collections=users_ddf, sync=True)

        assert isinstance(df, pd.DataFrame)
        assert len(df.columns) == 1
        assert str(df.product_id.dtype) == "category"
diff --git a/tests/incite/mergers/test_merge_collection.py b/tests/incite/mergers/test_merge_collection.py
new file mode 100644
index 0000000..692cac3
--- /dev/null
+++ b/tests/incite/mergers/test_merge_collection.py
@@ -0,0 +1,102 @@
+from datetime import datetime, timezone, timedelta
+from itertools import product
+
+import pandas as pd
+import pytest
+from pandera import DataFrameSchema
+
+from generalresearch.incite.mergers import (
+ MergeCollection,
+ MergeType,
+)
+from test_utils.incite.conftest import mnt_filepath
+
+merge_types = list(e for e in MergeType if e != MergeType.TEST)
+
+
+@pytest.mark.parametrize(
+ argnames="merge_type, offset, duration, start",
+ argvalues=list(
+ product(
+ merge_types,
+ ["5min", "6h", "14D"],
+ [timedelta(days=30)],
+ [
+ (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
+ microsecond=0
+ )
+ ],
+ )
+ ),
+)
+class TestMergeCollection:
+
+ def test_init(self, mnt_filepath, merge_type, offset, duration, start):
+ with pytest.raises(expected_exception=ValueError) as cm:
+ MergeCollection(archive_path=mnt_filepath.data_src)
+ assert "Must explicitly provide a merge_type" in str(cm.value)
+
+ instance = MergeCollection(
+ merge_type=merge_type,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+ assert instance.merge_type == merge_type
+
+ def test_items(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ merge_type=merge_type,
+ offset=offset,
+ start=start,
+ finished=start + duration,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ assert len(instance.interval_range) == len(instance.items)
+
+ def test_progress(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ merge_type=merge_type,
+ offset=offset,
+ start=start,
+ finished=start + duration,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ assert isinstance(instance.progress, pd.DataFrame)
+ assert instance.progress.shape[0] > 0
+ assert instance.progress.shape[1] == 7
+ assert instance.progress["group_by"].isnull().all()
+
+ def test_schema(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ merge_type=merge_type,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ assert isinstance(instance._schema, DataFrameSchema)
+
+ def test_load(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ merge_type=merge_type,
+ start=start,
+ finished=start + duration,
+ offset=offset,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ # Confirm that there are no archives available yet
+ assert instance.progress.has_archive.eq(False).all()
+
+ def test_get_items(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ start=start,
+ finished=start + duration,
+ offset=offset,
+ merge_type=merge_type,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ # with pytest.raises(expected_exception=ResourceWarning) as cm:
+ res = instance.get_items_last365()
+ # assert "has missing archives", str(cm.value)
+ assert len(res) == len(instance.items)
diff --git a/tests/incite/mergers/test_merge_collection_item.py b/tests/incite/mergers/test_merge_collection_item.py
new file mode 100644
index 0000000..96f8789
--- /dev/null
+++ b/tests/incite/mergers/test_merge_collection_item.py
@@ -0,0 +1,66 @@
+from datetime import datetime, timezone, timedelta
+from itertools import product
+from pathlib import PurePath
+
+import pytest
+
+from generalresearch.incite.mergers import MergeCollectionItem, MergeType
+from generalresearch.incite.mergers.foundations.enriched_session import (
+ EnrichedSessionMerge,
+)
+from generalresearch.incite.mergers.foundations.enriched_wall import (
+ EnrichedWallMerge,
+)
+from test_utils.incite.mergers.conftest import merge_collection
+
+
+@pytest.mark.parametrize(
+ argnames="merge_type, offset, duration",
+ argvalues=list(
+ product(
+ [MergeType.ENRICHED_SESSION, MergeType.ENRICHED_WALL],
+ ["1h"],
+ [timedelta(days=1)],
+ )
+ ),
+)
+class TestMergeCollectionItem:
+
+ def test_file_naming(self, merge_collection, offset, duration, start):
+ assert len(merge_collection.items) == 25
+
+ items: list[MergeCollectionItem] = merge_collection.items
+
+ for i in items:
+ i: MergeCollectionItem
+
+ assert isinstance(i.path, PurePath)
+ assert i.path.name == i.filename
+
+ assert i._collection.merge_type.name.lower() in i.filename
+ assert i._collection.offset in i.filename
+ assert i.start.strftime("%Y-%m-%d-%H-%M-%S") in i.filename
+
+ def test_archives(self, merge_collection, offset, duration, start):
+ assert len(merge_collection.items) == 25
+
+ for i in merge_collection.items:
+ assert not i.has_archive()
+ assert not i.has_empty()
+ assert not i.is_empty()
+ assert not i.has_partial_archive()
+ assert i.has_archive() == i.path_exists(generic_path=i.path)
+
+ res = set([i.should_archive() for i in merge_collection.items])
+ assert len(res) == 1
+
+ def test_item_to_archive(self, merge_collection, offset, duration, start):
+ for item in merge_collection.items:
+ item: MergeCollectionItem
+ assert not item.has_archive()
+
+ # TODO: setup build methods
+ # ddf = self.build
+ # saved = instance.to_archive(ddf=ddf)
+ # self.assertTrue(saved)
+ # self.assertTrue(instance.has_archive())
diff --git a/tests/incite/mergers/test_pop_ledger.py b/tests/incite/mergers/test_pop_ledger.py
new file mode 100644
index 0000000..6f96108
--- /dev/null
+++ b/tests/incite/mergers/test_pop_ledger.py
@@ -0,0 +1,307 @@
+from datetime import timedelta, datetime, timezone
+from itertools import product as iter_product
+from typing import Optional
+
+import pandas as pd
+import pytest
+from distributed.utils_test import client_no_amm
+
+from generalresearch.incite.schemas.mergers.pop_ledger import (
+ numerical_col_names,
+)
+from test_utils.incite.collections.conftest import ledger_collection
+from test_utils.incite.conftest import mnt_filepath, incite_item_factory
+from test_utils.incite.mergers.conftest import pop_ledger_merge
+from test_utils.managers.ledger.conftest import create_main_accounts
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration",
+    argvalues=list(
+        iter_product(
+            ["12h", "3D"],
+            [timedelta(days=4)],
+        )
+    ),
+)
+class TestMergePOPLedger:
+    """End-to-end POP-ledger merge tests: build the merge from a ledger
+    collection, then slice the merged frame per account and reconcile the
+    column sums against the SQL-side balances.
+
+    NOTE(review): ``duration`` is both parametrized (4 days) and defined as
+    a fixture below (5 days); the parametrized value wins for test
+    arguments — confirm the fixture is still needed.
+    """
+
+    @pytest.fixture
+    def start(self) -> "datetime":
+        # Fixed historical anchor so mocked data lands in a known window.
+        return datetime(year=2020, month=3, day=14, tzinfo=timezone.utc)
+
+    @pytest.fixture
+    def duration(self) -> Optional["timedelta"]:
+        return timedelta(days=5)
+
+    def test_base(
+        self,
+        client_no_amm,
+        ledger_collection,
+        pop_ledger_merge,
+        product,
+        user_factory,
+        create_main_accounts,
+        thl_lm,
+        delete_df_collection,
+        incite_item_factory,
+        delete_ledger_db,
+    ):
+        """Build the merge and reconcile cash/revenue accounts against SQL."""
+        from generalresearch.models.thl.ledger import LedgerAccount
+
+        u = user_factory(product=product, created=ledger_collection.start)
+
+        # -- Build & Setup
+        delete_ledger_db()
+        create_main_accounts()
+        delete_df_collection(coll=ledger_collection)
+        # assert ledger_collection.start is None
+        # assert ledger_collection.offset is None
+
+        for item in ledger_collection.items:
+            incite_item_factory(item=item, user=u)
+            item.initial_load()
+
+        # Confirm all of the items are archived
+        assert ledger_collection.progress.has_archive.eq(True).all()
+
+        pop_ledger_merge.build(
+            client=client_no_amm,
+            ledger_coll=ledger_collection,
+        )
+        # assert pop_ledger_merge.progress.has_archive.eq(True).all()
+
+        ddf = pop_ledger_merge.ddf()
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+
+        # --
+
+        # NOTE(review): user_wallet_account is never asserted on below —
+        # either assert against it or drop the binding.
+        user_wallet_account: LedgerAccount = thl_lm.get_account_or_create_user_wallet(
+            user=u
+        )
+        cash_account: LedgerAccount = thl_lm.get_account_cash()
+        rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue()
+
+        # Upper bound for the time_idx filters below (newest item finish).
+        item_finishes = [i.finish for i in ledger_collection.items]
+        item_finishes.sort(reverse=True)
+        last_item_finish = item_finishes[0]
+
+        # Pure SQL based lookups
+        cash_balance: int = thl_lm.get_account_balance(account=cash_account)
+        rev_balance: int = thl_lm.get_account_balance(account=rev_account)
+        assert cash_balance > rev_balance
+
+        # (1) Test Cash Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names,
+            filters=[
+                ("account_id", "==", cash_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+        assert df["mp_payment.CREDIT"].sum() == 0
+        assert cash_balance > 0
+        assert df["mp_payment.DEBIT"].sum() == cash_balance
+
+        # (2) Test Revenue Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names,
+            filters=[
+                ("account_id", "==", rev_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert rev_balance == 0
+        assert df["bp_payment.CREDIT"].sum() == 0
+        assert df["mp_payment.DEBIT"].sum() == 0
+        assert df["mp_payment.CREDIT"].sum() > 0
+
+        # -- Cleanup
+        delete_ledger_db()
+
+    def test_pydantic_init(
+        self,
+        client_no_amm,
+        ledger_collection,
+        pop_ledger_merge,
+        mnt_filepath,
+        product,
+        user_factory,
+        create_main_accounts,
+        offset,
+        duration,
+        start,
+        thl_lm,
+        incite_item_factory,
+        delete_df_collection,
+        delete_ledger_db,
+        session_collection,
+    ):
+        """ProductBalances built from the merged frame matches SQL balances."""
+        from generalresearch.models.thl.ledger import LedgerAccount
+        from generalresearch.models.thl.product import Product
+        from generalresearch.models.thl.finance import ProductBalances
+
+        u = user_factory(product=product, created=session_collection.start)
+
+        assert ledger_collection.finished is not None
+        assert isinstance(u.product, Product)
+        delete_ledger_db()
+        # NOTE(review): stray trailing comma — this evaluates to a discarded
+        # 1-tuple; the call still runs, but the comma should be dropped.
+        create_main_accounts(),
+        delete_df_collection(coll=ledger_collection)
+
+        bp_account: LedgerAccount = thl_lm.get_account_or_create_bp_wallet(
+            product=u.product
+        )
+        cash_account: LedgerAccount = thl_lm.get_account_cash()
+        rev_account: LedgerAccount = thl_lm.get_account_task_complete_revenue()
+
+        for item in ledger_collection.items:
+            incite_item_factory(item=item, user=u)
+            item.initial_load(overwrite=True)
+
+        pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection)
+
+        # Upper bound for the time_idx filters below (newest item finish).
+        item_finishes = [i.finish for i in ledger_collection.items]
+        item_finishes.sort(reverse=True)
+        last_item_finish = item_finishes[0]
+
+        # (1) Filter by the Product Account, this means no cash_account, or
+        # rev_account transactions will be present in here...
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", bp_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+        df = df.set_index("time_idx")
+        assert not df.empty
+
+        instance = ProductBalances.from_pandas(input_data=df.sum())
+        assert instance.payout == instance.net == instance.bp_payment_credit
+        assert instance.available_balance < instance.net
+        assert instance.available_balance + instance.retainer == instance.net
+        assert instance.balance == thl_lm.get_account_balance(bp_account)
+        assert df["bp_payment.CREDIT"].sum() == thl_lm.get_account_balance(bp_account)
+
+        # (2) Filter by the Cash Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", cash_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        cash_balance: int = thl_lm.get_account_balance(account=cash_account)
+        assert df["bp_payment.CREDIT"].sum() == 0
+        assert cash_balance > 0
+        assert df["mp_payment.CREDIT"].sum() == 0
+        assert df["mp_payment.DEBIT"].sum() == cash_balance
+
+        # (3) Filter by the Revenue Account
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", rev_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        rev_balance: int = thl_lm.get_account_balance(account=rev_account)
+        assert rev_balance == 0
+        assert df["bp_payment.CREDIT"].sum() == 0
+        assert df["mp_payment.DEBIT"].sum() == 0
+        assert df["mp_payment.CREDIT"].sum() > 0
+
+    def test_resample(
+        self,
+        client_no_amm,
+        ledger_collection,
+        pop_ledger_merge,
+        mnt_filepath,
+        user_factory,
+        product,
+        create_main_accounts,
+        offset,
+        duration,
+        start,
+        thl_lm,
+        delete_df_collection,
+        incite_item_factory,
+    ):
+        """Resampling the merged frame preserves the overall numeric sum."""
+        from generalresearch.models.thl.user import User
+
+        assert ledger_collection.finished is not None
+        delete_df_collection(coll=ledger_collection)
+        u1: User = user_factory(product=product)
+
+        bp_account = thl_lm.get_account_or_create_bp_wallet(product=u1.product)
+
+        for item in ledger_collection.items:
+            incite_item_factory(user=u1, item=item)
+            item.initial_load(overwrite=True)
+
+        pop_ledger_merge.build(client=client_no_amm, ledger_coll=ledger_collection)
+
+        # Upper bound for the time_idx filter below (newest item finish).
+        item_finishes = [i.finish for i in ledger_collection.items]
+        item_finishes.sort(reverse=True)
+        last_item_finish = item_finishes[0]
+
+        ddf = pop_ledger_merge.ddf(
+            columns=numerical_col_names + ["time_idx"],
+            filters=[
+                ("account_id", "==", bp_account.uuid),
+                ("time_idx", ">=", ledger_collection.start),
+                ("time_idx", "<", last_item_finish),
+            ],
+        )
+        df = client_no_amm.compute(collections=ddf, sync=True)
+        assert isinstance(df, pd.DataFrame)
+        assert isinstance(df.index, pd.Index)
+        assert not isinstance(df.index, pd.RangeIndex)
+
+        # Now change the index so we can easily resample it
+        df = df.set_index("time_idx")
+        assert isinstance(df.index, pd.Index)
+        assert isinstance(df.index, pd.DatetimeIndex)
+
+        # NOTE(review): bp_account_balance is never asserted on below.
+        bp_account_balance = thl_lm.get_account_balance(account=bp_account)
+
+        # Initial sum
+        initial_sum = df.sum().sum()
+        # assert len(df) == 48  # msg="Original df should be 48 rows"
+
+        # Original (1min) to 5min
+        df_5min = df.resample("5min").sum()
+        # assert len(df_5min) == 12
+        assert initial_sum == df_5min.sum().sum()
+
+        # 30min
+        df_30min = df.resample("30min").sum()
+        # assert len(df_30min) == 2
+        assert initial_sum == df_30min.sum().sum()
+
+        # 1hr
+        df_1hr = df.resample("1h").sum()
+        # assert len(df_1hr) == 1
+        assert initial_sum == df_1hr.sum().sum()
+
+        # 1 day
+        df_1day = df.resample("1d").sum()
+        # assert len(df_1day) == 1
+        assert initial_sum == df_1day.sum().sum()
diff --git a/tests/incite/mergers/test_ym_survey_merge.py b/tests/incite/mergers/test_ym_survey_merge.py
new file mode 100644
index 0000000..4c2df6b
--- /dev/null
+++ b/tests/incite/mergers/test_ym_survey_merge.py
@@ -0,0 +1,125 @@
+from datetime import timedelta, timezone, datetime
+from itertools import product
+
+import pandas as pd
+import pytest
+
+# noinspection PyUnresolvedReferences
+from distributed.utils_test import (
+ gen_cluster,
+ client_no_amm,
+ loop,
+ loop_in_thread,
+ cleanup,
+ cluster_fixture,
+ client,
+)
+
+from test_utils.incite.collections.conftest import wall_collection, session_collection
+from test_utils.incite.mergers.conftest import (
+ enriched_session_merge,
+ ym_survey_wall_merge,
+)
+
+
+@pytest.mark.parametrize(
+    argnames="offset, duration, start",
+    argvalues=list(
+        product(
+            ["12h", "3D"],
+            [timedelta(days=30)],
+            [
+                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
+                    microsecond=0
+                )
+            ],
+        )
+    ),
+)
+class TestYMSurveyMerge:
+    """We override start, not because it's needed on the YMSurveyWall merge,
+    which operates on a rolling 10-day window, but because we don't want
+    to mock data in the wall collection and enriched_session_merge from
+    the 1800s and then wonder why there is no data available in the past
+    10 days in the database.
+    """
+
+    def test_base(
+        self,
+        client_no_amm,
+        user_factory,
+        product,
+        ym_survey_wall_merge,
+        wall_collection,
+        session_collection,
+        enriched_session_merge,
+        delete_df_collection,
+        incite_item_factory,
+        thl_web_rr,
+    ):
+        """Build enriched sessions, then the YM survey merge on top, and
+        sanity-check the merged frame's identifiers and time span."""
+        from generalresearch.models.thl.user import User
+
+        delete_df_collection(coll=session_collection)
+        user: User = user_factory(product=product, created=session_collection.start)
+
+        # -- Build & Setup
+        assert ym_survey_wall_merge.start is None
+        assert ym_survey_wall_merge.offset == "10D"
+
+        for item in session_collection.items:
+            incite_item_factory(item=item, user=user)
+            item.initial_load()
+        for item in wall_collection.items:
+            item.initial_load()
+
+        # Confirm all of the items are archived
+        assert session_collection.progress.has_archive.eq(True).all()
+        assert wall_collection.progress.has_archive.eq(True).all()
+
+        enriched_session_merge.build(
+            client=client_no_amm,
+            session_coll=session_collection,
+            wall_coll=wall_collection,
+            pg_config=thl_web_rr,
+        )
+        assert enriched_session_merge.progress.has_archive.eq(True).all()
+
+        ddf = enriched_session_merge.ddf()
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+
+        # -- Build the survey merge on top of the enriched sessions
+
+        ym_survey_wall_merge.build(
+            client=client_no_amm,
+            wall_coll=wall_collection,
+            enriched_session=enriched_session_merge,
+        )
+        assert ym_survey_wall_merge.progress.has_archive.eq(True).all()
+
+        # --
+
+        ddf = ym_survey_wall_merge.ddf()
+        df: pd.DataFrame = client_no_amm.compute(collections=ddf, sync=True)
+
+        assert isinstance(df, pd.DataFrame)
+        assert not df.empty
+
+        # -- Single product/team, but multiple sources expected
+        assert df.product_id.nunique() == 1
+        assert df.team_id.nunique() == 1
+        assert df.source.nunique() > 1
+
+        started_min_ts = df.started.min()
+        started_max_ts = df.started.max()
+
+        assert type(started_min_ts) is pd.Timestamp
+        assert type(started_max_ts) is pd.Timestamp
+
+        # Round-trip through isoformat to compare as plain datetimes.
+        started_min: datetime = datetime.fromisoformat(str(started_min_ts))
+        started_max: datetime = datetime.fromisoformat(str(started_max_ts))
+
+        started_delta = started_max - started_min
+        assert started_delta >= timedelta(days=3)
diff --git a/tests/incite/schemas/__init__.py b/tests/incite/schemas/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/incite/schemas/__init__.py
diff --git a/tests/incite/schemas/test_admin_responses.py b/tests/incite/schemas/test_admin_responses.py
new file mode 100644
index 0000000..43aa399
--- /dev/null
+++ b/tests/incite/schemas/test_admin_responses.py
@@ -0,0 +1,239 @@
+from datetime import datetime, timezone, timedelta
+from random import sample
+from typing import List
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from generalresearch.incite.schemas import empty_dataframe_from_schema
+from generalresearch.incite.schemas.admin_responses import (
+ AdminPOPSchema,
+ SIX_HOUR_SECONDS,
+)
+from generalresearch.locales import Localelator
+
+
+class TestAdminPOPSchema:
+ schema_df = empty_dataframe_from_schema(AdminPOPSchema)
+ countries = list(Localelator().get_all_countries())[:5]
+ dates = [datetime(year=2024, month=1, day=i, tzinfo=None) for i in range(1, 10)]
+
+ @classmethod
+ def assign_valid_vals(cls, df: pd.DataFrame) -> pd.DataFrame:
+ for c in df.columns:
+ check_attrs: dict = AdminPOPSchema.columns[c].checks[0].statistics
+ df[c] = np.random.randint(
+ check_attrs["min_value"], check_attrs["max_value"], df.shape[0]
+ )
+
+ return df
+
+ def test_empty(self):
+ with pytest.raises(Exception):
+ AdminPOPSchema.validate(pd.DataFrame())
+
+ def test_new_empty_df(self):
+ df = empty_dataframe_from_schema(AdminPOPSchema)
+
+ assert isinstance(df, pd.DataFrame)
+ assert isinstance(df.index, pd.MultiIndex)
+ assert df.columns.size == len(AdminPOPSchema.columns)
+
+ def test_valid(self):
+ # (1) Works with raw naive datetime
+ dates = [
+ datetime(year=2024, month=1, day=i, tzinfo=None).isoformat()
+ for i in range(1, 10)
+ ]
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[dates, self.countries], names=["index0", "index1"]
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+
+ df = AdminPOPSchema.validate(df)
+ assert isinstance(df, pd.DataFrame)
+
+ # (2) Works with isoformat naive datetime
+ dates = [datetime(year=2024, month=1, day=i, tzinfo=None) for i in range(1, 10)]
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[dates, self.countries], names=["index0", "index1"]
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+
+ df = AdminPOPSchema.validate(df)
+ assert isinstance(df, pd.DataFrame)
+
+ def test_index_tz_parser(self):
+ tz_dates = [
+ datetime(year=2024, month=1, day=i, tzinfo=timezone.utc)
+ for i in range(1, 10)
+ ]
+
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[tz_dates, self.countries], names=["index0", "index1"]
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+
+ # Initially, they're all set with a timezone
+ timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)]
+ assert all([ts.tz == timezone.utc for ts in timestmaps])
+
+ # After validation, the timezone is removed
+ df = AdminPOPSchema.validate(df)
+ timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)]
+ assert all([ts.tz is None for ts in timestmaps])
+
+ def test_index_tz_no_future_beyond_one_year(self):
+ now = datetime.now(tz=timezone.utc)
+ tz_dates = [now + timedelta(days=i * 365) for i in range(1, 10)]
+
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[tz_dates, self.countries], names=["index0", "index1"]
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+
+ with pytest.raises(Exception) as cm:
+ AdminPOPSchema.validate(df)
+
+ assert (
+ "Index 'index0' failed element-wise validator "
+ "number 0: less_than(" in str(cm.value)
+ )
+
+ def test_index_only_str(self):
+ # --- float64 to str! ---
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[self.dates, np.random.rand(1, 10)[0]],
+ names=["index0", "index1"],
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+
+ vals = [i for i in df.index.get_level_values(1)]
+ assert all([isinstance(v, float) for v in vals])
+
+ df = AdminPOPSchema.validate(df, lazy=True)
+
+ vals = [i for i in df.index.get_level_values(1)]
+ assert all([isinstance(v, str) for v in vals])
+
+ # --- int to str ---
+
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[self.dates, sample(range(100), 20)],
+ names=["index0", "index1"],
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+
+ vals = [i for i in df.index.get_level_values(1)]
+ assert all([isinstance(v, int) for v in vals])
+
+ df = AdminPOPSchema.validate(df, lazy=True)
+
+ vals = [i for i in df.index.get_level_values(1)]
+ assert all([isinstance(v, str) for v in vals])
+
+ # a = 1
+ assert isinstance(df, pd.DataFrame)
+
+ def test_invalid_parsing(self):
+ # (1) Timezones AND as strings will still parse correctly
+ tz_str_dates = [
+ datetime(
+ year=2024, month=1, day=1, minute=i, tzinfo=timezone.utc
+ ).isoformat()
+ for i in range(1, 10)
+ ]
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[tz_str_dates, self.countries],
+ names=["index0", "index1"],
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+ df = AdminPOPSchema.validate(df, lazy=True)
+
+ assert isinstance(df, pd.DataFrame)
+ timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)]
+ assert all([ts.tz is None for ts in timestmaps])
+
+ # (2) Timezones are removed
+ dates = [
+ datetime(year=2024, month=1, day=1, minute=i, tzinfo=timezone.utc)
+ for i in range(1, 10)
+ ]
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[dates, self.countries], names=["index0", "index1"]
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+
+ # Has tz before validation, and none after
+ timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)]
+ assert all([ts.tz is timezone.utc for ts in timestmaps])
+
+ df = AdminPOPSchema.validate(df, lazy=True)
+
+ timestmaps: List[pd.Timestamp] = [i for i in df.index.get_level_values(0)]
+ assert all([ts.tz is None for ts in timestmaps])
+
+ def test_clipping(self):
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[self.dates, self.countries],
+ names=["index0", "index1"],
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+ df = AdminPOPSchema.validate(df)
+ assert df.elapsed_avg.max() < SIX_HOUR_SECONDS
+
+ # Now that we know it's valid, break the elapsed avg
+ df["elapsed_avg"] = np.random.randint(
+ SIX_HOUR_SECONDS, SIX_HOUR_SECONDS + 10_000, df.shape[0]
+ )
+ assert df.elapsed_avg.max() > SIX_HOUR_SECONDS
+
+ # Confirm it doesn't fail if the values are greater, and that
+ # all the values are clipped to the max
+ df = AdminPOPSchema.validate(df)
+ assert df.elapsed_avg.eq(SIX_HOUR_SECONDS).all()
+
+ def test_rounding(self):
+ df = pd.DataFrame(
+ index=pd.MultiIndex.from_product(
+ iterables=[self.dates, self.countries],
+ names=["index0", "index1"],
+ ),
+ columns=self.schema_df.columns,
+ )
+ df = self.assign_valid_vals(df)
+
+ df["payout_avg"] = 2.123456789900002
+
+ assert df.payout_avg.sum() == 95.5555555455001
+
+ df = AdminPOPSchema.validate(df)
+ assert df.payout_avg.sum() == 95.40000000000003
diff --git a/tests/incite/schemas/test_thl_web.py b/tests/incite/schemas/test_thl_web.py
new file mode 100644
index 0000000..7f4434b
--- /dev/null
+++ b/tests/incite/schemas/test_thl_web.py
@@ -0,0 +1,70 @@
+import pandas as pd
+import pytest
+from pandera.errors import SchemaError
+
+
+class TestWallSchema:
+
+ def test_empty(self):
+ from generalresearch.incite.schemas.thl_web import THLWallSchema
+
+ with pytest.raises(SchemaError):
+ THLWallSchema.validate(pd.DataFrame())
+
+ def test_index_missing(self):
+ from generalresearch.incite.schemas.thl_web import THLWallSchema
+
+ df = pd.DataFrame(columns=THLWallSchema.columns.keys())
+
+ with pytest.raises(SchemaError) as cm:
+ THLWallSchema.validate(df)
+
+ def test_no_rows(self):
+ from generalresearch.incite.schemas.thl_web import THLWallSchema
+
+ df = pd.DataFrame(index=["uuid"], columns=THLWallSchema.columns.keys())
+
+ with pytest.raises(SchemaError) as cm:
+ THLWallSchema.validate(df)
+
+ def test_new_empty_df(self):
+ from generalresearch.incite.schemas import empty_dataframe_from_schema
+ from generalresearch.incite.schemas.thl_web import THLWallSchema
+
+ df = empty_dataframe_from_schema(THLWallSchema)
+ assert isinstance(df, pd.DataFrame)
+ assert df.columns.size == 20
+
+
+class TestSessionSchema:
+
+ def test_empty(self):
+ from generalresearch.incite.schemas.thl_web import THLSessionSchema
+
+ with pytest.raises(SchemaError):
+ THLSessionSchema.validate(pd.DataFrame())
+
+ def test_index_missing(self):
+ from generalresearch.incite.schemas.thl_web import THLSessionSchema
+
+ df = pd.DataFrame(columns=THLSessionSchema.columns.keys())
+ df.set_index("uuid", inplace=True)
+
+ with pytest.raises(SchemaError) as cm:
+ THLSessionSchema.validate(df)
+
+ def test_no_rows(self):
+ from generalresearch.incite.schemas.thl_web import THLSessionSchema
+
+ df = pd.DataFrame(index=["id"], columns=THLSessionSchema.columns.keys())
+
+ with pytest.raises(SchemaError) as cm:
+ THLSessionSchema.validate(df)
+
+ def test_new_empty_df(self):
+ from generalresearch.incite.schemas import empty_dataframe_from_schema
+ from generalresearch.incite.schemas.thl_web import THLSessionSchema
+
+ df = empty_dataframe_from_schema(THLSessionSchema)
+ assert isinstance(df, pd.DataFrame)
+ assert df.columns.size == 21
diff --git a/tests/incite/test_collection_base.py b/tests/incite/test_collection_base.py
new file mode 100644
index 0000000..497e5ab
--- /dev/null
+++ b/tests/incite/test_collection_base.py
@@ -0,0 +1,318 @@
+from datetime import datetime, timezone, timedelta
+from os.path import exists as pexists, join as pjoin
+from pathlib import Path
+from uuid import uuid4
+
+import numpy as np
+import pandas as pd
+import pytest
+from _pytest._code.code import ExceptionInfo
+
+from generalresearch.incite.base import CollectionBase
+from test_utils.incite.conftest import mnt_filepath
+
+# Shared, microsecond-free UTC reference timestamps for the tests below.
+# Microseconds are stripped because CollectionBase rejects start values
+# that carry microsecond precision (see test_init_start).
+AGO_15min = (datetime.now(tz=timezone.utc) - timedelta(minutes=15)).replace(
+    microsecond=0
+)
+AGO_1HR = (datetime.now(tz=timezone.utc) - timedelta(hours=1)).replace(microsecond=0)
+AGO_2HR = (datetime.now(tz=timezone.utc) - timedelta(hours=2)).replace(microsecond=0)
+
+
+class TestCollectionBase:
+ def test_init(self, mnt_filepath):
+ instance = CollectionBase(archive_path=mnt_filepath.data_src)
+ assert instance.df.empty is True
+
+ def test_init_df(self, mnt_filepath):
+ # Only an empty pd.DataFrame can ever be provided
+ instance = CollectionBase(
+ df=pd.DataFrame({}), archive_path=mnt_filepath.data_src
+ )
+ assert isinstance(instance.df, pd.DataFrame)
+
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ CollectionBase(
+ df=pd.DataFrame(columns=[0, 1, 2]), archive_path=mnt_filepath.data_src
+ )
+ assert "Do not provide a pd.DataFrame" in str(cm.value)
+
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ CollectionBase(
+ df=pd.DataFrame(np.random.randint(100, size=(1000, 1)), columns=["A"]),
+ archive_path=mnt_filepath.data_src,
+ )
+ assert "Do not provide a pd.DataFrame" in str(cm.value)
+
+ def test_init_start(self, mnt_filepath):
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ CollectionBase(
+ start=datetime.now(tz=timezone.utc) - timedelta(days=10),
+ archive_path=mnt_filepath.data_src,
+ )
+ assert "Collection.start must not have microseconds" in str(cm.value)
+
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ tz = timezone(timedelta(hours=-5), "EST")
+
+ CollectionBase(
+ start=datetime(year=2000, month=1, day=1, tzinfo=tz),
+ archive_path=mnt_filepath.data_src,
+ )
+ assert "Timezone is not UTC" in str(cm.value)
+
+ instance = CollectionBase(archive_path=mnt_filepath.data_src)
+ assert instance.start == datetime(
+ year=2018, month=1, day=1, tzinfo=timezone.utc
+ )
+
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ CollectionBase(
+ start=AGO_2HR, offset="3h", archive_path=mnt_filepath.data_src
+ )
+ assert "Offset must be equal to, or smaller the start timestamp" in str(
+ cm.value
+ )
+
+ def test_init_archive_path(self, mnt_filepath):
+ """DirectoryPath is apparently smart enough to confirm that the
+ directory path exists.
+ """
+
+ # (1) Basic, confirm an existing path works
+ instance = CollectionBase(archive_path=mnt_filepath.data_src)
+ assert instance.archive_path == mnt_filepath.data_src
+
+ # (2) It can't point to a file
+ file_path = Path(pjoin(mnt_filepath.data_src, f"{uuid4().hex}.zip"))
+ assert not pexists(file_path)
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ CollectionBase(archive_path=file_path)
+ assert "Path does not point to a directory" in str(cm.value)
+
+ # (3) It doesn't create the directory if it doesn't exist
+ new_path = Path(pjoin(mnt_filepath.data_src, f"{uuid4().hex}/"))
+ assert not pexists(new_path)
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ CollectionBase(archive_path=new_path)
+ assert "Path does not point to a directory" in str(cm.value)
+
+ def test_init_offset(self, mnt_filepath):
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ CollectionBase(offset="1:X", archive_path=mnt_filepath.data_src)
+ assert "Invalid offset alias provided. Please review:" in str(cm.value)
+
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ CollectionBase(offset=f"59sec", archive_path=mnt_filepath.data_src)
+ assert "Must be equal to, or longer than 1 min" in str(cm.value)
+
+ with pytest.raises(expected_exception=ValueError) as cm:
+ cm: ExceptionInfo
+ CollectionBase(offset=f"{365 * 101}d", archive_path=mnt_filepath.data_src)
+ assert "String should have at most 5 characters" in str(cm.value)
+
+
+class TestCollectionBaseProperties:
+    """Property stubs on CollectionBase must each demand an override."""
+
+    def test_items(self, mnt_filepath):
+        with pytest.raises(expected_exception=NotImplementedError) as cm:
+            cm: ExceptionInfo
+            instance = CollectionBase(archive_path=mnt_filepath.data_src)
+            x = instance.items
+        assert "Must override" in str(cm.value)
+
+    def test_interval_range(self, mnt_filepath):
+        instance = CollectionBase(archive_path=mnt_filepath.data_src)
+        # Private method requires the end parameter
+        with pytest.raises(expected_exception=AssertionError) as cm:
+            cm: ExceptionInfo
+            instance._interval_range(end=None)
+        assert "an end value must be provided" in str(cm.value)
+
+        # End param must be same as started (which forces utc)
+        tz = timezone(timedelta(hours=-5), "EST")
+        with pytest.raises(expected_exception=AssertionError) as cm:
+            cm: ExceptionInfo
+            instance._interval_range(end=datetime.now(tz=tz))
+        assert "Timezones must match" in str(cm.value)
+
+        # A UTC end produces a left-closed, sorted, non-overlapping index.
+        res = instance._interval_range(end=datetime.now(tz=timezone.utc))
+        assert isinstance(res, pd.IntervalIndex)
+        assert res.closed_left
+        assert res.is_non_overlapping_monotonic
+        assert res.is_monotonic_increasing
+        assert res.is_unique
+
+    def test_interval_range2(self, mnt_filepath):
+        instance = CollectionBase(archive_path=mnt_filepath.data_src)
+        assert isinstance(instance.interval_range, list)
+
+        # 1 hrs ago has 2 x 30min + the future 30min
+        OFFSET = "30min"
+        instance = CollectionBase(
+            start=AGO_1HR, offset=OFFSET, archive_path=mnt_filepath.data_src
+        )
+        assert len(instance.interval_range) == 3
+        assert instance.interval_range[0][0] == AGO_1HR
+
+        # 1 hrs ago has 1 x 60min + the future 60min
+        OFFSET = "60min"
+        instance = CollectionBase(
+            start=AGO_1HR, offset=OFFSET, archive_path=mnt_filepath.data_src
+        )
+        assert len(instance.interval_range) == 2
+
+    def test_progress(self, mnt_filepath):
+        with pytest.raises(expected_exception=NotImplementedError) as cm:
+            cm: ExceptionInfo
+            instance = CollectionBase(
+                start=AGO_15min, offset="3min", archive_path=mnt_filepath.data_src
+            )
+            x = instance.progress
+        assert "Must override" in str(cm.value)
+
+    def test_progress2(self, mnt_filepath):
+        instance = CollectionBase(
+            start=AGO_2HR,
+            offset="15min",
+            archive_path=mnt_filepath.data_src,
+        )
+        assert instance.df.empty
+
+        with pytest.raises(expected_exception=NotImplementedError) as cm:
+            df = instance.progress
+        assert "Must override" in str(cm.value)
+
+    def test_items2(self, mnt_filepath):
+        """There can't be a test for this because the Items need a path which
+        isn't possible in the generic form
+        """
+        instance = CollectionBase(
+            start=AGO_1HR, offset="5min", archive_path=mnt_filepath.data_src
+        )
+
+        with pytest.raises(expected_exception=NotImplementedError) as cm:
+            cm: ExceptionInfo
+            items = instance.items
+        assert "Must override" in str(cm.value)
+
+        # item = items[-3]
+        # ddf = instance.ddf(items=[item], include_partial=True, force_rr_latest=False)
+        # df = item.validate_ddf(ddf=ddf)
+        # assert isinstance(df, pd.DataFrame)
+        # assert len(df.columns) == 16
+        # assert str(df.product_id.dtype) == "object"
+        # assert str(ddf.product_id.dtype) == "string"
+
+    def test_items3(self, mnt_filepath):
+        instance = CollectionBase(
+            start=AGO_2HR,
+            offset="15min",
+            archive_path=mnt_filepath.data_src,
+        )
+        with pytest.raises(expected_exception=NotImplementedError) as cm:
+            item = instance.items[0]
+        assert "Must override" in str(cm.value)
+
+
+class TestCollectionBaseMethodsCleanup:
+ def test_fetch_force_rr_latest(self, mnt_filepath):
+ coll = CollectionBase(archive_path=mnt_filepath.data_src)
+
+ with pytest.raises(expected_exception=Exception) as cm:
+ cm: ExceptionInfo
+ coll.fetch_force_rr_latest(sources=[])
+ assert "Must override" in str(cm.value)
+
+ def test_fetch_all_paths(self, mnt_filepath):
+ coll = CollectionBase(archive_path=mnt_filepath.data_src)
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ cm: ExceptionInfo
+ coll.fetch_all_paths(
+ items=None, force_rr_latest=False, include_partial=False
+ )
+ assert "Must override" in str(cm.value)
+
+
+class TestCollectionBaseMethodsCleanup:
+    """Cleanup helpers on the base collection.
+
+    NOTE(review): an earlier class in this module carries the same name;
+    this definition shadows it, so only the tests below are collected.
+    """
+
+    @pytest.mark.skip
+    def test_cleanup_partials(self, mnt_filepath):
+        instance = CollectionBase(archive_path=mnt_filepath.data_src)
+        assert instance.cleanup_partials() is None  # it doesn't return anything
+
+    def test_clear_tmp_archives(self, mnt_filepath):
+        instance = CollectionBase(archive_path=mnt_filepath.data_src)
+        assert instance.clear_tmp_archives() is None  # it doesn't return anything
+
+    @pytest.mark.skip
+    def test_clear_corrupt_archives(self, mnt_filepath):
+        """TODO: expand this so it actually has corrupt archives that we
+        check to see if they're removed
+        """
+        instance = CollectionBase(archive_path=mnt_filepath.data_src)
+        assert instance.clear_corrupt_archives() is None  # it doesn't return anything
+
+    @pytest.mark.skip
+    def test_rebuild_symlinks(self, mnt_filepath):
+        instance = CollectionBase(archive_path=mnt_filepath.data_src)
+        assert instance.rebuild_symlinks() is None
+
+
+class TestCollectionBaseMethodsSourceTiming:
+
+ def test_get_item(self, mnt_filepath):
+ instance = CollectionBase(archive_path=mnt_filepath.data_src)
+ i = pd.Interval(left=1, right=2, closed="left")
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ instance.get_item(interval=i)
+ assert "Must override" in str(cm.value)
+
+ def test_get_item_start(self, mnt_filepath):
+ instance = CollectionBase(archive_path=mnt_filepath.data_src)
+
+ dt = datetime.now(tz=timezone.utc)
+ start = pd.Timestamp(dt)
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ instance.get_item_start(start=start)
+ assert "Must override" in str(cm.value)
+
+ def test_get_items(self, mnt_filepath):
+ instance = CollectionBase(archive_path=mnt_filepath.data_src)
+
+ dt = datetime.now(tz=timezone.utc)
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ instance.get_items(since=dt)
+ assert "Must override" in str(cm.value)
+
+ def test_get_items_from_year(self, mnt_filepath):
+ instance = CollectionBase(archive_path=mnt_filepath.data_src)
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ instance.get_items_from_year(year=2020)
+ assert "Must override" in str(cm.value)
+
+ def test_get_items_last90(self, mnt_filepath):
+ instance = CollectionBase(archive_path=mnt_filepath.data_src)
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ instance.get_items_last90()
+ assert "Must override" in str(cm.value)
+
+ def test_get_items_last365(self, mnt_filepath):
+ instance = CollectionBase(archive_path=mnt_filepath.data_src)
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ instance.get_items_last365()
+ assert "Must override" in str(cm.value)
diff --git a/tests/incite/test_collection_base_item.py b/tests/incite/test_collection_base_item.py
new file mode 100644
index 0000000..e5d1d02
--- /dev/null
+++ b/tests/incite/test_collection_base_item.py
@@ -0,0 +1,223 @@
+from datetime import datetime, timezone
+from os.path import join as pjoin
+from pathlib import Path
+from uuid import uuid4
+
+import dask.dataframe as dd
+import pandas as pd
+import pytest
+from pydantic import ValidationError
+
+from generalresearch.incite.base import CollectionItemBase
+
+
+class TestCollectionItemBase:
+    """Constructor behaviour of the abstract collection item."""
+
+    def test_init(self):
+        dt = datetime.now(tz=timezone.utc).replace(microsecond=0)
+
+        instance = CollectionItemBase()
+        instance2 = CollectionItemBase(start=dt)
+
+        assert isinstance(instance, CollectionItemBase)
+        assert isinstance(instance2, CollectionItemBase)
+
+        # NOTE(review): potentially flaky — if the default start is taken a
+        # moment after `dt`, the two calls may straddle a second boundary
+        # and the seconds will differ. Confirm the default-start semantics
+        # before relying on this comparison.
+        assert instance.start.second == instance2.start.second
+        assert 0 == instance.start.microsecond == instance2.start.microsecond
+
+    def test_init_start(self):
+        # NOTE(review): datetime.now() yields microsecond == 0 roughly one
+        # time in a million, which would make this intermittently fail to
+        # raise — consider pinning a start with a fixed microsecond value.
+        dt = datetime.now(tz=timezone.utc)
+
+        with pytest.raises(expected_exception=ValidationError) as cm:
+            CollectionItemBase(start=dt)
+
+        assert "CollectionItem.start must not have microsecond precision" in str(
+            cm.value
+        )
+
+
+class TestCollectionItemBaseProperties:
+
+ def test_finish(self):
+ instance = CollectionItemBase()
+
+ with pytest.raises(expected_exception=AttributeError) as cm:
+ res = instance.finish
+
+ def test_interval(self):
+ instance = CollectionItemBase()
+
+ with pytest.raises(expected_exception=AttributeError) as cm:
+ res = instance.interval
+
+ def test_filename(self):
+ instance = CollectionItemBase()
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ res = instance.filename
+
+ assert "Do not use CollectionItemBase directly" in str(cm.value)
+
+ def test_partial_filename(self):
+ instance = CollectionItemBase()
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ res = instance.filename
+
+ assert "Do not use CollectionItemBase directly" in str(cm.value)
+
+ def test_empty_filename(self):
+ instance = CollectionItemBase()
+
+ with pytest.raises(expected_exception=NotImplementedError) as cm:
+ res = instance.filename
+
+ assert "Do not use CollectionItemBase directly" in str(cm.value)
+
+ def test_path(self):
+ instance = CollectionItemBase()
+
+ with pytest.raises(expected_exception=AttributeError) as cm:
+ res = instance.path
+
+ def test_partial_path(self):
+ instance = CollectionItemBase()
+
+ with pytest.raises(expected_exception=AttributeError) as cm:
+ res = instance.partial_path
+
+ def test_empty_path(self):
+ instance = CollectionItemBase()
+
+ with pytest.raises(expected_exception=AttributeError) as cm:
+ res = instance.empty_path
+
+
+class TestCollectionItemBaseMethods:
+    """Method stubs and the archive-deletion helpers on CollectionItemBase."""
+
+    @pytest.mark.skip
+    def test_next_numbered_path(self):
+        pass
+
+    @pytest.mark.skip
+    def test_search_highest_numbered_path(self):
+        pass
+
+    def test_tmp_filename(self):
+        instance = CollectionItemBase()
+
+        with pytest.raises(expected_exception=NotImplementedError) as cm:
+            res = instance.tmp_filename()
+        assert "Do not use CollectionItemBase directly" in str(cm.value)
+
+    def test_tmp_path(self):
+        instance = CollectionItemBase()
+
+        with pytest.raises(expected_exception=AttributeError) as cm:
+            res = instance.tmp_path()
+
+    def test_is_empty(self):
+        instance = CollectionItemBase()
+
+        with pytest.raises(expected_exception=AttributeError) as cm:
+            res = instance.is_empty()
+
+    def test_has_empty(self):
+        instance = CollectionItemBase()
+
+        with pytest.raises(expected_exception=AttributeError) as cm:
+            res = instance.has_empty()
+
+    def test_has_partial_archive(self):
+        instance = CollectionItemBase()
+
+        with pytest.raises(expected_exception=AttributeError) as cm:
+            res = instance.has_partial_archive()
+
+    @pytest.mark.parametrize("include_empty", [True, False])
+    def test_has_archive(self, include_empty):
+        instance = CollectionItemBase()
+
+        with pytest.raises(expected_exception=AttributeError) as cm:
+            res = instance.has_archive(include_empty=include_empty)
+
+    def test_delete_archive_file(self, mnt_filepath):
+        # delete_archive() must be a silent no-op on a missing file.
+        path1 = Path(pjoin(mnt_filepath.data_src, f"{uuid4().hex}.zip"))
+
+        # Confirm it doesn't exist, and that delete_archive() doesn't throw
+        # an error when trying to delete a non-existent file or folder
+        assert not path1.exists()
+        CollectionItemBase.delete_archive(generic_path=path1)
+        # TODO: LOG.warning(f"tried removing non-existent file: {generic_path}")
+
+        # Create it, confirm it exists, delete it, and confirm it doesn't exist
+        path1.touch()
+        assert path1.exists()
+        CollectionItemBase.delete_archive(generic_path=path1)
+        assert not path1.exists()
+
+    def test_delete_archive_dir(self, mnt_filepath):
+        # Same contract as above, but for a directory.
+        path1 = Path(pjoin(mnt_filepath.data_src, f"{uuid4().hex}"))
+
+        # Confirm it doesn't exist, and that delete_archive() doesn't throw
+        # an error when trying to delete a non-existent file or folder
+        assert not path1.exists()
+        CollectionItemBase.delete_archive(generic_path=path1)
+        # TODO: LOG.warning(f"tried removing non-existent file: {generic_path}")
+
+        # Create it, confirm it exists, delete it, and confirm it doesn't exist
+        path1.mkdir()
+        assert path1.exists()
+        assert path1.is_dir()
+        CollectionItemBase.delete_archive(generic_path=path1)
+        assert not path1.exists()
+
+    def test_should_archive(self):
+        instance = CollectionItemBase()
+
+        with pytest.raises(expected_exception=AttributeError) as cm:
+            res = instance.should_archive()
+
+    def test_set_empty(self):
+        instance = CollectionItemBase()
+
+        with pytest.raises(expected_exception=AttributeError) as cm:
+            res = instance.set_empty()
+
+    def test_valid_archive(self):
+        instance = CollectionItemBase()
+
+        with pytest.raises(expected_exception=AttributeError) as cm:
+            res = instance.valid_archive(generic_path=None, sample=None)
+
+class TestCollectionItemBaseMethodsORM:
+    """Archive (de)serialisation hooks on the base collection item."""
+
+    @pytest.mark.skip
+    def test_from_archive(self):
+        pass
+
+    @pytest.mark.parametrize("is_partial", [True, False])
+    def test_to_archive(self, is_partial):
+        instance = CollectionItemBase()
+
+        # NOTE(review): dd.from_pandas historically requires `npartitions`;
+        # if the pinned dask release still does, the resulting TypeError is
+        # raised inside the `with` block and would fail this test instead
+        # of the expected NotImplementedError — confirm against the
+        # project's dask version.
+        with pytest.raises(expected_exception=NotImplementedError) as cm:
+            res = instance.to_archive(
+                ddf=dd.from_pandas(data=pd.DataFrame()), is_partial=is_partial
+            )
+        assert "Must override" in str(cm.value)
+
+    @pytest.mark.skip
+    def test__to_dict(self):
+        pass
+
+    @pytest.mark.skip
+    def test_delete_partial(self):
+        pass
+
+    @pytest.mark.skip
+    def test_cleanup_partials(self):
+        pass
+
+    @pytest.mark.skip
+    def test_delete_dangling_partials(self):
+        pass
diff --git a/tests/incite/test_grl_flow.py b/tests/incite/test_grl_flow.py
new file mode 100644
index 0000000..c632f9a
--- /dev/null
+++ b/tests/incite/test_grl_flow.py
@@ -0,0 +1,23 @@
+class TestGRLFlow:
+
+ def test_init(self, mnt_filepath, thl_web_rr):
+ from generalresearch.incite.defaults import (
+ ledger_df_collection,
+ task_df_collection,
+ pop_ledger as plm,
+ )
+
+ from generalresearch.incite.collections.thl_web import (
+ LedgerDFCollection,
+ TaskAdjustmentDFCollection,
+ )
+ from generalresearch.incite.mergers.pop_ledger import PopLedgerMerge
+
+ ledger_df = ledger_df_collection(ds=mnt_filepath, pg_config=thl_web_rr)
+ assert isinstance(ledger_df, LedgerDFCollection)
+
+ task_df = task_df_collection(ds=mnt_filepath, pg_config=thl_web_rr)
+ assert isinstance(task_df, TaskAdjustmentDFCollection)
+
+ pop_ledger = plm(ds=mnt_filepath)
+ assert isinstance(pop_ledger, PopLedgerMerge)
diff --git a/tests/incite/test_interval_idx.py b/tests/incite/test_interval_idx.py
new file mode 100644
index 0000000..ea2bced
--- /dev/null
+++ b/tests/incite/test_interval_idx.py
@@ -0,0 +1,23 @@
+import pandas as pd
+from datetime import datetime, timezone, timedelta
+
+
+class TestIntervalIndex:
+
+ def test_init(self):
+ start = datetime(year=2000, month=1, day=1)
+ end = datetime(year=2000, month=1, day=10)
+
+ iv_r: pd.IntervalIndex = pd.interval_range(
+ start=start, end=end, freq="1d", closed="left"
+ )
+ assert isinstance(iv_r, pd.IntervalIndex)
+ assert len(iv_r.to_list()) == 9
+
+ # If the offset is longer than the end - start it will not
+ # error. It will simply have 0 rows.
+ iv_r: pd.IntervalIndex = pd.interval_range(
+ start=start, end=end, freq="30d", closed="left"
+ )
+ assert isinstance(iv_r, pd.IntervalIndex)
+ assert len(iv_r.to_list()) == 0