aboutsummaryrefslogtreecommitdiff
path: root/test_utils/incite
diff options
context:
space:
mode:
authorMax Nanis2026-03-06 16:49:46 -0500
committerMax Nanis2026-03-06 16:49:46 -0500
commit91d040211a4ed6e4157896256a762d3854777b5e (patch)
treecd95922ea4257dc8d3f4e4cbe8534474709a20dc /test_utils/incite
downloadgeneralresearch-91d040211a4ed6e4157896256a762d3854777b5e.tar.gz
generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.zip
Initial commitv3.3.4
Diffstat (limited to 'test_utils/incite')
-rw-r--r--test_utils/incite/__init__.py0
-rw-r--r--test_utils/incite/collections/__init__.py0
-rw-r--r--test_utils/incite/collections/conftest.py205
-rw-r--r--test_utils/incite/conftest.py201
-rw-r--r--test_utils/incite/mergers/__init__.py0
-rw-r--r--test_utils/incite/mergers/conftest.py247
6 files changed, 653 insertions, 0 deletions
diff --git a/test_utils/incite/__init__.py b/test_utils/incite/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test_utils/incite/__init__.py
diff --git a/test_utils/incite/collections/__init__.py b/test_utils/incite/collections/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test_utils/incite/collections/__init__.py
diff --git a/test_utils/incite/collections/conftest.py b/test_utils/incite/collections/conftest.py
new file mode 100644
index 0000000..1b61ed5
--- /dev/null
+++ b/test_utils/incite/collections/conftest.py
@@ -0,0 +1,205 @@
+from datetime import timedelta, datetime
+from typing import TYPE_CHECKING, Optional, Callable
+
+import pytest
+
+from test_utils.incite.conftest import mnt_filepath
+from test_utils.conftest import clear_directory
+
+if TYPE_CHECKING:
+ from generalresearch.incite.collections import DFCollection
+ from generalresearch.incite.base import GRLDatasets, DFCollectionType
+ from generalresearch.incite.collections.thl_web import LedgerDFCollection
+ from generalresearch.incite.collections.thl_web import (
+ WallDFCollection,
+ SessionDFCollection,
+ TaskAdjustmentDFCollection,
+ UserDFCollection,
+ AuditLogDFCollection,
+ )
+
+
@pytest.fixture(scope="function")
def user_collection(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: timedelta,
    start: datetime,
    thl_web_rr,
) -> "UserDFCollection":
    """A UserDFCollection spanning [start, start + duration) backed by a
    temporary archive directory and the read-replica Postgres config."""
    from generalresearch.incite.collections.thl_web import (
        DFCollectionType,
        UserDFCollection,
    )

    archive = mnt_filepath.archive_path(enum_type=DFCollectionType.USER)
    return UserDFCollection(
        start=start,
        finished=start + duration,
        offset=offset,
        pg_config=thl_web_rr,
        archive_path=archive,
    )
+
+
@pytest.fixture(scope="function")
def wall_collection(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: timedelta,
    start: datetime,
    thl_web_rr,
) -> "WallDFCollection":
    """A WallDFCollection over the test window; `finished` is left falsy
    (the raw `duration`) when no duration is supplied."""
    from generalresearch.incite.collections.thl_web import (
        DFCollectionType,
        WallDFCollection,
    )

    window_end = (start + duration) if duration else None
    return WallDFCollection(
        start=start,
        finished=window_end,
        offset=offset,
        pg_config=thl_web_rr,
        archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.WALL),
    )
+
+
@pytest.fixture(scope="function")
def session_collection(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: timedelta,
    start: datetime,
    thl_web_rr,
) -> "SessionDFCollection":
    """A SessionDFCollection over the test window; open-ended when the
    duration fixture is falsy."""
    from generalresearch.incite.collections.thl_web import (
        DFCollectionType,
        SessionDFCollection,
    )

    window_end = (start + duration) if duration else None
    return SessionDFCollection(
        start=start,
        finished=window_end,
        offset=offset,
        pg_config=thl_web_rr,
        archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.SESSION),
    )
