aboutsummaryrefslogtreecommitdiff
path: root/tests/incite/mergers/test_merge_collection.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/incite/mergers/test_merge_collection.py')
-rw-r--r--tests/incite/mergers/test_merge_collection.py102
1 files changed, 102 insertions, 0 deletions
diff --git a/tests/incite/mergers/test_merge_collection.py b/tests/incite/mergers/test_merge_collection.py
new file mode 100644
index 0000000..692cac3
--- /dev/null
+++ b/tests/incite/mergers/test_merge_collection.py
@@ -0,0 +1,102 @@
+from datetime import datetime, timezone, timedelta
+from itertools import product
+
+import pandas as pd
+import pytest
+from pandera import DataFrameSchema
+
+from generalresearch.incite.mergers import (
+ MergeCollection,
+ MergeType,
+)
+from test_utils.incite.conftest import mnt_filepath
+
+merge_types = list(e for e in MergeType if e != MergeType.TEST)
+
+
+@pytest.mark.parametrize(
+ argnames="merge_type, offset, duration, start",
+ argvalues=list(
+ product(
+ merge_types,
+ ["5min", "6h", "14D"],
+ [timedelta(days=30)],
+ [
+ (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
+ microsecond=0
+ )
+ ],
+ )
+ ),
+)
+class TestMergeCollection:
+
+ def test_init(self, mnt_filepath, merge_type, offset, duration, start):
+ with pytest.raises(expected_exception=ValueError) as cm:
+ MergeCollection(archive_path=mnt_filepath.data_src)
+ assert "Must explicitly provide a merge_type" in str(cm.value)
+
+ instance = MergeCollection(
+ merge_type=merge_type,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+ assert instance.merge_type == merge_type
+
+ def test_items(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ merge_type=merge_type,
+ offset=offset,
+ start=start,
+ finished=start + duration,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ assert len(instance.interval_range) == len(instance.items)
+
+ def test_progress(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ merge_type=merge_type,
+ offset=offset,
+ start=start,
+ finished=start + duration,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ assert isinstance(instance.progress, pd.DataFrame)
+ assert instance.progress.shape[0] > 0
+ assert instance.progress.shape[1] == 7
+ assert instance.progress["group_by"].isnull().all()
+
+ def test_schema(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ merge_type=merge_type,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ assert isinstance(instance._schema, DataFrameSchema)
+
+ def test_load(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ merge_type=merge_type,
+ start=start,
+ finished=start + duration,
+ offset=offset,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ # Confirm that there are no archives available yet
+ assert instance.progress.has_archive.eq(False).all()
+
+ def test_get_items(self, mnt_filepath, merge_type, offset, duration, start):
+ instance = MergeCollection(
+ start=start,
+ finished=start + duration,
+ offset=offset,
+ merge_type=merge_type,
+ archive_path=mnt_filepath.archive_path(enum_type=merge_type),
+ )
+
+ # with pytest.raises(expected_exception=ResourceWarning) as cm:
+ res = instance.get_items_last365()
+ # assert "has missing archives", str(cm.value)
+ assert len(res) == len(instance.items)