From efe1c368b8b49a2c6b3bf2193a5b89eb5426eba3 Mon Sep 17 00:00:00 2001 From: stuppie Date: Fri, 13 Mar 2026 13:31:22 -0600 Subject: ToolRunCommand + options models to handle cmd line args for all network tools --- tests/managers/network/label.py | 202 ---------------------------------------- 1 file changed, 202 deletions(-) delete mode 100644 tests/managers/network/label.py (limited to 'tests/managers/network/label.py') diff --git a/tests/managers/network/label.py b/tests/managers/network/label.py deleted file mode 100644 index 5b9a790..0000000 --- a/tests/managers/network/label.py +++ /dev/null @@ -1,202 +0,0 @@ -import ipaddress - -import faker -import pytest -from psycopg.errors import UniqueViolation -from pydantic import ValidationError - -from generalresearch.managers.network.label import IPLabelManager -from generalresearch.models.network.label import ( - IPLabel, - IPLabelKind, - IPLabelSource, - IPLabelMetadata, -) -from generalresearch.models.thl.ipinfo import normalize_ip - -fake = faker.Faker() - - -@pytest.fixture -def ip_label(utc_now) -> IPLabel: - ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) - return IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - metadata=IPLabelMetadata(services=["RDP"]) - ) - - -def test_model(utc_now): - ip = fake.ipv4_public() - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - assert lbl.ip.prefixlen == 32 - print(f"{lbl.ip=}") - - ip = ipaddress.IPv4Network((ip, 24), strict=False) - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - print(f"{lbl.ip=}") - - with pytest.raises(ValidationError, match="IPv6 network must be /64 or larger"): - IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=fake.ipv6(), - ) - - ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - print(f"{lbl.ip=}") - - ip = ipaddress.IPv6Network((ip.network_address, 48), strict=False) - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - print(f"{lbl.ip=}") - - -def test_create(iplabel_manager: IPLabelManager, ip_label: IPLabel): - iplabel_manager.create(ip_label) - - with pytest.raises( - UniqueViolation, match="duplicate key value violates unique constraint" - ): - iplabel_manager.create(ip_label) - - -def test_filter(iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago): - res = iplabel_manager.filter(ips=[ip_label.ip]) - assert len(res) == 0 - - iplabel_manager.create(ip_label) - res = iplabel_manager.filter(ips=[ip_label.ip]) - assert len(res) == 1 - - out = res[0] - assert out == ip_label - - res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) - assert len(res) == 1 - - ip_label2 = ip_label.model_copy() - ip_label2.ip = fake.ipv4_public() - iplabel_manager.create(ip_label2) - res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) - assert len(res) == 2 - - -def test_filter_network( - iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago -): - print(ip_label) - ip_label = ip_label.model_copy() - ip_label.ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) - - iplabel_manager.create(ip_label) - res = iplabel_manager.filter(ips=[ip_label.ip]) - assert len(res) == 1 - - out = res[0] - assert out == ip_label - - res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) - assert len(res) == 1 - - ip_label2 = ip_label.model_copy() - ip_label2.ip = fake.ipv4_public() - iplabel_manager.create(ip_label2) - res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) - assert len(res) == 2 - - -def test_network(iplabel_manager: IPLabelManager, utc_now): - # This is a fully-specific /128 ipv6 address. - # e.g. '51b7:b38d:8717:6c5b:cd3e:f5c3:3aba:17d' - ip = fake.ipv6() - # Generally, we'd want to annotate the /64 network - # e.g. '51b7:b38d:8717:6c5b::/64' - ip_64 = ipaddress.IPv6Network((ip, 64), strict=False) - - label = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip_64, - ) - iplabel_manager.create(label) - - # If I query for the /128 directly, I won't find it - res = iplabel_manager.filter(ips=[ip]) - assert len(res) == 0 - - # If I query for the /64 network I will - res = iplabel_manager.filter(ips=[ip_64]) - assert len(res) == 1 - - # Or, I can query for the /128 ip IN a network - res = iplabel_manager.filter(ip_in_network=ip) - assert len(res) == 1 - - -def test_label_cidr_and_ipinfo( - iplabel_manager: IPLabelManager, ip_information_factory, ip_geoname, utc_now -): - # We have network_iplabel.ip as a cidr col and - # thl_ipinformation.ip as a inet col. Make sure we can join appropriately - ip = fake.ipv6() - ip_information_factory(ip=ip, geoname=ip_geoname) - # We normalize for storage into ipinfo table - ip_norm, prefix = normalize_ip(ip) - - # Test with a larger network - ip_48 = ipaddress.IPv6Network((ip, 48), strict=False) - print(f"{ip=}") - print(f"{ip_norm=}") - print(f"{ip_48=}") - label = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip_48, - ) - iplabel_manager.create(label) - - res = iplabel_manager.test_join(ip_norm) - print(res) -- cgit v1.2.3