+
+
+# IPInfoDFCollection
+# IPHistoryDFCollection
+# IPHistoryWSDFCollection
+
+# @pytest.fixture
+# def ip_history_collection(mnt_filepath, offset, duration, start,
+# thl_web_rw) -> IPHistoryDFCollection:
+# return IPHistoryDFCollection(
+# start=start,
+# finished=start + duration,
+# offset=offset,
+# pg_config=thl_web_rw,
+# archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.IP_HISTORY),
+# )
+
+
@pytest.fixture(scope="function")
def task_adj_collection(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: Optional[timedelta],
    start: datetime,
    thl_web_rr,
) -> "TaskAdjustmentDFCollection":
    """A TaskAdjustmentDFCollection over the test window.

    When `duration` is falsy, `finished` is set to `duration` itself
    (i.e. None or a zero timedelta) rather than a computed end time.
    """
    from generalresearch.incite.collections.thl_web import (
        DFCollectionType,
        TaskAdjustmentDFCollection,
    )

    window_end = (start + duration) if duration else duration
    archive = mnt_filepath.archive_path(
        enum_type=DFCollectionType.TASK_ADJUSTMENT
    )
    return TaskAdjustmentDFCollection(
        start=start,
        finished=window_end,
        offset=offset,
        pg_config=thl_web_rr,
        archive_path=archive,
    )
+
+
@pytest.fixture(scope="function")
def auditlog_collection(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: timedelta,
    start: datetime,
    thl_web_rr,
) -> "AuditLogDFCollection":
    """An AuditLogDFCollection spanning [start, start + duration).

    NOTE(review): the archive path is keyed on ``DFCollectionType.LEDGER``,
    which makes this fixture share a directory with ``ledger_collection``.
    Looks like a copy/paste — confirm whether an AUDIT_LOG enum member was
    intended here.
    """
    from generalresearch.incite.collections.thl_web import (
        AuditLogDFCollection,
        DFCollectionType,
    )

    return AuditLogDFCollection(
        start=start,
        finished=start + duration,
        offset=offset,
        pg_config=thl_web_rr,
        archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.LEDGER),
    )
+
+
@pytest.fixture(scope="function")
def ledger_collection(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: timedelta,
    start: datetime,
    thl_web_rr,
) -> "LedgerDFCollection":
    """A LedgerDFCollection over the test window; `finished` falls back to
    the raw (falsy) `duration` when no duration is supplied."""
    from generalresearch.incite.collections.thl_web import (
        DFCollectionType,
        LedgerDFCollection,
    )

    window_end = (start + duration) if duration else duration
    return LedgerDFCollection(
        start=start,
        finished=window_end,
        offset=offset,
        pg_config=thl_web_rr,
        archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.LEDGER),
    )
+
+
@pytest.fixture(scope="function")
def rm_ledger_collection(ledger_collection) -> Callable:
    """Return a zero-argument callable that empties the ledger collection's
    on-disk archive directory."""

    def _rm_ledger_collection():
        # Delegates to the shared test helper so deletion policy lives in
        # one place.
        clear_directory(ledger_collection.archive_path)

    return _rm_ledger_collection
+
+
+# --------------------------
+# Generic / Base
+# --------------------------
+
+
@pytest.fixture(scope="function")
def df_collection(
    mnt_filepath,
    df_collection_data_type: "DFCollectionType",
    offset,
    duration,
    utc_90days_ago,
    thl_web_rr,
) -> "DFCollection":
    """A generic DFCollection parameterised by the base test data type,
    starting 90 days ago (truncated to whole seconds)."""
    from generalresearch.incite.collections import DFCollection

    window_start = utc_90days_ago.replace(microsecond=0)
    archive = mnt_filepath.archive_path(enum_type=df_collection_data_type)

    return DFCollection(
        data_type=df_collection_data_type,
        archive_path=archive,
        offset=offset,
        pg_config=thl_web_rr,
        start=window_start,
        finished=window_start + duration,
    )
