1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
from datetime import datetime, timezone, timedelta
from itertools import product
import pandas as pd
import pytest
from pandera import DataFrameSchema
from generalresearch.incite.mergers import (
MergeCollection,
MergeType,
)
from test_utils.incite.conftest import mnt_filepath
# Every merge type exercised by the tests below; MergeType.TEST is excluded.
merge_types = [e for e in MergeType if e != MergeType.TEST]
@pytest.mark.parametrize(
    argnames="merge_type, offset, duration, start",
    argvalues=list(
        product(
            merge_types,
            ["5min", "6h", "14D"],
            [timedelta(days=30)],
            [
                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
                    microsecond=0
                )
            ],
        )
    ),
)
class TestMergeCollection:
    """Tests for ``MergeCollection`` over every merge type in ``merge_types``.

    Each test runs once per (merge_type, offset) combination. ``duration`` and
    ``start`` are single-valued: a 30-day window starting 35 days before the
    time this module is collected, truncated to whole seconds.

    Every test accepts all four parametrized arguments, even the ones it does
    not use, because pytest rejects a parametrize argname that is missing from
    a test function's signature.
    """

    @staticmethod
    def _windowed(mnt_filepath, merge_type, offset, duration, start):
        """Build a collection from ``start`` to ``start + duration`` at ``offset``."""
        return MergeCollection(
            merge_type=merge_type,
            offset=offset,
            start=start,
            finished=start + duration,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

    def test_init(self, mnt_filepath, merge_type, offset, duration, start):
        # Omitting merge_type must be rejected with a ValueError.
        with pytest.raises(ValueError, match="Must explicitly provide a merge_type"):
            MergeCollection(archive_path=mnt_filepath.data_src)

        instance = MergeCollection(
            merge_type=merge_type,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )
        assert instance.merge_type == merge_type

    def test_items(self, mnt_filepath, merge_type, offset, duration, start):
        # One item is expected per interval in the collection's range.
        instance = self._windowed(mnt_filepath, merge_type, offset, duration, start)
        assert len(instance.interval_range) == len(instance.items)

    def test_progress(self, mnt_filepath, merge_type, offset, duration, start):
        instance = self._windowed(mnt_filepath, merge_type, offset, duration, start)
        # Read once: `progress` may be recomputed on every attribute access.
        progress = instance.progress
        assert isinstance(progress, pd.DataFrame)
        assert progress.shape[0] > 0
        assert progress.shape[1] == 7
        assert progress["group_by"].isnull().all()

    def test_schema(self, mnt_filepath, merge_type, offset, duration, start):
        instance = MergeCollection(
            merge_type=merge_type,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )
        assert isinstance(instance._schema, DataFrameSchema)

    def test_load(self, mnt_filepath, merge_type, offset, duration, start):
        instance = self._windowed(mnt_filepath, merge_type, offset, duration, start)
        # Confirm that there are no archives available yet.
        assert instance.progress.has_archive.eq(False).all()

    def test_get_items(self, mnt_filepath, merge_type, offset, duration, start):
        instance = self._windowed(mnt_filepath, merge_type, offset, duration, start)
        # TODO(review): an earlier draft expected a ResourceWarning mentioning
        # "has missing archives" here; if get_items_last365 emits one, assert
        # it with pytest.warns(ResourceWarning, match="has missing archives").
        res = instance.get_items_last365()
        assert len(res) == len(instance.items)
|