aboutsummaryrefslogtreecommitdiff
path: root/tests/managers
diff options
context:
space:
mode:
authorstuppie2026-03-09 18:42:22 -0600
committerstuppie2026-03-09 18:42:22 -0600
commita68a9eb9873c7502c2b7bddb55c4eb61689a48a2 (patch)
tree6a4df8d399e94532dd3938f53f467913f412ce10 /tests/managers
parentcf274a5814e02aefc0c9e25f96927b3163a4abaf (diff)
downloadgeneralresearch-a68a9eb9873c7502c2b7bddb55c4eb61689a48a2.tar.gz
generalresearch-a68a9eb9873c7502c2b7bddb55c4eb61689a48a2.zip
add IPLabel, NmapRun, RDNSResult, ToolRun, model/managers/tests. nmap xml parser. + test. work in progress
Diffstat (limited to 'tests/managers')
-rw-r--r--tests/managers/network/__init__.py0
-rw-r--r--tests/managers/network/label.py202
-rw-r--r--tests/managers/network/tool_run.py48
3 files changed, 250 insertions, 0 deletions
diff --git a/tests/managers/network/__init__.py b/tests/managers/network/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/managers/network/__init__.py
diff --git a/tests/managers/network/label.py b/tests/managers/network/label.py
new file mode 100644
index 0000000..5b9a790
--- /dev/null
+++ b/tests/managers/network/label.py
@@ -0,0 +1,202 @@
+import ipaddress
+
+import faker
+import pytest
+from psycopg.errors import UniqueViolation
+from pydantic import ValidationError
+
+from generalresearch.managers.network.label import IPLabelManager
+from generalresearch.models.network.label import (
+ IPLabel,
+ IPLabelKind,
+ IPLabelSource,
+ IPLabelMetadata,
+)
+from generalresearch.models.thl.ipinfo import normalize_ip
+
+fake = faker.Faker()
+
+
+@pytest.fixture
+def ip_label(utc_now) -> IPLabel:
+ ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False)
+ return IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ metadata=IPLabelMetadata(services=["RDP"])
+ )
+
+
+def test_model(utc_now):
+ ip = fake.ipv4_public()
+ lbl = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ )
+ assert lbl.ip.prefixlen == 32
+ print(f"{lbl.ip=}")
+
+ ip = ipaddress.IPv4Network((ip, 24), strict=False)
+ lbl = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ )
+ print(f"{lbl.ip=}")
+
+ with pytest.raises(ValidationError, match="IPv6 network must be /64 or larger"):
+ IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=fake.ipv6(),
+ )
+
+ ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False)
+ lbl = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ )
+ print(f"{lbl.ip=}")
+
+ ip = ipaddress.IPv6Network((ip.network_address, 48), strict=False)
+ lbl = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ )
+ print(f"{lbl.ip=}")
+
+
+def test_create(iplabel_manager: IPLabelManager, ip_label: IPLabel):
+ iplabel_manager.create(ip_label)
+
+ with pytest.raises(
+ UniqueViolation, match="duplicate key value violates unique constraint"
+ ):
+ iplabel_manager.create(ip_label)
+
+
+def test_filter(iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago):
+ res = iplabel_manager.filter(ips=[ip_label.ip])
+ assert len(res) == 0
+
+ iplabel_manager.create(ip_label)
+ res = iplabel_manager.filter(ips=[ip_label.ip])
+ assert len(res) == 1
+
+ out = res[0]
+ assert out == ip_label
+
+ res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago)
+ assert len(res) == 1
+
+ ip_label2 = ip_label.model_copy()
+ ip_label2.ip = fake.ipv4_public()
+ iplabel_manager.create(ip_label2)
+ res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip])
+ assert len(res) == 2
+
+
+def test_filter_network(
+ iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago
+):
+ print(ip_label)
+ ip_label = ip_label.model_copy()
+ ip_label.ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False)
+
+ iplabel_manager.create(ip_label)
+ res = iplabel_manager.filter(ips=[ip_label.ip])
+ assert len(res) == 1
+
+ out = res[0]
+ assert out == ip_label
+
+ res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago)
+ assert len(res) == 1
+
+ ip_label2 = ip_label.model_copy()
+ ip_label2.ip = fake.ipv4_public()
+ iplabel_manager.create(ip_label2)
+ res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip])
+ assert len(res) == 2
+
+
+def test_network(iplabel_manager: IPLabelManager, utc_now):
+ # This is a fully-specific /128 ipv6 address.
+ # e.g. '51b7:b38d:8717:6c5b:cd3e:f5c3:3aba:17d'
+ ip = fake.ipv6()
+ # Generally, we'd want to annotate the /64 network
+ # e.g. '51b7:b38d:8717:6c5b::/64'
+ ip_64 = ipaddress.IPv6Network((ip, 64), strict=False)
+
+ label = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip_64,
+ )
+ iplabel_manager.create(label)
+
+ # If I query for the /128 directly, I won't find it
+ res = iplabel_manager.filter(ips=[ip])
+ assert len(res) == 0
+
+ # If I query for the /64 network I will
+ res = iplabel_manager.filter(ips=[ip_64])
+ assert len(res) == 1
+
+ # Or, I can query for the /128 ip IN a network
+ res = iplabel_manager.filter(ip_in_network=ip)
+ assert len(res) == 1
+
+
+def test_label_cidr_and_ipinfo(
+ iplabel_manager: IPLabelManager, ip_information_factory, ip_geoname, utc_now
+):
+ # We have network_iplabel.ip as a cidr col and
+ # thl_ipinformation.ip as a inet col. Make sure we can join appropriately
+ ip = fake.ipv6()
+ ip_information_factory(ip=ip, geoname=ip_geoname)
+ # We normalize for storage into ipinfo table
+ ip_norm, prefix = normalize_ip(ip)
+
+ # Test with a larger network
+ ip_48 = ipaddress.IPv6Network((ip, 48), strict=False)
+ print(f"{ip=}")
+ print(f"{ip_norm=}")
+ print(f"{ip_48=}")
+ label = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip_48,
+ )
+ iplabel_manager.create(label)
+
+ res = iplabel_manager.test_join(ip_norm)
+ print(res)
diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py
new file mode 100644
index 0000000..a598a71
--- /dev/null
+++ b/tests/managers/network/tool_run.py
@@ -0,0 +1,48 @@
+from uuid import uuid4
+
+import faker
+
+from generalresearch.models.network.tool_run import (
+ new_tool_run_from_nmap,
+ run_dig,
+)
+fake = faker.Faker()
+
+
+def test_create_tool_run_from_nmap(nmap_run, toolrun_manager):
+ scan_group_id = uuid4().hex
+ run = new_tool_run_from_nmap(nmap_run, scan_group_id=scan_group_id)
+
+ toolrun_manager.create_portscan_run(run)
+
+ run_out = toolrun_manager.get_portscan_run(run.id)
+
+ assert run == run_out
+
+
+def test_create_tool_run_from_dig_fixture(reverse_dns_run, toolrun_manager):
+
+ toolrun_manager.create_rdns_run(reverse_dns_run)
+
+ run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)
+
+ assert reverse_dns_run == run_out
+
+
+def test_run_dig(toolrun_manager):
+ reverse_dns_run = run_dig(ip="65.19.129.53")
+
+ toolrun_manager.create_rdns_run(reverse_dns_run)
+
+ run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)
+
+ assert reverse_dns_run == run_out
+
+def test_run_dig_empty(toolrun_manager):
+ reverse_dns_run = run_dig(ip=fake.ipv6())
+
+ toolrun_manager.create_rdns_run(reverse_dns_run)
+
+ run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)
+
+ assert reverse_dns_run == run_out \ No newline at end of file