aboutsummaryrefslogtreecommitdiff
path: root/tests/managers/thl/test_maxmind.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/managers/thl/test_maxmind.py')
-rw-r--r--tests/managers/thl/test_maxmind.py273
1 files changed, 273 insertions, 0 deletions
diff --git a/tests/managers/thl/test_maxmind.py b/tests/managers/thl/test_maxmind.py
new file mode 100644
index 0000000..c588c58
--- /dev/null
+++ b/tests/managers/thl/test_maxmind.py
@@ -0,0 +1,273 @@
+import json
+import logging
+from typing import Callable
+
+import geoip2.models
+import pytest
+from faker import Faker
+from faker.providers.address.en_US import Provider as USAddressProvider
+
+from generalresearch.managers.thl.ipinfo import GeoIpInfoManager
+from generalresearch.managers.thl.maxmind import MaxmindManager
+from generalresearch.managers.thl.maxmind.basic import (
+ MaxmindBasicManager,
+)
+from generalresearch.models.thl.ipinfo import (
+ GeoIPInformation,
+ normalize_ip,
+)
+from generalresearch.models.thl.maxmind.definitions import UserType
+
+fake = Faker()
+
+US_STATES = {x.lower() for x in USAddressProvider.states}
+
+IP_v4_INDIA = "106.203.146.157"
+IP_v6_INDIA = "2402:3a80:4649:de3f:0:24:74aa:b601"
+IP_v4_US = "174.218.60.101"
+IP_v6_US = "2600:1700:ece0:9410:55d:faf3:c15d:6e4"
+IP_v6_US_SAME_64 = "2600:1700:ece0:9410:55d:faf3:c15d:aaaa"
+
+
+@pytest.fixture(scope="session")
+def delete_ipinfo(thl_web_rw) -> Callable:
+ def _delete_ipinfo(ip):
+ thl_web_rw.execute_write(
+ query="DELETE FROM thl_geoname WHERE geoname_id IN (SELECT geoname_id FROM thl_ipinformation WHERE ip = %s);",
+ params=[ip],
+ )
+ thl_web_rw.execute_write(
+ query="DELETE FROM thl_ipinformation WHERE ip = %s;",
+ params=[ip],
+ )
+
+ return _delete_ipinfo
+
+
+class TestMaxmindBasicManager:
+
+ def test_init(self, maxmind_basic_manager):
+
+ assert isinstance(maxmind_basic_manager, MaxmindBasicManager)
+
+ def test_get_basic_ip_information(self, maxmind_basic_manager):
+ ip = IP_v4_INDIA
+ maxmind_basic_manager.run_update_geoip_db()
+
+ res1 = maxmind_basic_manager.get_basic_ip_information(ip_address=ip)
+ assert isinstance(res1, geoip2.models.Country)
+ assert res1.country.iso_code == "IN"
+ assert res1.country.name == "India"
+
+ res2 = maxmind_basic_manager.get_basic_ip_information(
+ ip_address=fake.ipv4_private()
+ )
+ assert res2 is None
+
+ def test_get_country_iso_from_ip_geoip2db(self, maxmind_basic_manager):
+ ip = IP_v4_INDIA
+ maxmind_basic_manager.run_update_geoip_db()
+
+ res1 = maxmind_basic_manager.get_country_iso_from_ip_geoip2db(ip=ip)
+ assert res1 == "in"
+
+ res2 = maxmind_basic_manager.get_country_iso_from_ip_geoip2db(
+ ip=fake.ipv4_private()
+ )
+ assert res2 is None
+
+ def test_get_basic_ip_information_ipv6(self, maxmind_basic_manager):
+ ip = IP_v6_INDIA
+ maxmind_basic_manager.run_update_geoip_db()
+
+ res1 = maxmind_basic_manager.get_basic_ip_information(ip_address=ip)
+ assert isinstance(res1, geoip2.models.Country)
+ assert res1.country.iso_code == "IN"
+ assert res1.country.name == "India"
+
+
+class TestMaxmindManager:
+
+ def test_init(self, thl_web_rr, thl_redis_config, maxmind_manager: MaxmindManager):
+ instance = MaxmindManager(pg_config=thl_web_rr, redis_config=thl_redis_config)
+ assert isinstance(instance, MaxmindManager)
+ assert isinstance(maxmind_manager, MaxmindManager)
+
+ def test_create_basic(
+ self,
+ maxmind_manager: MaxmindManager,
+ geoipinfo_manager: GeoIpInfoManager,
+ delete_ipinfo,
+ ):
+ # This is (currently) an IP in India, and so it should only do the basic lookup
+ ip = IP_v4_INDIA
+ delete_ipinfo(ip)
+ geoipinfo_manager.clear_cache(ip)
+ assert geoipinfo_manager.get_cache(ip) is None
+ assert geoipinfo_manager.get_mysql_if_exists(ip) is None
+
+ maxmind_manager.run_ip_information(ip, force_insights=False)
+ # Check that it is in the cache and in mysql
+ res = geoipinfo_manager.get_cache(ip)
+ assert res.ip == ip
+ assert res.basic
+ res = geoipinfo_manager.get_mysql(ip)
+ assert res.ip == ip
+ assert res.basic
+
+ def test_create_basic_ipv6(
+ self,
+ maxmind_manager: MaxmindManager,
+ geoipinfo_manager: GeoIpInfoManager,
+ delete_ipinfo,
+ ):
+ # This is (currently) an IP in India, and so it should only do the basic lookup
+ ip = IP_v6_INDIA
+ normalized_ip, lookup_prefix = normalize_ip(ip)
+ delete_ipinfo(ip)
+ geoipinfo_manager.clear_cache(ip)
+ delete_ipinfo(normalized_ip)
+ geoipinfo_manager.clear_cache(normalized_ip)
+ assert geoipinfo_manager.get_cache(ip) is None
+ assert geoipinfo_manager.get_cache(normalized_ip) is None
+ assert geoipinfo_manager.get_mysql_if_exists(ip) is None
+ assert geoipinfo_manager.get_mysql_if_exists(normalized_ip) is None
+
+ maxmind_manager.run_ip_information(ip, force_insights=False)
+
+ # Check that it is in the cache
+ res = geoipinfo_manager.get_cache(ip)
+ # The looked up IP (/128) is returned,
+ assert res.ip == ip
+ assert res.lookup_prefix == "/64"
+ assert res.basic
+
+ # ... but the normalized version was stored (/64)
+ assert geoipinfo_manager.get_cache_raw(ip) is None
+ res = json.loads(geoipinfo_manager.get_cache_raw(normalized_ip))
+ assert res["ip"] == normalized_ip
+
+ # Check mysql
+ res = geoipinfo_manager.get_mysql(ip)
+ assert res.ip == ip
+ assert res.lookup_prefix == "/64"
+ assert res.basic
+ with pytest.raises(AssertionError):
+ geoipinfo_manager.get_mysql_raw(ip)
+ res = geoipinfo_manager.get_mysql_raw(normalized_ip)
+ assert res["ip"] == normalized_ip
+
+ def test_create_insights(
+ self,
+ maxmind_manager: MaxmindManager,
+ geoipinfo_manager: GeoIpInfoManager,
+ delete_ipinfo,
+ ):
+ # This is (currently) an IP in the US, so it should do insights
+ ip = IP_v4_US
+ delete_ipinfo(ip)
+ geoipinfo_manager.clear_cache(ip)
+ assert geoipinfo_manager.get_cache(ip) is None
+ assert geoipinfo_manager.get_mysql_if_exists(ip) is None
+
+ res1 = maxmind_manager.run_ip_information(ip, force_insights=False)
+ assert isinstance(res1, GeoIPInformation)
+
+ # Check that it is in the cache and in mysql
+ res2 = geoipinfo_manager.get_cache(ip)
+ assert isinstance(res2, GeoIPInformation)
+ assert res2.ip == ip
+ assert not res2.basic
+
+ res3 = geoipinfo_manager.get_mysql(ip)
+ assert isinstance(res3, GeoIPInformation)
+ assert res3.ip == ip
+ assert not res3.basic
+ assert res3.is_anonymous is False
+ assert res3.subdivision_1_name.lower() in US_STATES
+ # this might change ...
+ assert res3.user_type == UserType.CELLULAR
+
+ assert res1 == res2 == res3, "runner, cache, mysql all return same instance"
+
+ def test_create_insights_ipv6(
+ self,
+ maxmind_manager: MaxmindManager,
+ geoipinfo_manager: GeoIpInfoManager,
+ delete_ipinfo,
+ ):
+ # This is (currently) an IP in the US, so it should do insights
+ ip = IP_v6_US
+ normalized_ip, lookup_prefix = normalize_ip(ip)
+ delete_ipinfo(ip)
+ geoipinfo_manager.clear_cache(ip)
+ delete_ipinfo(normalized_ip)
+ geoipinfo_manager.clear_cache(normalized_ip)
+ assert geoipinfo_manager.get_cache(ip) is None
+ assert geoipinfo_manager.get_cache(normalized_ip) is None
+ assert geoipinfo_manager.get_mysql_if_exists(ip) is None
+ assert geoipinfo_manager.get_mysql_if_exists(normalized_ip) is None
+
+ res1 = maxmind_manager.run_ip_information(ip, force_insights=False)
+ assert isinstance(res1, GeoIPInformation)
+ assert res1.lookup_prefix == "/64"
+
+ # Check that it is in the cache and in mysql
+ res2 = geoipinfo_manager.get_cache(ip)
+ assert isinstance(res2, GeoIPInformation)
+ assert res2.ip == ip
+ assert not res2.basic
+
+ res3 = geoipinfo_manager.get_mysql(ip)
+ assert isinstance(res3, GeoIPInformation)
+ assert res3.ip == ip
+ assert not res3.basic
+ assert res3.is_anonymous is False
+ assert res3.subdivision_1_name.lower() in US_STATES
+ # this might change ...
+ assert res3.user_type == UserType.RESIDENTIAL
+
+ assert res1 == res2 == res3, "runner, cache, mysql all return same instance"
+
+ def test_get_or_create_ip_information(self, maxmind_manager):
+ ip = IP_v4_US
+
+ res1 = maxmind_manager.get_or_create_ip_information(ip_address=ip)
+ assert isinstance(res1, GeoIPInformation)
+
+ res2 = maxmind_manager.get_or_create_ip_information(
+ ip_address=fake.ipv4_private()
+ )
+ assert res2 is None
+
+ def test_get_or_create_ip_information_ipv6(
+ self, maxmind_manager, delete_ipinfo, geoipinfo_manager, caplog
+ ):
+ ip = IP_v6_US
+ normalized_ip, lookup_prefix = normalize_ip(ip)
+ delete_ipinfo(normalized_ip)
+ geoipinfo_manager.clear_cache(normalized_ip)
+
+ with caplog.at_level(logging.INFO):
+ res1 = maxmind_manager.get_or_create_ip_information(ip_address=ip)
+ assert isinstance(res1, GeoIPInformation)
+ assert res1.ip == ip
+ # It looks up in insight using the normalize IP!
+ assert f"get_insights_ip_information: {normalized_ip}" in caplog.text
+
+ # And it should NOT do the lookup again with an ipv6 in the same /64 block!
+ ip = IP_v6_US_SAME_64
+ caplog.clear()
+ with caplog.at_level(logging.INFO):
+ res2 = maxmind_manager.get_or_create_ip_information(ip_address=ip)
+ assert isinstance(res2, GeoIPInformation)
+ assert res2.ip == ip
+ assert "get_insights_ip_information" not in caplog.text
+
+ def test_run_ip_information(self, maxmind_manager):
+ ip = IP_v4_US
+
+ res = maxmind_manager.run_ip_information(ip_address=ip)
+ assert isinstance(res, GeoIPInformation)
+ assert res.country_name == "United States"
+ assert res.country_iso == "us"