1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
from datetime import timedelta, timezone, datetime
from itertools import product
import pandas as pd
import pytest
# noinspection PyUnresolvedReferences
from distributed.utils_test import (
gen_cluster,
client_no_amm,
loop,
loop_in_thread,
cleanup,
cluster_fixture,
client,
)
from test_utils.incite.collections.conftest import wall_collection, session_collection
from test_utils.incite.mergers.conftest import (
enriched_session_merge,
ym_survey_wall_merge,
)
@pytest.mark.parametrize(
    argnames="offset, duration, start",
    argvalues=list(
        product(
            ["12h", "3D"],
            [timedelta(days=30)],
            [
                (datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
                    microsecond=0
                )
            ],
        )
    ),
)
class TestYMSurveyMerge:
    """End-to-end exercise of the YMSurveyWall merge.

    ``start`` is overridden here not because the YMSurveyWall merge needs
    it — that merge operates on a rolling 10-day window — but so the data
    mocked into the wall collection and enriched_session_merge lands in
    the recent past. Otherwise the database would (correctly) contain
    nothing from the last 10 days and the merge would come back empty.
    """

    def test_base(
        self,
        client_no_amm,
        user_factory,
        product,
        ym_survey_wall_merge,
        wall_collection,
        session_collection,
        enriched_session_merge,
        delete_df_collection,
        incite_item_factory,
        thl_web_rr,
    ):
        from generalresearch.models.thl.user import User

        def materialize(graph) -> pd.DataFrame:
            # Pull a dask graph down to a concrete DataFrame and require
            # that the result is non-empty before handing it back.
            frame = client_no_amm.compute(collections=graph, sync=True)
            assert isinstance(frame, pd.DataFrame)
            assert not frame.empty
            return frame

        delete_df_collection(coll=session_collection)
        seeded_user: User = user_factory(
            product=product, created=session_collection.start
        )

        # -- Build & Setup
        assert ym_survey_wall_merge.start is None
        assert ym_survey_wall_merge.offset == "10D"

        for session_item in session_collection.items:
            incite_item_factory(item=session_item, user=seeded_user)
            session_item.initial_load()
        for wall_item in wall_collection.items:
            wall_item.initial_load()

        # Every item must be archived before the merges are built.
        assert session_collection.progress.has_archive.eq(True).all()
        assert wall_collection.progress.has_archive.eq(True).all()

        enriched_session_merge.build(
            client=client_no_amm,
            session_coll=session_collection,
            wall_coll=wall_collection,
            pg_config=thl_web_rr,
        )
        assert enriched_session_merge.progress.has_archive.eq(True).all()
        materialize(enriched_session_merge.ddf())

        # --
        ym_survey_wall_merge.build(
            client=client_no_amm,
            wall_coll=wall_collection,
            enriched_session=enriched_session_merge,
        )
        assert ym_survey_wall_merge.progress.has_archive.eq(True).all()

        # --
        merged = materialize(ym_survey_wall_merge.ddf())

        # -- Shape of the merged output
        assert merged.product_id.nunique() == 1
        assert merged.team_id.nunique() == 1
        assert merged.source.nunique() > 1

        earliest_ts = merged.started.min()
        latest_ts = merged.started.max()
        # Exact-type checks on purpose: the column must hold pd.Timestamp.
        assert type(earliest_ts) is pd.Timestamp
        assert type(latest_ts) is pd.Timestamp
        earliest: datetime = datetime.fromisoformat(str(earliest_ts))
        latest: datetime = datetime.fromisoformat(str(latest_ts))
        assert latest - earliest >= timedelta(days=3)
|