aboutsummaryrefslogtreecommitdiff
path: root/tests/incite/mergers/test_merge_collection.py
blob: 692cac3939ecd9bc4dba9fc20d98f980076c2fc5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from datetime import datetime, timezone, timedelta
from itertools import product

import pandas as pd
import pytest
from pandera import DataFrameSchema

from generalresearch.incite.mergers import (
    MergeCollection,
    MergeType,
)
from test_utils.incite.conftest import mnt_filepath

merge_types = list(e for e in MergeType if e != MergeType.TEST)


@pytest.mark.parametrize(
    argnames="merge_type, offset, duration, start",
    argvalues=list(
        product(
            merge_types,
            ["5min", "6h", "14D"],
            [timedelta(days=30)],
            [
                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
                    microsecond=0
                )
            ],
        )
    ),
)
class TestMergeCollection:

    def test_init(self, mnt_filepath, merge_type, offset, duration, start):
        with pytest.raises(expected_exception=ValueError) as cm:
            MergeCollection(archive_path=mnt_filepath.data_src)
        assert "Must explicitly provide a merge_type" in str(cm.value)

        instance = MergeCollection(
            merge_type=merge_type,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )
        assert instance.merge_type == merge_type

    def test_items(self, mnt_filepath, merge_type, offset, duration, start):
        instance = MergeCollection(
            merge_type=merge_type,
            offset=offset,
            start=start,
            finished=start + duration,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

        assert len(instance.interval_range) == len(instance.items)

    def test_progress(self, mnt_filepath, merge_type, offset, duration, start):
        instance = MergeCollection(
            merge_type=merge_type,
            offset=offset,
            start=start,
            finished=start + duration,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

        assert isinstance(instance.progress, pd.DataFrame)
        assert instance.progress.shape[0] > 0
        assert instance.progress.shape[1] == 7
        assert instance.progress["group_by"].isnull().all()

    def test_schema(self, mnt_filepath, merge_type, offset, duration, start):
        instance = MergeCollection(
            merge_type=merge_type,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

        assert isinstance(instance._schema, DataFrameSchema)

    def test_load(self, mnt_filepath, merge_type, offset, duration, start):
        instance = MergeCollection(
            merge_type=merge_type,
            start=start,
            finished=start + duration,
            offset=offset,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

        # Confirm that there are no archives available yet
        assert instance.progress.has_archive.eq(False).all()

    def test_get_items(self, mnt_filepath, merge_type, offset, duration, start):
        instance = MergeCollection(
            start=start,
            finished=start + duration,
            offset=offset,
            merge_type=merge_type,
            archive_path=mnt_filepath.archive_path(enum_type=merge_type),
        )

        # with pytest.raises(expected_exception=ResourceWarning) as cm:
        res = instance.get_items_last365()
        # assert "has missing archives", str(cm.value)
        assert len(res) == len(instance.items)