1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
from datetime import timedelta, datetime, timezone
from itertools import product
import pandas as pd
import pytest
# noinspection PyUnresolvedReferences
from distributed.utils_test import (
gen_cluster,
client_no_amm,
loop,
loop_in_thread,
cleanup,
cluster_fixture,
client,
)
from generalresearch.incite.mergers.foundations.user_id_product import (
UserIdProductMergeItem,
)
from test_utils.incite.mergers.conftest import user_id_product_merge
@pytest.mark.parametrize(
argnames="offset, duration, start",
argvalues=list(
product(
["12h", "3D"],
[timedelta(days=5)],
[
(datetime.now(tz=timezone.utc) - timedelta(days=35)).replace(
microsecond=0
)
],
)
),
)
class TestUserIDProduct:
@pytest.mark.skip
def test_base(self, client_no_amm, user_id_product_merge):
ddf = user_id_product_merge.ddf()
df = client_no_amm.compute(collections=ddf, sync=True)
assert isinstance(df, pd.DataFrame)
assert not df.empty
@pytest.mark.skip
def test_base_item(self, client_no_amm, user_id_product_merge, user_collection):
assert len(user_id_product_merge.items) == 1
for item in user_id_product_merge.items:
assert isinstance(item, UserIdProductMergeItem)
path = item.path
try:
modified_time1 = path.stat().st_mtime
except (Exception,):
modified_time1 = 0
user_id_product_merge.build(client=client_no_amm, user_coll=user_collection)
modified_time2 = path.stat().st_mtime
assert modified_time2 > modified_time1
@pytest.mark.skip
def test_read(self, client_no_amm, user_id_product_merge):
users_ddf = user_id_product_merge.ddf()
df = client_no_amm.compute(collections=users_ddf, sync=True)
assert isinstance(df, pd.DataFrame)
assert len(df.columns) == 1
assert str(df.product_id.dtype) == "category"
|