aboutsummaryrefslogtreecommitdiff
path: root/tests/managers/thl/test_userhealth.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/managers/thl/test_userhealth.py')
-rw-r--r--tests/managers/thl/test_userhealth.py367
1 files changed, 367 insertions, 0 deletions
diff --git a/tests/managers/thl/test_userhealth.py b/tests/managers/thl/test_userhealth.py
new file mode 100644
index 0000000..1cda8de
--- /dev/null
+++ b/tests/managers/thl/test_userhealth.py
@@ -0,0 +1,367 @@
+from datetime import timezone, datetime
+from uuid import uuid4
+
+import faker
+import pytest
+
+from generalresearch.managers.thl.userhealth import (
+ IPRecordManager,
+ UserIpHistoryManager,
+)
+from generalresearch.models.thl.ipinfo import GeoIPInformation
+from generalresearch.models.thl.user_iphistory import (
+ IPRecord,
+)
+from generalresearch.models.thl.userhealth import AuditLogLevel, AuditLog
+
+fake = faker.Faker()
+
+
+class TestAuditLog:
+
+ def test_init(self, thl_web_rr, audit_log_manager):
+ from generalresearch.managers.thl.userhealth import AuditLogManager
+
+ alm = AuditLogManager(pg_config=thl_web_rr)
+
+ assert isinstance(alm, AuditLogManager)
+ assert isinstance(audit_log_manager, AuditLogManager)
+ assert alm.pg_config.db == thl_web_rr.db
+ assert audit_log_manager.pg_config.db == thl_web_rr.db
+
+ @pytest.mark.parametrize(
+ argnames="level",
+ argvalues=list(AuditLogLevel),
+ )
+ def test_create(self, audit_log_manager, user, level):
+ instance = audit_log_manager.create(
+ user_id=user.user_id, level=level, event_type=uuid4().hex
+ )
+ assert isinstance(instance, AuditLog)
+ assert instance.id != 1
+
+ def test_get_by_id(self, audit_log, audit_log_manager):
+ from generalresearch.models.thl.userhealth import AuditLog
+
+ with pytest.raises(expected_exception=Exception) as cm:
+ audit_log_manager.get_by_id(auditlog_id=999_999_999_999)
+ assert "No AuditLog with id of " in str(cm.value)
+
+ assert isinstance(audit_log, AuditLog)
+ res = audit_log_manager.get_by_id(auditlog_id=audit_log.id)
+ assert isinstance(res, AuditLog)
+ assert res.id == audit_log.id
+ assert res.created.tzinfo == timezone.utc
+
+ def test_filter_by_product(
+ self,
+ user_factory,
+ product_factory,
+ audit_log_factory,
+ audit_log_manager,
+ ):
+ p1 = product_factory()
+ p2 = product_factory()
+
+ audit_log_factory(user_id=user_factory(product=p1).user_id)
+ audit_log_factory(user_id=user_factory(product=p1).user_id)
+ audit_log_factory(user_id=user_factory(product=p1).user_id)
+
+ res = audit_log_manager.filter_by_product(product=p2)
+ assert isinstance(res, list)
+ assert len(res) == 0
+
+ res = audit_log_manager.filter_by_product(product=p1)
+ assert isinstance(res, list)
+ assert len(res) == 3
+
+ audit_log_factory(user_id=user_factory(product=p2).user_id)
+ res = audit_log_manager.filter_by_product(product=p2)
+ assert isinstance(res, list)
+ assert isinstance(res[0], AuditLog)
+ assert len(res) == 1
+
+ def test_filter_by_user_id(
+ self, user_factory, product, audit_log_factory, audit_log_manager
+ ):
+ u1 = user_factory(product=product)
+ u2 = user_factory(product=product)
+
+ audit_log_factory(user_id=u1.user_id)
+ audit_log_factory(user_id=u1.user_id)
+ audit_log_factory(user_id=u1.user_id)
+
+ res = audit_log_manager.filter_by_user_id(user_id=u1.user_id)
+ assert isinstance(res, list)
+ assert len(res) == 3
+
+ res = audit_log_manager.filter_by_user_id(user_id=u2.user_id)
+ assert isinstance(res, list)
+ assert len(res) == 0
+
+ audit_log_factory(user_id=u2.user_id)
+
+ res = audit_log_manager.filter_by_user_id(user_id=u2.user_id)
+ assert isinstance(res, list)
+ assert isinstance(res[0], AuditLog)
+ assert len(res) == 1
+
+ def test_filter(
+ self,
+ user_factory,
+ product_factory,
+ audit_log_factory,
+ audit_log_manager,
+ ):
+ p1 = product_factory()
+ p2 = product_factory()
+ p3 = product_factory()
+
+ u1 = user_factory(product=p1)
+ u2 = user_factory(product=p2)
+ u3 = user_factory(product=p3)
+
+ with pytest.raises(expected_exception=AssertionError) as cm:
+ audit_log_manager.filter(user_ids=[])
+ assert "must pass at least 1 user_id" in str(cm.value)
+
+ with pytest.raises(expected_exception=AssertionError) as cm:
+ audit_log_manager.filter(user_ids=[u1, u2, u3])
+ assert "must pass user_id as int" in str(cm.value)
+
+ res = audit_log_manager.filter(user_ids=[u1.user_id, u2.user_id, u3.user_id])
+ assert isinstance(res, list)
+ assert len(res) == 0
+
+ audit_log_factory(user_id=u1.user_id)
+
+ res = audit_log_manager.filter(user_ids=[u1.user_id, u2.user_id, u3.user_id])
+ assert isinstance(res, list)
+ assert isinstance(res[0], AuditLog)
+ assert len(res) == 1
+
+ def test_filter_count(
+ self,
+ user_factory,
+ product_factory,
+ audit_log_factory,
+ audit_log_manager,
+ ):
+ p1 = product_factory()
+ p2 = product_factory()
+ p3 = product_factory()
+
+ u1 = user_factory(product=p1)
+ u2 = user_factory(product=p2)
+ u3 = user_factory(product=p3)
+
+ with pytest.raises(expected_exception=AssertionError) as cm:
+ audit_log_manager.filter(user_ids=[])
+ assert "must pass at least 1 user_id" in str(cm.value)
+
+ with pytest.raises(expected_exception=AssertionError) as cm:
+ audit_log_manager.filter(user_ids=[u1, u2, u3])
+ assert "must pass user_id as int" in str(cm.value)
+
+ res = audit_log_manager.filter_count(
+ user_ids=[u1.user_id, u2.user_id, u3.user_id]
+ )
+ assert isinstance(res, int)
+ assert res == 0
+
+ audit_log_factory(user_id=u1.user_id, level=20)
+
+ res = audit_log_manager.filter_count(
+ user_ids=[u1.user_id, u2.user_id, u3.user_id]
+ )
+ assert isinstance(res, int)
+ assert res == 1
+
+ res = audit_log_manager.filter_count(
+ user_ids=[u1.user_id, u2.user_id, u3.user_id],
+ created_after=datetime.now(tz=timezone.utc),
+ )
+ assert isinstance(res, int)
+ assert res == 0
+
+ res = audit_log_manager.filter_count(
+ user_ids=[u1.user_id], event_type_like="offerwall-enter.%%"
+ )
+ assert res == 1
+
+ audit_log_factory(user_id=u1.user_id, level=50)
+ res = audit_log_manager.filter_count(
+ user_ids=[u1.user_id],
+ event_type_like="offerwall-enter.%%",
+ level_ge=10,
+ )
+ assert res == 2
+
+ res = audit_log_manager.filter_count(
+ user_ids=[u1.user_id], event_type_like="poop.%", level_ge=10
+ )
+ assert res == 0
+
+
+class TestIPRecordManager:
+
+ def test_init(self, thl_web_rr, thl_redis_config, ip_record_manager):
+ instance = IPRecordManager(pg_config=thl_web_rr, redis_config=thl_redis_config)
+ assert isinstance(instance, IPRecordManager)
+ assert isinstance(ip_record_manager, IPRecordManager)
+
+ def test_create(self, ip_record_manager, user, ip_information):
+ instance = ip_record_manager.create_dummy(
+ user_id=user.user_id, ip=ip_information.ip
+ )
+ assert isinstance(instance, IPRecord)
+
+ assert isinstance(instance.forwarded_ips, list)
+ assert isinstance(instance.forwarded_ip_records[0], IPRecord)
+ assert isinstance(instance.forwarded_ips[0], str)
+
+ assert instance.created == instance.forwarded_ip_records[0].created
+
+ ipr1 = ip_record_manager.filter_ip_records(filter_ips=[instance.ip])
+ assert isinstance(ipr1, list)
+ assert instance.model_dump_json() == ipr1[0].model_dump_json()
+
+ def test_prefetch_info(
+ self,
+ ip_record_factory,
+ ip_information_factory,
+ ip_geoname,
+ user,
+ thl_web_rr,
+ thl_redis_config,
+ ):
+
+ ip = fake.ipv4_public()
+ ip_information_factory(ip=ip, geoname=ip_geoname)
+ ipr: IPRecord = ip_record_factory(user_id=user.user_id, ip=ip)
+
+ assert ipr.information is None
+ assert len(ipr.forwarded_ip_records) >= 1
+ fipr = ipr.forwarded_ip_records[0]
+ assert fipr.information is None
+
+ ipr.prefetch_ipinfo(
+ pg_config=thl_web_rr,
+ redis_config=thl_redis_config,
+ include_forwarded=True,
+ )
+ assert isinstance(ipr.information, GeoIPInformation)
+ assert ipr.information.ip == ipr.ip == ip
+ assert fipr.information is None, "the ipinfo doesn't exist in the db yet"
+
+ ip_information_factory(ip=fipr.ip, geoname=ip_geoname)
+ ipr.prefetch_ipinfo(
+ pg_config=thl_web_rr,
+ redis_config=thl_redis_config,
+ include_forwarded=True,
+ )
+ assert fipr.information is not None
+
+
+@pytest.mark.usefixtures("user_iphistory_manager_clear_cache")
+class TestUserIpHistoryManager:
+ def test_init(self, thl_web_rr, thl_redis_config, user_iphistory_manager):
+ instance = UserIpHistoryManager(
+ pg_config=thl_web_rr, redis_config=thl_redis_config
+ )
+ assert isinstance(instance, UserIpHistoryManager)
+ assert isinstance(user_iphistory_manager, UserIpHistoryManager)
+
+ def test_latest_record(
+ self,
+ user_iphistory_manager,
+ user,
+ ip_record_factory,
+ ip_information_factory,
+ ip_geoname,
+ ):
+ ip = fake.ipv4_public()
+ ip_information_factory(ip=ip, geoname=ip_geoname, is_anonymous=True)
+ ipr1: IPRecord = ip_record_factory(user_id=user.user_id, ip=ip)
+
+ ipr = user_iphistory_manager.get_user_latest_ip_record(user=user)
+ assert ipr.ip == ipr1.ip
+ assert ipr.is_anonymous
+ assert ipr.information.lookup_prefix == "/32"
+
+ ip = fake.ipv6()
+ ip_information_factory(ip=ip, geoname=ip_geoname)
+ ipr2: IPRecord = ip_record_factory(user_id=user.user_id, ip=ip)
+
+ ipr = user_iphistory_manager.get_user_latest_ip_record(user=user)
+ assert ipr.ip == ipr2.ip
+ assert ipr.information.lookup_prefix == "/64"
+ assert ipr.information is not None
+ assert not ipr.is_anonymous
+
+ country_iso = user_iphistory_manager.get_user_latest_country(user=user)
+ assert country_iso == ip_geoname.country_iso
+
+ iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id)
+ assert iph.ips[0].information is not None
+ assert iph.ips[1].information is not None
+ assert iph.ips[0].country_iso == country_iso
+ assert iph.ips[0].is_anonymous
+ assert iph.ips[0].ip == ipr1.ip
+ assert iph.ips[1].ip == ipr2.ip
+
+ def test_virgin(self, user, user_iphistory_manager, ip_record_factory):
+ iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id)
+ assert len(iph.ips) == 0
+
+ ip_record_factory(user_id=user.user_id, ip=fake.ipv4_public())
+ iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id)
+ assert len(iph.ips) == 1
+
+ def test_out_of_order(
+ self,
+ ip_record_factory,
+ user,
+ user_iphistory_manager,
+ ip_information_factory,
+ ip_geoname,
+ ):
+ # Create the user-ip association BEFORE the ip even exists in the ipinfo table
+ ip = fake.ipv4_public()
+ ip_record_factory(user_id=user.user_id, ip=ip)
+ iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id)
+ assert len(iph.ips) == 1
+ ipr = iph.ips[0]
+ assert ipr.information is None
+ assert not ipr.is_anonymous
+
+ ip_information_factory(ip=ip, geoname=ip_geoname, is_anonymous=True)
+ iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id)
+ assert len(iph.ips) == 1
+ ipr = iph.ips[0]
+ assert ipr.information is not None
+ assert ipr.is_anonymous
+
+ def test_out_of_order_ipv6(
+ self,
+ ip_record_factory,
+ user,
+ user_iphistory_manager,
+ ip_information_factory,
+ ip_geoname,
+ ):
+ # Create the user-ip association BEFORE the ip even exists in the ipinfo table
+ ip = fake.ipv6()
+ ip_record_factory(user_id=user.user_id, ip=ip)
+ iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id)
+ assert len(iph.ips) == 1
+ ipr = iph.ips[0]
+ assert ipr.information is None
+ assert not ipr.is_anonymous
+
+ ip_information_factory(ip=ip, geoname=ip_geoname, is_anonymous=True)
+ iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id)
+ assert len(iph.ips) == 1
+ ipr = iph.ips[0]
+ assert ipr.information is not None
+ assert ipr.is_anonymous