diff --git a/test_utils/incite/conftest.py b/test_utils/incite/conftest.py
new file mode 100644
index 0000000..759467a
--- /dev/null
+++ b/test_utils/incite/conftest.py
@@ -0,0 +1,201 @@
+from datetime import datetime, timezone, timedelta
+from os.path import join as pjoin
+from pathlib import Path
+from random import choice as randchoice
+from shutil import rmtree
+from typing import Callable, TYPE_CHECKING, Optional
+from uuid import uuid4
+
+import pytest
+from _pytest.fixtures import SubRequest
+from faker import Faker
+
+from test_utils.managers.ledger.conftest import session_with_tx_factory
+from test_utils.models.conftest import session_factory
+
+if TYPE_CHECKING:
+ from generalresearch.models.thl.user import User
+ from generalresearch.incite.base import GRLDatasets
+ from generalresearch.incite.mergers import MergeType
+ from generalresearch.incite.collections import (
+ DFCollection,
+ DFCollectionType,
+ DFCollectionItem,
+ )
+
+fake = Faker()
+
+
@pytest.fixture(scope="function")
def mnt_gr_api_dir(request: SubRequest, settings):
    """Create the report-output directory tree (one subdirectory per
    ReportType) under the configured mount path; removed on teardown."""
    root = Path(settings.mnt_gr_api_dir)
    root.mkdir(parents=True, exist_ok=True)

    from generalresearch.models.admin.request import ReportType

    for report_type in ReportType:
        (root / report_type.value).mkdir(exist_ok=True)

    def tmp_file_teardown():
        # Safety net: never recursively delete anything on a network mount.
        assert "/mnt/" not in str(root), (
            "Under no condition, testing or otherwise should we have code delete "
            " any folders or potential data on a network mount"
        )

        rmtree(root)

    request.addfinalizer(tmp_file_teardown)

    return root
+
+
@pytest.fixture(scope="function")
def event_report_request(utc_hour_ago, start):
    """A validated POP_EVENT ReportRequest with a 5-minute interval."""
    from generalresearch.models.admin.request import ReportRequest, ReportType

    payload = {
        "report_type": ReportType.POP_EVENT,
        "interval": "5min",
        "start": start,
    }
    return ReportRequest.model_validate(payload)
+
+
@pytest.fixture(scope="function")
def session_report_request(utc_hour_ago, start):
    """A validated POP_SESSION ReportRequest with a 5-minute interval."""
    from generalresearch.models.admin.request import ReportRequest, ReportType

    payload = {
        "report_type": ReportType.POP_SESSION,
        "interval": "5min",
        "start": start,
    }
    return ReportRequest.model_validate(payload)
+
+
@pytest.fixture(scope="function")
def mnt_filepath(request: SubRequest) -> "GRLDatasets":
    """Creates a temporary file path for all DFCollections & Mergers parquet
    files.

    The scratch directory lives under /tmp with a random suffix and is
    removed at teardown.
    """
    from generalresearch.incite.base import GRLDatasets, NFSMount

    scratch = Path("/tmp") / f"test-{uuid4().hex[:12]}"
    instance = GRLDatasets(
        data_src=scratch,
        incite=NFSMount(point="thl-incite"),
    )

    def tmp_file_teardown():
        # Safety net: never recursively delete anything on a network mount.
        assert "/mnt/" not in str(instance.data_src), (
            "Under no condition, testing or otherwise should we have code delete "
            " any folders or potential data on a network mount"
        )

        rmtree(instance.data_src)

    request.addfinalizer(tmp_file_teardown)

    return instance
+
+
@pytest.fixture(scope="function")
def start(utc_90days_ago) -> "datetime":
    """Test-window start: 90 days ago, truncated to whole seconds."""
    return utc_90days_ago.replace(microsecond=0)
+
+
@pytest.fixture(scope="function")
def offset() -> str:
    """Default pandas-style bin offset used by collection/merge fixtures."""
    return "15min"
+
+
@pytest.fixture(scope="function")
def duration() -> Optional["timedelta"]:
    """Default test-window length (one hour)."""
    return timedelta(hours=1)
+
+
@pytest.fixture(scope="function")
def df_collection_data_type() -> "DFCollectionType":
    """The base TEST collection data type used by generic fixtures."""
    from generalresearch.incite.collections import DFCollectionType

    return DFCollectionType.TEST
