aboutsummaryrefslogtreecommitdiff
path: root/tests/incite/__init__.py
blob: 2f736e8d4a92b1f8dd92d012b16771fd75318e57 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# class TestParquetBehaviors(CleanTempDirectoryTestCls):
#     wall_coll = WallDFCollection(
#         start=GLOBAL_VARS["wall"].start,
#         offset="49h",
#         archive_path=f"{settings.incite_mount_dir}/raw/df-collections/{DFCollectionType.WALL.value}",
#     )
#
#     def test_filters(self):
#         # Using REAL data here
#         start = datetime(year=2024, month=1, day=15, hour=12, tzinfo=timezone.utc)
#         end = datetime(year=2024, month=1, day=15, hour=20, tzinfo=timezone.utc)
#         end_max = datetime(
#             year=2024, month=1, day=15, hour=20, tzinfo=timezone.utc
#         ) + timedelta(hours=2)
#
#         ir = pd.Interval(left=pd.Timestamp(start), right=pd.Timestamp(end))
#         wall_items = [w for w in self.wall_coll.items if w.interval.overlaps(ir)]
#         ddf = self.wall_coll.ddf(
#             items=wall_items,
#             include_partial=True,
#             force_rr_latest=False,
#             columns=["started", "finished"],
#             filters=[
#                 ("started", ">=", start),
#                 ("started", "<", end),
#             ],
#         )
#
#         df = ddf.compute()
#         self.assertIsInstance(df, pd.DataFrame)
#
#         # No row has started=None, and every started value lies between start and end
#         self.assertFalse(df.started.isna().any())
#         self.assertFalse((df.started < start).any())
#         self.assertFalse((df.started > end).any())
#
#         # Some rows have finished=None and others have a real
#         # timestamp; every non-null finished value lies between
#         # start and end_max
#         self.assertTrue(df.finished.isna().any())
#         self.assertTrue((df.finished.dt.year == 2024).any())
#
#         self.assertFalse((df.finished > end_max).any())
#         self.assertFalse((df.finished < start).any())
#
#     # def test_user_id_list(self):
#     #     # Calling compute() turns it into an np.ndarray
#     #     user_ids = self.instance.ddf(
#     #         columns=["user_id"]
#     #     ).user_id.unique().values.compute()
#     #     self.assertIsInstance(user_ids, np.ndarray)
#     #
#     #     # Check whether ddf filters accept an np.ndarray (expected: TypeError)
#     #     user_product_merge = <todo: assign>
#     #
#     #     with self.assertRaises(TypeError) as cm:
#     #         user_product_merge.ddf(
#     #             filters=[("id", "in", user_ids)])
#     #     self.assertIn("Value of 'in' filter must be a list, set or tuple.", str(cm.exception))
#     #
#     #     # Without compute(), the values remain a dask array
#     #     user_ids = self.instance.ddf(
#     #         columns=["user_id"]
#     #     ).user_id.unique().values
#     #     self.assertIsInstance(user_ids, da.Array)
#     #
#     #     with self.assertRaises(TypeError) as cm:
#     #         user_product_merge.ddf(
#     #             filters=[("id", "in", user_ids)])
#     #     self.assertIn("Value of 'in' filter must be a list, set or tuple.", str(cm.exception))
#     #
#     #     # pick a product_id (most active one)
#     #     self.product_id = instance.df.product_id.value_counts().index[0]
#     #     self.expected_columns: int = len(instance._schema.columns)
#     #     self.instance = instance
#
#     # def test_basic(self):
#     #     # now try to load up the data!
#     #     self.instance.grouped_key = self.product_id
#     #
#     #     # Confirm at least one of the items is archived
#     #     self.assertTrue(self.instance.progress.has_archive.eq(True).any())
#     #
#     #     # Confirm it returns a df
#     #     df = self.instance.dd().compute()
#     #
#     #     self.assertFalse(df.empty)
#     #     self.assertEqual(df.shape[1], self.expected_columns)
#     #     self.assertGreater(df.shape[0], 1)
#     #
#     #     # Confirm that DF only contains this product_id
#     #     self.assertEqual(df[df.product_id == self.product_id].shape, df.shape)
#
#     # def test_god_vs_product_id(self):
#     #     self.instance.grouped_key = self.product_id
#     #     df_product_origin = self.instance.dd(columns=None, filters=None).compute()
#     #
#     #     self.instance.grouped_key = None
#     #     df_god_origin = self.instance.dd(columns=None,
#     #                                      filters=[("product_id", "==", self.product_id)]).compute()
#     #
#     #     self.assertTrue(df_god_origin.equals(df_product_origin))
#
#     #
#     #     instance = POPSessionMerge(
#     #         start=START,
#     #         archive_path=self.PATH,
#     #         group_by="product_id"
#     #     )
#     #     instance.build(U=GLOBAL_VARS["user"], S=GLOBAL_VARS["session"], W=GLOBAL_VARS["wall"])
#     #     instance.save(god_only=False)
#     #
#     #     # pick a product_id (most active one)
#     #     self.product_id = instance.df.product_id.value_counts().index[0]
#     #     self.expected_columns: int = len(instance._schema.columns)
#     #     self.instance = instance
#
#
# class TestValidItem(CleanTempDirectoryTestCls):
#
#     def test_interval(self):
#         for k in GLOBAL_VARS.keys():
#             coll = GLOBAL_VARS[k]
#             item = coll.items[0]
#             ir = item.interval
#
#             self.assertIsInstance(ir, pd.Interval)
#             self.assertLess(a=ir.left, b=ir.right)
#
#     def test_str(self):
#         for k in GLOBAL_VARS.keys():
#             coll = GLOBAL_VARS[k]
#             item = coll.items[0]
#
#             offset = coll.offset or "–"
#
#             self.assertIn(offset, str(item))