diff options
| author | Max Nanis | 2026-03-06 16:49:46 -0500 |
|---|---|---|
| committer | Max Nanis | 2026-03-06 16:49:46 -0500 |
| commit | 91d040211a4ed6e4157896256a762d3854777b5e (patch) | |
| tree | cd95922ea4257dc8d3f4e4cbe8534474709a20dc /test_utils/incite | |
| download | generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.tar.gz generalresearch-91d040211a4ed6e4157896256a762d3854777b5e.zip | |
Initial commit (tag: v3.3.4)
Diffstat (limited to 'test_utils/incite')
| -rw-r--r-- | test_utils/incite/__init__.py | 0 | ||||
| -rw-r--r-- | test_utils/incite/collections/__init__.py | 0 | ||||
| -rw-r--r-- | test_utils/incite/collections/conftest.py | 205 | ||||
| -rw-r--r-- | test_utils/incite/conftest.py | 201 | ||||
| -rw-r--r-- | test_utils/incite/mergers/__init__.py | 0 | ||||
| -rw-r--r-- | test_utils/incite/mergers/conftest.py | 247 |
6 files changed, 653 insertions, 0 deletions
diff --git a/test_utils/incite/__init__.py b/test_utils/incite/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test_utils/incite/__init__.py diff --git a/test_utils/incite/collections/__init__.py b/test_utils/incite/collections/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test_utils/incite/collections/__init__.py diff --git a/test_utils/incite/collections/conftest.py b/test_utils/incite/collections/conftest.py new file mode 100644 index 0000000..1b61ed5 --- /dev/null +++ b/test_utils/incite/collections/conftest.py @@ -0,0 +1,205 @@ +from datetime import timedelta, datetime +from typing import TYPE_CHECKING, Optional, Callable + +import pytest + +from test_utils.incite.conftest import mnt_filepath +from test_utils.conftest import clear_directory + +if TYPE_CHECKING: + from generalresearch.incite.collections import DFCollection + from generalresearch.incite.base import GRLDatasets, DFCollectionType + from generalresearch.incite.collections.thl_web import LedgerDFCollection + from generalresearch.incite.collections.thl_web import ( + WallDFCollection, + SessionDFCollection, + TaskAdjustmentDFCollection, + UserDFCollection, + AuditLogDFCollection, + ) + + +@pytest.fixture(scope="function") +def user_collection( + mnt_filepath: "GRLDatasets", + offset: str, + duration: timedelta, + start: datetime, + thl_web_rr, +) -> "UserDFCollection": + from generalresearch.incite.collections.thl_web import ( + UserDFCollection, + DFCollectionType, + ) + + return UserDFCollection( + start=start, + finished=start + duration, + offset=offset, + pg_config=thl_web_rr, + archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.USER), + ) + + +@pytest.fixture(scope="function") +def wall_collection( + mnt_filepath: "GRLDatasets", + offset: str, + duration: timedelta, + start: datetime, + thl_web_rr, +) -> "WallDFCollection": + from generalresearch.incite.collections.thl_web import ( + WallDFCollection, + DFCollectionType, + 
) + + return WallDFCollection( + start=start, + finished=start + duration if duration else None, + offset=offset, + pg_config=thl_web_rr, + archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.WALL), + ) + + +@pytest.fixture(scope="function") +def session_collection( + mnt_filepath: "GRLDatasets", + offset: str, + duration: timedelta, + start: datetime, + thl_web_rr, +) -> "SessionDFCollection": + from generalresearch.incite.collections.thl_web import ( + SessionDFCollection, + DFCollectionType, + ) + + return SessionDFCollection( + start=start, + finished=start + duration if duration else None, + offset=offset, + pg_config=thl_web_rr, + archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.SESSION), + ) + + +# IPInfoDFCollection +# IPHistoryDFCollection +# IPHistoryWSDFCollection + +# @pytest.fixture +# def ip_history_collection(mnt_filepath, offset, duration, start, +# thl_web_rw) -> IPHistoryDFCollection: +# return IPHistoryDFCollection( +# start=start, +# finished=start + duration, +# offset=offset, +# pg_config=thl_web_rw, +# archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.IP_HISTORY), +# ) + + +@pytest.fixture(scope="function") +def task_adj_collection( + mnt_filepath: "GRLDatasets", + offset: str, + duration: Optional[timedelta], + start: datetime, + thl_web_rr, +) -> "TaskAdjustmentDFCollection": + from generalresearch.incite.collections.thl_web import ( + TaskAdjustmentDFCollection, + DFCollectionType, + ) + + return TaskAdjustmentDFCollection( + start=start, + finished=start + duration if duration else duration, + offset=offset, + pg_config=thl_web_rr, + archive_path=mnt_filepath.archive_path( + enum_type=DFCollectionType.TASK_ADJUSTMENT + ), + ) + + +@pytest.fixture(scope="function") +def auditlog_collection( + mnt_filepath: "GRLDatasets", + offset: str, + duration: timedelta, + start: datetime, + thl_web_rr, +) -> "AuditLogDFCollection": + from generalresearch.incite.collections.thl_web import ( + 
AuditLogDFCollection, + DFCollectionType, + ) + + return AuditLogDFCollection( + start=start, + finished=start + duration, + offset=offset, + pg_config=thl_web_rr, + archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.LEDGER), + ) + + +@pytest.fixture(scope="function") +def ledger_collection( + mnt_filepath: "GRLDatasets", + offset: str, + duration: timedelta, + start: datetime, + thl_web_rr, +) -> "LedgerDFCollection": + from generalresearch.incite.collections.thl_web import ( + LedgerDFCollection, + DFCollectionType, + ) + + return LedgerDFCollection( + start=start, + finished=start + duration if duration else duration, + offset=offset, + pg_config=thl_web_rr, + archive_path=mnt_filepath.archive_path(enum_type=DFCollectionType.LEDGER), + ) + + +@pytest.fixture(scope="function") +def rm_ledger_collection(ledger_collection) -> Callable: + def _rm_ledger_collection(): + clear_directory(ledger_collection.archive_path) + + return _rm_ledger_collection + + +# -------------------------- +# Generic / Base +# -------------------------- + + +@pytest.fixture(scope="function") +def df_collection( + mnt_filepath, + df_collection_data_type: "DFCollectionType", + offset, + duration, + utc_90days_ago, + thl_web_rr, +) -> "DFCollection": + from generalresearch.incite.collections import DFCollection + + start = utc_90days_ago.replace(microsecond=0) + + return DFCollection( + data_type=df_collection_data_type, + archive_path=mnt_filepath.archive_path(enum_type=df_collection_data_type), + offset=offset, + pg_config=thl_web_rr, + start=start, + finished=start + duration, + ) diff --git a/test_utils/incite/conftest.py b/test_utils/incite/conftest.py new file mode 100644 index 0000000..759467a --- /dev/null +++ b/test_utils/incite/conftest.py @@ -0,0 +1,201 @@ +from datetime import datetime, timezone, timedelta +from os.path import join as pjoin +from pathlib import Path +from random import choice as randchoice +from shutil import rmtree +from typing import Callable, 
TYPE_CHECKING, Optional +from uuid import uuid4 + +import pytest +from _pytest.fixtures import SubRequest +from faker import Faker + +from test_utils.managers.ledger.conftest import session_with_tx_factory +from test_utils.models.conftest import session_factory + +if TYPE_CHECKING: + from generalresearch.models.thl.user import User + from generalresearch.incite.base import GRLDatasets + from generalresearch.incite.mergers import MergeType + from generalresearch.incite.collections import ( + DFCollection, + DFCollectionType, + DFCollectionItem, + ) + +fake = Faker() + + +@pytest.fixture(scope="function") +def mnt_gr_api_dir(request: SubRequest, settings): + p = Path(settings.mnt_gr_api_dir) + p.mkdir(parents=True, exist_ok=True) + + from generalresearch.models.admin.request import ReportType + + for e in list(ReportType): + Path(pjoin(p, e.value)).mkdir(exist_ok=True) + + def tmp_file_teardown(): + assert "/mnt/" not in str(p), ( + "Under no condition, testing or otherwise should we have code delete " + " any folders or potential data on a network mount" + ) + + rmtree(p) + + request.addfinalizer(tmp_file_teardown) + + return p + + +@pytest.fixture(scope="function") +def event_report_request(utc_hour_ago, start): + from generalresearch.models.admin.request import ( + ReportRequest, + ReportType, + ) + + return ReportRequest.model_validate( + { + "report_type": ReportType.POP_EVENT, + "interval": "5min", + "start": start, + } + ) + + +@pytest.fixture(scope="function") +def session_report_request(utc_hour_ago, start): + from generalresearch.models.admin.request import ( + ReportRequest, + ReportType, + ) + + return ReportRequest.model_validate( + { + "report_type": ReportType.POP_SESSION, + "interval": "5min", + "start": start, + } + ) + + +@pytest.fixture(scope="function") +def mnt_filepath(request: SubRequest) -> "GRLDatasets": + """Creates a temporary file path for all DFCollections & Mergers parquet + files. 
+ """ + from generalresearch.incite.base import GRLDatasets, NFSMount + + instance = GRLDatasets( + data_src=Path(pjoin("/tmp", f"test-{uuid4().hex[:12]}")), + incite=NFSMount(point="thl-incite"), + ) + + def tmp_file_teardown(): + assert "/mnt/" not in str(instance.data_src), ( + "Under no condition, testing or otherwise should we have code delete " + " any folders or potential data on a network mount" + ) + + rmtree(instance.data_src) + + request.addfinalizer(tmp_file_teardown) + + return instance + + +@pytest.fixture(scope="function") +def start(utc_90days_ago) -> "datetime": + s = utc_90days_ago.replace(microsecond=0) + return s + + +@pytest.fixture(scope="function") +def offset() -> str: + return "15min" + + +@pytest.fixture(scope="function") +def duration() -> Optional["timedelta"]: + return timedelta(hours=1) + + +@pytest.fixture(scope="function") +def df_collection_data_type() -> "DFCollectionType": + from generalresearch.incite.collections import DFCollectionType + + return DFCollectionType.TEST + + +@pytest.fixture(scope="function") +def merge_type() -> "MergeType": + from generalresearch.incite.mergers import MergeType + + return MergeType.TEST + + +@pytest.fixture(scope="function") +def incite_item_factory( + session_factory, + product, + user_factory, + session_with_tx_factory, +) -> Callable: + def _incite_item_factory( + item: "DFCollectionItem", + observations: int = 3, + user: Optional["User"] = None, + ): + from generalresearch.incite.collections import ( + DFCollection, + DFCollectionType, + ) + from generalresearch.models.thl.session import Source + + collection: DFCollection = item._collection + data_type: DFCollectionType = collection.data_type + + for idx in range(5): + item_time = fake.date_time_between( + start_date=item.start, end_date=item.finish, tzinfo=timezone.utc + ) + + match data_type: + case DFCollectionType.USER: + user_factory(product=product, created=item_time) + + case DFCollectionType.LEDGER: + 
session_with_tx_factory(started=item_time, user=user) + + case DFCollectionType.WALL: + u = ( + user + if user + else user_factory(product=product, created=item_time) + ) + session_factory( + user=u, + started=item_time, + wall_source=randchoice(list(Source)), + ) + + case DFCollectionType.SESSION: + u = ( + user + if user + else user_factory(product=product, created=item_time) + ) + session_factory( + user=u, + started=item_time, + wall_source=randchoice(list(Source)), + ) + + case _: + raise ValueError("Unsupported DFCollectionItem") + + return None + + return _incite_item_factory diff --git a/test_utils/incite/mergers/__init__.py b/test_utils/incite/mergers/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test_utils/incite/mergers/__init__.py diff --git a/test_utils/incite/mergers/conftest.py b/test_utils/incite/mergers/conftest.py new file mode 100644 index 0000000..e4e3bdd --- /dev/null +++ b/test_utils/incite/mergers/conftest.py @@ -0,0 +1,247 @@ +from datetime import timedelta, datetime +from typing import TYPE_CHECKING, Optional, Callable + +import pytest + +from test_utils.conftest import clear_directory +from test_utils.incite.conftest import mnt_filepath + +if TYPE_CHECKING: + from generalresearch.incite.mergers import MergeType + from generalresearch.incite.mergers.ym_wall_summary import ( + YMWallSummaryMerge, + YMWallSummaryMergeItem, + ) + from generalresearch.incite.mergers.pop_ledger import PopLedgerMerge + from generalresearch.incite.mergers.ym_survey_wall import YMSurveyWallMerge + from generalresearch.incite.base import GRLDatasets + from generalresearch.incite.mergers.foundations.enriched_session import ( + EnrichedSessionMerge, + ) + from generalresearch.incite.mergers.foundations.enriched_task_adjust import ( + EnrichedTaskAdjustMerge, + ) + from generalresearch.incite.mergers.foundations.enriched_wall import ( + EnrichedWallMerge, + ) + from generalresearch.incite.mergers.foundations.user_id_product import ( + 
UserIdProductMerge, + ) + from generalresearch.incite.mergers.ym_survey_wall import ( + YMSurveyWallMergeCollectionItem, + ) + + +# -------------------------- +# Merges +# -------------------------- + + +@pytest.fixture(scope="function") +def rm_pop_ledger_merge(pop_ledger_merge) -> Callable: + def _rm_pop_ledger_merge(): + clear_directory(pop_ledger_merge.archive_path) + + return _rm_pop_ledger_merge + + +@pytest.fixture(scope="function") +def pop_ledger_merge( + mnt_filepath: "GRLDatasets", + offset: str, + start: datetime, + duration: timedelta, +) -> "PopLedgerMerge": + from generalresearch.incite.mergers.pop_ledger import PopLedgerMerge + from generalresearch.incite.mergers import MergeType + + return PopLedgerMerge( + start=start, + finished=start + duration if duration else None, + offset=offset, + archive_path=mnt_filepath.archive_path(enum_type=MergeType.POP_LEDGER), + ) + + +@pytest.fixture(scope="function") +def pop_ledger_merge_item( + start, + pop_ledger_merge, +) -> "PopLedgerMergeItem": + from generalresearch.incite.mergers.pop_ledger import PopLedgerMergeItem + + return PopLedgerMergeItem( + start=start, + _collection=pop_ledger_merge, + ) + + +@pytest.fixture(scope="function") +def ym_survey_wall_merge( + mnt_filepath: "GRLDatasets", + start: datetime, +) -> "YMSurveyWallMerge": + from generalresearch.incite.mergers.ym_survey_wall import YMSurveyWallMerge + from generalresearch.incite.mergers import MergeType + + return YMSurveyWallMerge( + start=None, + offset="10D", + archive_path=mnt_filepath.archive_path(enum_type=MergeType.YM_SURVEY_WALL), + ) + + +@pytest.fixture(scope="function") +def ym_survey_wall_merge_item( + start, ym_survey_wall_merge +) -> "YMSurveyWallMergeCollectionItem": + from generalresearch.incite.mergers.ym_survey_wall import ( + YMSurveyWallMergeCollectionItem, + ) + + return YMSurveyWallMergeCollectionItem( + start=start, + _collection=pop_ledger_merge, + ) + + +@pytest.fixture(scope="function") +def ym_wall_summary_merge( + 
mnt_filepath: "GRLDatasets", + offset: str, + duration: timedelta, + start: datetime, +) -> "YMWallSummaryMerge": + from generalresearch.incite.mergers.ym_wall_summary import YMWallSummaryMerge + from generalresearch.incite.mergers import MergeType + + return YMWallSummaryMerge( + start=start, + finished=start + duration, + offset=offset, + archive_path=mnt_filepath.archive_path(enum_type=MergeType.POP_LEDGER), + ) + + +def ym_wall_summary_merge_item( + start, ym_wall_summary_merge +) -> "YMWallSummaryMergeItem": + from generalresearch.incite.mergers.ym_wall_summary import ( + YMWallSummaryMergeItem, + ) + + return YMWallSummaryMergeItem( + start=start, + _collection=pop_ledger_merge, + ) + + +# -------------------------- +# Merges: Foundations +# -------------------------- + + +@pytest.fixture(scope="function") +def enriched_session_merge( + mnt_filepath: "GRLDatasets", + offset: str, + duration: timedelta, + start: datetime, +) -> "EnrichedSessionMerge": + from generalresearch.incite.mergers.foundations.enriched_session import ( + EnrichedSessionMerge, + ) + from generalresearch.incite.mergers import MergeType + + return EnrichedSessionMerge( + start=start, + finished=start + duration if duration else None, + offset=offset, + archive_path=mnt_filepath.archive_path(enum_type=MergeType.ENRICHED_SESSION), + ) + + +@pytest.fixture(scope="function") +def enriched_task_adjust_merge( + mnt_filepath: "GRLDatasets", + offset: str, + duration: timedelta, + start: datetime, +) -> "EnrichedTaskAdjustMerge": + from generalresearch.incite.mergers.foundations.enriched_task_adjust import ( + EnrichedTaskAdjustMerge, + ) + from generalresearch.incite.mergers import MergeType + + return EnrichedTaskAdjustMerge( + start=start, + finished=start + duration, + offset=offset, + archive_path=mnt_filepath.archive_path( + enum_type=MergeType.ENRICHED_TASK_ADJUST + ), + ) + + +@pytest.fixture(scope="function") +def enriched_wall_merge( + mnt_filepath: "GRLDatasets", + offset: str, + 
duration: timedelta, + start: datetime, +) -> "EnrichedWallMerge": + from generalresearch.incite.mergers import MergeType + from generalresearch.incite.mergers.foundations.enriched_wall import ( + EnrichedWallMerge, + ) + + return EnrichedWallMerge( + start=start, + finished=start + duration if duration else None, + offset=offset, + archive_path=mnt_filepath.archive_path(enum_type=MergeType.ENRICHED_WALL), + ) + + +@pytest.fixture(scope="function") +def user_id_product_merge( + mnt_filepath: "GRLDatasets", + duration: timedelta, + offset, + start: datetime, +) -> "UserIdProductMerge": + from generalresearch.incite.mergers.foundations.user_id_product import ( + UserIdProductMerge, + ) + from generalresearch.incite.mergers import MergeType + + return UserIdProductMerge( + start=start, + finished=start + duration, + offset=None, + archive_path=mnt_filepath.archive_path(enum_type=MergeType.USER_ID_PRODUCT), + ) + + +# -------------------------- +# Generic / Base +# -------------------------- + + +@pytest.fixture(scope="function") +def merge_collection( + mnt_filepath, + merge_type: "MergeType", + offset, + duration, + start, +): + from generalresearch.incite.mergers import MergeCollection + + return MergeCollection( + merge_type=merge_type, + start=start, + finished=start + duration, + offset=offset, + archive_path=mnt_filepath.archive_path(enum_type=merge_type), + ) |