+
+
@pytest.fixture(scope="function")
def merge_type() -> "MergeType":
    """The base TEST merge type used by generic merge fixtures."""
    from generalresearch.incite.mergers import MergeType

    return MergeType.TEST
+
+
+@pytest.fixture(scope="function")
+def incite_item_factory(
+ session_factory,
+ product,
+ user_factory,
+ session_with_tx_factory,
+) -> Callable:
+ def _incite_item_factory(
+ item: "DFCollectionItem",
+ observations: int = 3,
+ user: Optional["User"] = None,
+ ):
+ from generalresearch.incite.collections import (
+ DFCollection,
+ DFCollectionType,
+ )
+ from generalresearch.models.thl.session import Source
+
+ collection: DFCollection = item._collection
+ data_type: DFCollectionType = collection.data_type
+
+ for idx in range(5):
+ item_time = fake.date_time_between(
+ start_date=item.start, end_date=item.finish, tzinfo=timezone.utc
+ )
+
+ match data_type:
+ case DFCollectionType.USER:
+ user_factory(product=product, created=item_time)
+
+ case DFCollectionType.LEDGER:
+ session_with_tx_factory(started=item_time, user=user)
+
+ case DFCollectionType.WALL:
+ u = (
+ user
+ if user
+ else user_factory(product=product, created=item_time)
+ )
+ session_factory(
+ user=u,
+ started=item_time,
+ wall_source=randchoice(list(Source)),
+ )
+
+ case DFCollectionType.SESSION:
+ u = (
+ user
+ if user
+ else user_factory(product=product, created=item_time)
+ )
+ session_factory(
+ user=u,
+ started=item_time,
+ wall_source=randchoice(list(Source)),
+ )
+
+ case _:
+ raise ValueError("Unsupported DFCollectionItem")
+
+ return None
+
+ return _incite_item_factory
diff --git a/test_utils/incite/mergers/__init__.py b/test_utils/incite/mergers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test_utils/incite/mergers/__init__.py
diff --git a/test_utils/incite/mergers/conftest.py b/test_utils/incite/mergers/conftest.py
new file mode 100644
index 0000000..e4e3bdd
--- /dev/null
+++ b/test_utils/incite/mergers/conftest.py
@@ -0,0 +1,247 @@
+from datetime import timedelta, datetime
+from typing import TYPE_CHECKING, Optional, Callable
+
+import pytest
+
+from test_utils.conftest import clear_directory
+from test_utils.incite.conftest import mnt_filepath
+
+if TYPE_CHECKING:
+ from generalresearch.incite.mergers import MergeType
+ from generalresearch.incite.mergers.ym_wall_summary import (
+ YMWallSummaryMerge,
+ YMWallSummaryMergeItem,
+ )
+ from generalresearch.incite.mergers.pop_ledger import PopLedgerMerge
+ from generalresearch.incite.mergers.ym_survey_wall import YMSurveyWallMerge
+ from generalresearch.incite.base import GRLDatasets
+ from generalresearch.incite.mergers.foundations.enriched_session import (
+ EnrichedSessionMerge,
+ )
+ from generalresearch.incite.mergers.foundations.enriched_task_adjust import (
+ EnrichedTaskAdjustMerge,
+ )
+ from generalresearch.incite.mergers.foundations.enriched_wall import (
+ EnrichedWallMerge,
+ )
+ from generalresearch.incite.mergers.foundations.user_id_product import (
+ UserIdProductMerge,
+ )
+ from generalresearch.incite.mergers.ym_survey_wall import (
+ YMSurveyWallMergeCollectionItem,
+ )
+
+
+# --------------------------
+# Merges
+# --------------------------
+
+
@pytest.fixture(scope="function")
def rm_pop_ledger_merge(pop_ledger_merge) -> Callable:
    """Return a zero-argument callable that empties the pop-ledger merge's
    on-disk archive directory."""

    def _rm_pop_ledger_merge():
        # Delegates to the shared test helper so deletion policy lives in
        # one place.
        clear_directory(pop_ledger_merge.archive_path)

    return _rm_pop_ledger_merge
