diff options
Diffstat (limited to 'tests/managers/thl/test_userhealth.py')
| -rw-r--r-- | tests/managers/thl/test_userhealth.py | 367 |
1 files changed, 367 insertions, 0 deletions
diff --git a/tests/managers/thl/test_userhealth.py b/tests/managers/thl/test_userhealth.py new file mode 100644 index 0000000..1cda8de --- /dev/null +++ b/tests/managers/thl/test_userhealth.py @@ -0,0 +1,367 @@ +from datetime import timezone, datetime +from uuid import uuid4 + +import faker +import pytest + +from generalresearch.managers.thl.userhealth import ( + IPRecordManager, + UserIpHistoryManager, +) +from generalresearch.models.thl.ipinfo import GeoIPInformation +from generalresearch.models.thl.user_iphistory import ( + IPRecord, +) +from generalresearch.models.thl.userhealth import AuditLogLevel, AuditLog + +fake = faker.Faker() + + +class TestAuditLog: + + def test_init(self, thl_web_rr, audit_log_manager): + from generalresearch.managers.thl.userhealth import AuditLogManager + + alm = AuditLogManager(pg_config=thl_web_rr) + + assert isinstance(alm, AuditLogManager) + assert isinstance(audit_log_manager, AuditLogManager) + assert alm.pg_config.db == thl_web_rr.db + assert audit_log_manager.pg_config.db == thl_web_rr.db + + @pytest.mark.parametrize( + argnames="level", + argvalues=list(AuditLogLevel), + ) + def test_create(self, audit_log_manager, user, level): + instance = audit_log_manager.create( + user_id=user.user_id, level=level, event_type=uuid4().hex + ) + assert isinstance(instance, AuditLog) + assert instance.id != 1 + + def test_get_by_id(self, audit_log, audit_log_manager): + from generalresearch.models.thl.userhealth import AuditLog + + with pytest.raises(expected_exception=Exception) as cm: + audit_log_manager.get_by_id(auditlog_id=999_999_999_999) + assert "No AuditLog with id of " in str(cm.value) + + assert isinstance(audit_log, AuditLog) + res = audit_log_manager.get_by_id(auditlog_id=audit_log.id) + assert isinstance(res, AuditLog) + assert res.id == audit_log.id + assert res.created.tzinfo == timezone.utc + + def test_filter_by_product( + self, + user_factory, + product_factory, + audit_log_factory, + audit_log_manager, + ): + p1 = product_factory() + p2 = product_factory() + + audit_log_factory(user_id=user_factory(product=p1).user_id) + audit_log_factory(user_id=user_factory(product=p1).user_id) + audit_log_factory(user_id=user_factory(product=p1).user_id) + + res = audit_log_manager.filter_by_product(product=p2) + assert isinstance(res, list) + assert len(res) == 0 + + res = audit_log_manager.filter_by_product(product=p1) + assert isinstance(res, list) + assert len(res) == 3 + + audit_log_factory(user_id=user_factory(product=p2).user_id) + res = audit_log_manager.filter_by_product(product=p2) + assert isinstance(res, list) + assert isinstance(res[0], AuditLog) + assert len(res) == 1 + + def test_filter_by_user_id( + self, user_factory, product, audit_log_factory, audit_log_manager + ): + u1 = user_factory(product=product) + u2 = user_factory(product=product) + + audit_log_factory(user_id=u1.user_id) + audit_log_factory(user_id=u1.user_id) + audit_log_factory(user_id=u1.user_id) + + res = audit_log_manager.filter_by_user_id(user_id=u1.user_id) + assert isinstance(res, list) + assert len(res) == 3 + + res = audit_log_manager.filter_by_user_id(user_id=u2.user_id) + assert isinstance(res, list) + assert len(res) == 0 + + audit_log_factory(user_id=u2.user_id) + + res = audit_log_manager.filter_by_user_id(user_id=u2.user_id) + assert isinstance(res, list) + assert isinstance(res[0], AuditLog) + assert len(res) == 1 + + def test_filter( + self, + user_factory, + product_factory, + audit_log_factory, + audit_log_manager, + ): + p1 = product_factory() + p2 = product_factory() + p3 = product_factory() + + u1 = user_factory(product=p1) + u2 = user_factory(product=p2) + u3 = user_factory(product=p3) + + with pytest.raises(expected_exception=AssertionError) as cm: + audit_log_manager.filter(user_ids=[]) + assert "must pass at least 1 user_id" in str(cm.value) + + with pytest.raises(expected_exception=AssertionError) as cm: + audit_log_manager.filter(user_ids=[u1, u2, u3]) + assert "must pass user_id as int" in str(cm.value) + + res = audit_log_manager.filter(user_ids=[u1.user_id, u2.user_id, u3.user_id]) + assert isinstance(res, list) + assert len(res) == 0 + + audit_log_factory(user_id=u1.user_id) + + res = audit_log_manager.filter(user_ids=[u1.user_id, u2.user_id, u3.user_id]) + assert isinstance(res, list) + assert isinstance(res[0], AuditLog) + assert len(res) == 1 + + def test_filter_count( + self, + user_factory, + product_factory, + audit_log_factory, + audit_log_manager, + ): + p1 = product_factory() + p2 = product_factory() + p3 = product_factory() + + u1 = user_factory(product=p1) + u2 = user_factory(product=p2) + u3 = user_factory(product=p3) + + with pytest.raises(expected_exception=AssertionError) as cm: + audit_log_manager.filter(user_ids=[]) + assert "must pass at least 1 user_id" in str(cm.value) + + with pytest.raises(expected_exception=AssertionError) as cm: + audit_log_manager.filter(user_ids=[u1, u2, u3]) + assert "must pass user_id as int" in str(cm.value) + + res = audit_log_manager.filter_count( + user_ids=[u1.user_id, u2.user_id, u3.user_id] + ) + assert isinstance(res, int) + assert res == 0 + + audit_log_factory(user_id=u1.user_id, level=20) + + res = audit_log_manager.filter_count( + user_ids=[u1.user_id, u2.user_id, u3.user_id] + ) + assert isinstance(res, int) + assert res == 1 + + res = audit_log_manager.filter_count( + user_ids=[u1.user_id, u2.user_id, u3.user_id], + created_after=datetime.now(tz=timezone.utc), + ) + assert isinstance(res, int) + assert res == 0 + + res = audit_log_manager.filter_count( + user_ids=[u1.user_id], event_type_like="offerwall-enter.%%" + ) + assert res == 1 + + audit_log_factory(user_id=u1.user_id, level=50) + res = audit_log_manager.filter_count( + user_ids=[u1.user_id], + event_type_like="offerwall-enter.%%", + level_ge=10, + ) + assert res == 2 + + res = audit_log_manager.filter_count( + user_ids=[u1.user_id], event_type_like="poop.%", level_ge=10 + ) + assert res == 0 + + +class TestIPRecordManager: + + def test_init(self, thl_web_rr, thl_redis_config, ip_record_manager): + instance = IPRecordManager(pg_config=thl_web_rr, redis_config=thl_redis_config) + assert isinstance(instance, IPRecordManager) + assert isinstance(ip_record_manager, IPRecordManager) + + def test_create(self, ip_record_manager, user, ip_information): + instance = ip_record_manager.create_dummy( + user_id=user.user_id, ip=ip_information.ip + ) + assert isinstance(instance, IPRecord) + + assert isinstance(instance.forwarded_ips, list) + assert isinstance(instance.forwarded_ip_records[0], IPRecord) + assert isinstance(instance.forwarded_ips[0], str) + + assert instance.created == instance.forwarded_ip_records[0].created + + ipr1 = ip_record_manager.filter_ip_records(filter_ips=[instance.ip]) + assert isinstance(ipr1, list) + assert instance.model_dump_json() == ipr1[0].model_dump_json() + + def test_prefetch_info( + self, + ip_record_factory, + ip_information_factory, + ip_geoname, + user, + thl_web_rr, + thl_redis_config, + ): + + ip = fake.ipv4_public() + ip_information_factory(ip=ip, geoname=ip_geoname) + ipr: IPRecord = ip_record_factory(user_id=user.user_id, ip=ip) + + assert ipr.information is None + assert len(ipr.forwarded_ip_records) >= 1 + fipr = ipr.forwarded_ip_records[0] + assert fipr.information is None + + ipr.prefetch_ipinfo( + pg_config=thl_web_rr, + redis_config=thl_redis_config, + include_forwarded=True, + ) + assert isinstance(ipr.information, GeoIPInformation) + assert ipr.information.ip == ipr.ip == ip + assert fipr.information is None, "the ipinfo doesn't exist in the db yet" + + ip_information_factory(ip=fipr.ip, geoname=ip_geoname) + ipr.prefetch_ipinfo( + pg_config=thl_web_rr, + redis_config=thl_redis_config, + include_forwarded=True, + ) + assert fipr.information is not None + + +@pytest.mark.usefixtures("user_iphistory_manager_clear_cache") +class TestUserIpHistoryManager: + def test_init(self, thl_web_rr, thl_redis_config, user_iphistory_manager): + instance = UserIpHistoryManager( + pg_config=thl_web_rr, redis_config=thl_redis_config + ) + assert isinstance(instance, UserIpHistoryManager) + assert isinstance(user_iphistory_manager, UserIpHistoryManager) + + def test_latest_record( + self, + user_iphistory_manager, + user, + ip_record_factory, + ip_information_factory, + ip_geoname, + ): + ip = fake.ipv4_public() + ip_information_factory(ip=ip, geoname=ip_geoname, is_anonymous=True) + ipr1: IPRecord = ip_record_factory(user_id=user.user_id, ip=ip) + + ipr = user_iphistory_manager.get_user_latest_ip_record(user=user) + assert ipr.ip == ipr1.ip + assert ipr.is_anonymous + assert ipr.information.lookup_prefix == "/32" + + ip = fake.ipv6() + ip_information_factory(ip=ip, geoname=ip_geoname) + ipr2: IPRecord = ip_record_factory(user_id=user.user_id, ip=ip) + + ipr = user_iphistory_manager.get_user_latest_ip_record(user=user) + assert ipr.ip == ipr2.ip + assert ipr.information.lookup_prefix == "/64" + assert ipr.information is not None + assert not ipr.is_anonymous + + country_iso = user_iphistory_manager.get_user_latest_country(user=user) + assert country_iso == ip_geoname.country_iso + + iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id) + assert iph.ips[0].information is not None + assert iph.ips[1].information is not None + assert iph.ips[0].country_iso == country_iso + assert iph.ips[0].is_anonymous + assert iph.ips[0].ip == ipr1.ip + assert iph.ips[1].ip == ipr2.ip + + def test_virgin(self, user, user_iphistory_manager, ip_record_factory): + iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id) + assert len(iph.ips) == 0 + + ip_record_factory(user_id=user.user_id, ip=fake.ipv4_public()) + iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id) + assert len(iph.ips) == 1 + + def test_out_of_order( + self, + ip_record_factory, + user, + user_iphistory_manager, + ip_information_factory, + ip_geoname, + ): + # Create the user-ip association BEFORE the ip even exists in the ipinfo table + ip = fake.ipv4_public() + ip_record_factory(user_id=user.user_id, ip=ip) + iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id) + assert len(iph.ips) == 1 + ipr = iph.ips[0] + assert ipr.information is None + assert not ipr.is_anonymous + + ip_information_factory(ip=ip, geoname=ip_geoname, is_anonymous=True) + iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id) + assert len(iph.ips) == 1 + ipr = iph.ips[0] + assert ipr.information is not None + assert ipr.is_anonymous + + def test_out_of_order_ipv6( + self, + ip_record_factory, + user, + user_iphistory_manager, + ip_information_factory, + ip_geoname, + ): + # Create the user-ip association BEFORE the ip even exists in the ipinfo table + ip = fake.ipv6() + ip_record_factory(user_id=user.user_id, ip=ip) + iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id) + assert len(iph.ips) == 1 + ipr = iph.ips[0] + assert ipr.information is None + assert not ipr.is_anonymous + + ip_information_factory(ip=ip, geoname=ip_geoname, is_anonymous=True) + iph = user_iphistory_manager.get_user_ip_history(user_id=user.user_id) + assert len(iph.ips) == 1 + ipr = iph.ips[0] + assert ipr.information is not None + assert ipr.is_anonymous |
