# path: root/tests/incite/mergers/test_ym_survey_merge.py
# blob: 4c2df6b7cf8d64b6c5963887de6aab12af00ec74 (plain)
from datetime import timedelta, timezone, datetime
from itertools import product

import pandas as pd
import pytest

# noinspection PyUnresolvedReferences
from distributed.utils_test import (
    gen_cluster,
    client_no_amm,
    loop,
    loop_in_thread,
    cleanup,
    cluster_fixture,
    client,
)

from test_utils.incite.collections.conftest import wall_collection, session_collection
from test_utils.incite.mergers.conftest import (
    enriched_session_merge,
    ym_survey_wall_merge,
)


@pytest.mark.parametrize(
    argnames="offset, duration, start",
    argvalues=list(
        product(
            ["12h", "3D"],
            [timedelta(days=30)],
            [
                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
                    microsecond=0
                )
            ],
        )
    ),
)
class TestYMSurveyMerge:
    """Build test for the YM survey wall merge.

    We override start, not because it's needed on the YMSurveyWall merge,
    which operates on a rolling 10-day window, but because we don't want
    to mock data in the wall collection and enriched_session_merge from
    the 1800s and then wonder why there is no data available in the past
    10 days in the database.

    The class-level parametrization supplies ``offset``, ``duration`` and
    ``start``; ``start`` is 35 days before "now" (UTC, microseconds
    stripped) and ``duration`` is 30 days.
    """

    def test_base(
        self,
        client_no_amm,
        user_factory,
        product,
        ym_survey_wall_merge,
        wall_collection,
        session_collection,
        enriched_session_merge,
        delete_df_collection,
        incite_item_factory,
        thl_web_rr,
    ):
        """Load the collections, build both merges and check the result.

        Steps:
          1. Clear the session collection and create one user.
          2. Populate and load every session item, then load every wall
             item, and confirm all of them report an archive.
          3. Build the enriched session merge and confirm it yields a
             non-empty DataFrame.
          4. Build the YM survey wall merge from the wall collection and
             the enriched session merge, then check the shape of the
             resulting DataFrame (single product and team, several sources,
             and a ``started`` range spanning at least three days).
        """
        from generalresearch.models.thl.user import User

        delete_df_collection(coll=session_collection)
        user: User = user_factory(product=product, created=session_collection.start)

        # -- Build & Setup
        assert ym_survey_wall_merge.start is None
        assert ym_survey_wall_merge.offset == "10D"

        for item in session_collection.items:
            incite_item_factory(item=item, user=user)
            item.initial_load()
        for item in wall_collection.items:
            item.initial_load()

        # Confirm that every item in both collections has been archived
        assert session_collection.progress.has_archive.eq(True).all()
        assert wall_collection.progress.has_archive.eq(True).all()

        enriched_session_merge.build(
            client=client_no_amm,
            session_coll=session_collection,
            wall_coll=wall_collection,
            pg_config=thl_web_rr,
        )
        assert enriched_session_merge.progress.has_archive.eq(True).all()

        # Materialize the enriched session merge to make sure the upstream
        # input of the YM survey wall merge actually contains rows.
        enriched_df: pd.DataFrame = client_no_amm.compute(
            collections=enriched_session_merge.ddf(), sync=True
        )

        assert isinstance(enriched_df, pd.DataFrame)
        assert not enriched_df.empty

        # --

        ym_survey_wall_merge.build(
            client=client_no_amm,
            wall_coll=wall_collection,
            enriched_session=enriched_session_merge,
        )
        assert ym_survey_wall_merge.progress.has_archive.eq(True).all()

        # --

        survey_df: pd.DataFrame = client_no_amm.compute(
            collections=ym_survey_wall_merge.ddf(), sync=True
        )

        assert isinstance(survey_df, pd.DataFrame)
        assert not survey_df.empty

        # -- Only one user / product was created, but several sources are
        # expected in the generated data.
        assert survey_df.product_id.nunique() == 1
        assert survey_df.team_id.nunique() == 1
        assert survey_df.source.nunique() > 1

        started_min = survey_df.started.min()
        started_max = survey_df.started.max()

        assert isinstance(started_min, pd.Timestamp)
        assert isinstance(started_max, pd.Timestamp)

        # Subtracting two pd.Timestamp values yields a pd.Timedelta, which
        # compares directly against datetime.timedelta. This avoids a
        # round-trip through str()/fromisoformat(), which rejects
        # nanosecond-precision strings on Python < 3.11.
        started_delta = started_max - started_min
        assert started_delta >= timedelta(days=3)