+
+
@pytest.fixture(scope="function")
def pop_ledger_merge(
    mnt_filepath: "GRLDatasets",
    offset: str,
    start: datetime,
    duration: timedelta,
) -> "PopLedgerMerge":
    """A PopLedgerMerge over the test window; open-ended when `duration`
    is falsy."""
    from generalresearch.incite.mergers import MergeType
    from generalresearch.incite.mergers.pop_ledger import PopLedgerMerge

    window_end = (start + duration) if duration else None
    archive = mnt_filepath.archive_path(enum_type=MergeType.POP_LEDGER)
    return PopLedgerMerge(
        start=start,
        finished=window_end,
        offset=offset,
        archive_path=archive,
    )
+
+
@pytest.fixture(scope="function")
def pop_ledger_merge_item(
    start,
    pop_ledger_merge,
) -> "PopLedgerMergeItem":
    """A single PopLedgerMergeItem bound to the pop_ledger_merge fixture.

    NOTE(review): ``PopLedgerMergeItem`` is not imported in the module's
    TYPE_CHECKING block, so the return annotation is unresolved for type
    checkers — harmless at runtime since it is a string.
    """
    from generalresearch.incite.mergers.pop_ledger import PopLedgerMergeItem

    return PopLedgerMergeItem(start=start, _collection=pop_ledger_merge)
+
+
@pytest.fixture(scope="function")
def ym_survey_wall_merge(
    mnt_filepath: "GRLDatasets",
    start: datetime,
) -> "YMSurveyWallMerge":
    """A YMSurveyWallMerge with a 10-day offset and no explicit start.

    NOTE(review): the ``start`` fixture is requested but ``start=None`` is
    passed to the merge — confirm the open-ended window is intentional.
    """
    from generalresearch.incite.mergers import MergeType
    from generalresearch.incite.mergers.ym_survey_wall import YMSurveyWallMerge

    archive = mnt_filepath.archive_path(enum_type=MergeType.YM_SURVEY_WALL)
    return YMSurveyWallMerge(start=None, offset="10D", archive_path=archive)
+
+
@pytest.fixture(scope="function")
def ym_survey_wall_merge_item(
    start, ym_survey_wall_merge
) -> "YMSurveyWallMergeCollectionItem":
    """A single YMSurveyWallMergeCollectionItem bound to the
    ym_survey_wall_merge fixture.

    BUG FIX: previously passed the undefined name ``pop_ledger_merge`` as
    ``_collection`` (NameError at fixture setup); it now uses the
    ``ym_survey_wall_merge`` parameter that was already requested.
    """
    from generalresearch.incite.mergers.ym_survey_wall import (
        YMSurveyWallMergeCollectionItem,
    )

    return YMSurveyWallMergeCollectionItem(
        start=start,
        _collection=ym_survey_wall_merge,
    )
+
+
@pytest.fixture(scope="function")
def ym_wall_summary_merge(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: timedelta,
    start: datetime,
) -> "YMWallSummaryMerge":
    """A YMWallSummaryMerge spanning [start, start + duration).

    NOTE(review): the archive path is keyed on ``MergeType.POP_LEDGER``,
    which makes this merge share a directory with ``pop_ledger_merge``.
    Looks like a copy/paste — confirm whether a YM_WALL_SUMMARY member
    was intended.
    """
    from generalresearch.incite.mergers import MergeType
    from generalresearch.incite.mergers.ym_wall_summary import YMWallSummaryMerge

    archive = mnt_filepath.archive_path(enum_type=MergeType.POP_LEDGER)
    return YMWallSummaryMerge(
        start=start,
        finished=start + duration,
        offset=offset,
        archive_path=archive,
    )
