1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
import json
import logging
from typing import Callable
import geoip2.models
import pytest
from faker import Faker
from faker.providers.address.en_US import Provider as USAddressProvider
from generalresearch.managers.thl.ipinfo import GeoIpInfoManager
from generalresearch.managers.thl.maxmind import MaxmindManager
from generalresearch.managers.thl.maxmind.basic import (
MaxmindBasicManager,
)
from generalresearch.models.thl.ipinfo import (
GeoIPInformation,
normalize_ip,
)
from generalresearch.models.thl.maxmind.definitions import UserType
fake = Faker()
US_STATES = {x.lower() for x in USAddressProvider.states}
IP_v4_INDIA = "106.203.146.157"
IP_v6_INDIA = "2402:3a80:4649:de3f:0:24:74aa:b601"
IP_v4_US = "174.218.60.101"
IP_v6_US = "2600:1700:ece0:9410:55d:faf3:c15d:6e4"
IP_v6_US_SAME_64 = "2600:1700:ece0:9410:55d:faf3:c15d:aaaa"
@pytest.fixture(scope="session")
def delete_ipinfo(thl_web_rw) -> Callable:
def _delete_ipinfo(ip):
thl_web_rw.execute_write(
query="DELETE FROM thl_geoname WHERE geoname_id IN (SELECT geoname_id FROM thl_ipinformation WHERE ip = %s);",
params=[ip],
)
thl_web_rw.execute_write(
query="DELETE FROM thl_ipinformation WHERE ip = %s;",
params=[ip],
)
return _delete_ipinfo
class TestMaxmindBasicManager:
def test_init(self, maxmind_basic_manager):
assert isinstance(maxmind_basic_manager, MaxmindBasicManager)
def test_get_basic_ip_information(self, maxmind_basic_manager):
ip = IP_v4_INDIA
maxmind_basic_manager.run_update_geoip_db()
res1 = maxmind_basic_manager.get_basic_ip_information(ip_address=ip)
assert isinstance(res1, geoip2.models.Country)
assert res1.country.iso_code == "IN"
assert res1.country.name == "India"
res2 = maxmind_basic_manager.get_basic_ip_information(
ip_address=fake.ipv4_private()
)
assert res2 is None
def test_get_country_iso_from_ip_geoip2db(self, maxmind_basic_manager):
ip = IP_v4_INDIA
maxmind_basic_manager.run_update_geoip_db()
res1 = maxmind_basic_manager.get_country_iso_from_ip_geoip2db(ip=ip)
assert res1 == "in"
res2 = maxmind_basic_manager.get_country_iso_from_ip_geoip2db(
ip=fake.ipv4_private()
)
assert res2 is None
def test_get_basic_ip_information_ipv6(self, maxmind_basic_manager):
ip = IP_v6_INDIA
maxmind_basic_manager.run_update_geoip_db()
res1 = maxmind_basic_manager.get_basic_ip_information(ip_address=ip)
assert isinstance(res1, geoip2.models.Country)
assert res1.country.iso_code == "IN"
assert res1.country.name == "India"
class TestMaxmindManager:
def test_init(self, thl_web_rr, thl_redis_config, maxmind_manager: MaxmindManager):
instance = MaxmindManager(pg_config=thl_web_rr, redis_config=thl_redis_config)
assert isinstance(instance, MaxmindManager)
assert isinstance(maxmind_manager, MaxmindManager)
def test_create_basic(
self,
maxmind_manager: MaxmindManager,
geoipinfo_manager: GeoIpInfoManager,
delete_ipinfo,
):
# This is (currently) an IP in India, and so it should only do the basic lookup
ip = IP_v4_INDIA
delete_ipinfo(ip)
geoipinfo_manager.clear_cache(ip)
assert geoipinfo_manager.get_cache(ip) is None
assert geoipinfo_manager.get_mysql_if_exists(ip) is None
maxmind_manager.run_ip_information(ip, force_insights=False)
# Check that it is in the cache and in mysql
res = geoipinfo_manager.get_cache(ip)
assert res.ip == ip
assert res.basic
res = geoipinfo_manager.get_mysql(ip)
assert res.ip == ip
assert res.basic
def test_create_basic_ipv6(
self,
maxmind_manager: MaxmindManager,
geoipinfo_manager: GeoIpInfoManager,
delete_ipinfo,
):
# This is (currently) an IP in India, and so it should only do the basic lookup
ip = IP_v6_INDIA
normalized_ip, lookup_prefix = normalize_ip(ip)
delete_ipinfo(ip)
geoipinfo_manager.clear_cache(ip)
delete_ipinfo(normalized_ip)
geoipinfo_manager.clear_cache(normalized_ip)
assert geoipinfo_manager.get_cache(ip) is None
assert geoipinfo_manager.get_cache(normalized_ip) is None
assert geoipinfo_manager.get_mysql_if_exists(ip) is None
assert geoipinfo_manager.get_mysql_if_exists(normalized_ip) is None
maxmind_manager.run_ip_information(ip, force_insights=False)
# Check that it is in the cache
res = geoipinfo_manager.get_cache(ip)
# The looked up IP (/128) is returned,
assert res.ip == ip
assert res.lookup_prefix == "/64"
assert res.basic
# ... but the normalized version was stored (/64)
assert geoipinfo_manager.get_cache_raw(ip) is None
res = json.loads(geoipinfo_manager.get_cache_raw(normalized_ip))
assert res["ip"] == normalized_ip
# Check mysql
res = geoipinfo_manager.get_mysql(ip)
assert res.ip == ip
assert res.lookup_prefix == "/64"
assert res.basic
with pytest.raises(AssertionError):
geoipinfo_manager.get_mysql_raw(ip)
res = geoipinfo_manager.get_mysql_raw(normalized_ip)
assert res["ip"] == normalized_ip
def test_create_insights(
self,
maxmind_manager: MaxmindManager,
geoipinfo_manager: GeoIpInfoManager,
delete_ipinfo,
):
# This is (currently) an IP in the US, so it should do insights
ip = IP_v4_US
delete_ipinfo(ip)
geoipinfo_manager.clear_cache(ip)
assert geoipinfo_manager.get_cache(ip) is None
assert geoipinfo_manager.get_mysql_if_exists(ip) is None
res1 = maxmind_manager.run_ip_information(ip, force_insights=False)
assert isinstance(res1, GeoIPInformation)
# Check that it is in the cache and in mysql
res2 = geoipinfo_manager.get_cache(ip)
assert isinstance(res2, GeoIPInformation)
assert res2.ip == ip
assert not res2.basic
res3 = geoipinfo_manager.get_mysql(ip)
assert isinstance(res3, GeoIPInformation)
assert res3.ip == ip
assert not res3.basic
assert res3.is_anonymous is False
assert res3.subdivision_1_name.lower() in US_STATES
# this might change ...
assert res3.user_type == UserType.CELLULAR
assert res1 == res2 == res3, "runner, cache, mysql all return same instance"
def test_create_insights_ipv6(
self,
maxmind_manager: MaxmindManager,
geoipinfo_manager: GeoIpInfoManager,
delete_ipinfo,
):
# This is (currently) an IP in the US, so it should do insights
ip = IP_v6_US
normalized_ip, lookup_prefix = normalize_ip(ip)
delete_ipinfo(ip)
geoipinfo_manager.clear_cache(ip)
delete_ipinfo(normalized_ip)
geoipinfo_manager.clear_cache(normalized_ip)
assert geoipinfo_manager.get_cache(ip) is None
assert geoipinfo_manager.get_cache(normalized_ip) is None
assert geoipinfo_manager.get_mysql_if_exists(ip) is None
assert geoipinfo_manager.get_mysql_if_exists(normalized_ip) is None
res1 = maxmind_manager.run_ip_information(ip, force_insights=False)
assert isinstance(res1, GeoIPInformation)
assert res1.lookup_prefix == "/64"
# Check that it is in the cache and in mysql
res2 = geoipinfo_manager.get_cache(ip)
assert isinstance(res2, GeoIPInformation)
assert res2.ip == ip
assert not res2.basic
res3 = geoipinfo_manager.get_mysql(ip)
assert isinstance(res3, GeoIPInformation)
assert res3.ip == ip
assert not res3.basic
assert res3.is_anonymous is False
assert res3.subdivision_1_name.lower() in US_STATES
# this might change ...
assert res3.user_type == UserType.RESIDENTIAL
assert res1 == res2 == res3, "runner, cache, mysql all return same instance"
def test_get_or_create_ip_information(self, maxmind_manager):
ip = IP_v4_US
res1 = maxmind_manager.get_or_create_ip_information(ip_address=ip)
assert isinstance(res1, GeoIPInformation)
res2 = maxmind_manager.get_or_create_ip_information(
ip_address=fake.ipv4_private()
)
assert res2 is None
def test_get_or_create_ip_information_ipv6(
self, maxmind_manager, delete_ipinfo, geoipinfo_manager, caplog
):
ip = IP_v6_US
normalized_ip, lookup_prefix = normalize_ip(ip)
delete_ipinfo(normalized_ip)
geoipinfo_manager.clear_cache(normalized_ip)
with caplog.at_level(logging.INFO):
res1 = maxmind_manager.get_or_create_ip_information(ip_address=ip)
assert isinstance(res1, GeoIPInformation)
assert res1.ip == ip
# It looks up in insight using the normalize IP!
assert f"get_insights_ip_information: {normalized_ip}" in caplog.text
# And it should NOT do the lookup again with an ipv6 in the same /64 block!
ip = IP_v6_US_SAME_64
caplog.clear()
with caplog.at_level(logging.INFO):
res2 = maxmind_manager.get_or_create_ip_information(ip_address=ip)
assert isinstance(res2, GeoIPInformation)
assert res2.ip == ip
assert "get_insights_ip_information" not in caplog.text
def test_run_ip_information(self, maxmind_manager):
ip = IP_v4_US
res = maxmind_manager.run_ip_information(ip_address=ip)
assert isinstance(res, GeoIPInformation)
assert res.country_name == "United States"
assert res.country_iso == "us"
|