+
+
@pytest.fixture(scope="function")
def ym_wall_summary_merge_item(
    start, ym_wall_summary_merge
) -> "YMWallSummaryMergeItem":
    """A single YMWallSummaryMergeItem bound to the ym_wall_summary_merge
    fixture.

    BUG FIXES: (1) the ``@pytest.fixture`` decorator was missing, so pytest
    would inject this *function object* instead of its return value wherever
    the fixture name was requested; (2) ``_collection`` was the undefined
    name ``pop_ledger_merge`` (NameError) instead of the requested
    ``ym_wall_summary_merge`` parameter.
    """
    from generalresearch.incite.mergers.ym_wall_summary import (
        YMWallSummaryMergeItem,
    )

    return YMWallSummaryMergeItem(
        start=start,
        _collection=ym_wall_summary_merge,
    )
+
+
+# --------------------------
+# Merges: Foundations
+# --------------------------
+
+
@pytest.fixture(scope="function")
def enriched_session_merge(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: timedelta,
    start: datetime,
) -> "EnrichedSessionMerge":
    """An EnrichedSessionMerge over the test window; open-ended when
    `duration` is falsy."""
    from generalresearch.incite.mergers import MergeType
    from generalresearch.incite.mergers.foundations.enriched_session import (
        EnrichedSessionMerge,
    )

    window_end = (start + duration) if duration else None
    archive = mnt_filepath.archive_path(enum_type=MergeType.ENRICHED_SESSION)
    return EnrichedSessionMerge(
        start=start,
        finished=window_end,
        offset=offset,
        archive_path=archive,
    )
+
+
@pytest.fixture(scope="function")
def enriched_task_adjust_merge(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: timedelta,
    start: datetime,
) -> "EnrichedTaskAdjustMerge":
    """An EnrichedTaskAdjustMerge spanning [start, start + duration)."""
    from generalresearch.incite.mergers import MergeType
    from generalresearch.incite.mergers.foundations.enriched_task_adjust import (
        EnrichedTaskAdjustMerge,
    )

    archive = mnt_filepath.archive_path(
        enum_type=MergeType.ENRICHED_TASK_ADJUST
    )
    return EnrichedTaskAdjustMerge(
        start=start,
        finished=start + duration,
        offset=offset,
        archive_path=archive,
    )
+
+
@pytest.fixture(scope="function")
def enriched_wall_merge(
    mnt_filepath: "GRLDatasets",
    offset: str,
    duration: timedelta,
    start: datetime,
) -> "EnrichedWallMerge":
    """An EnrichedWallMerge over the test window; open-ended when
    `duration` is falsy."""
    from generalresearch.incite.mergers import MergeType
    from generalresearch.incite.mergers.foundations.enriched_wall import (
        EnrichedWallMerge,
    )

    window_end = (start + duration) if duration else None
    archive = mnt_filepath.archive_path(enum_type=MergeType.ENRICHED_WALL)
    return EnrichedWallMerge(
        start=start,
        finished=window_end,
        offset=offset,
        archive_path=archive,
    )
+
+
@pytest.fixture(scope="function")
def user_id_product_merge(
    mnt_filepath: "GRLDatasets",
    duration: timedelta,
    offset,
    start: datetime,
) -> "UserIdProductMerge":
    """A UserIdProductMerge spanning [start, start + duration).

    NOTE(review): the ``offset`` fixture is requested but ``offset=None`` is
    passed to the merge — confirm this is deliberate.
    """
    from generalresearch.incite.mergers import MergeType
    from generalresearch.incite.mergers.foundations.user_id_product import (
        UserIdProductMerge,
    )

    archive = mnt_filepath.archive_path(enum_type=MergeType.USER_ID_PRODUCT)
    return UserIdProductMerge(
        start=start,
        finished=start + duration,
        offset=None,
        archive_path=archive,
    )
+
+
+# --------------------------
+# Generic / Base
+# --------------------------
+
+
@pytest.fixture(scope="function")
def merge_collection(
    mnt_filepath,
    merge_type: "MergeType",
    offset,
    duration,
    start,
):
    """A generic MergeCollection parameterised by the base merge_type
    fixture, spanning [start, start + duration)."""
    from generalresearch.incite.mergers import MergeCollection

    archive = mnt_filepath.archive_path(enum_type=merge_type)
    return MergeCollection(
        merge_type=merge_type,
        start=start,
        finished=start + duration,
        offset=offset,
        archive_path=archive,
    )