From a68a9eb9873c7502c2b7bddb55c4eb61689a48a2 Mon Sep 17 00:00:00 2001
From: stuppie
Date: Mon, 9 Mar 2026 18:42:22 -0600
Subject: add IPLabel, NmapRun, RDNSResult, ToolRun, model/managers/tests. nmap
xml parser. + test. work in progress
---
tests/conftest.py | 1 +
tests/data/nmaprun1.xml | 68 +++++++++++++
tests/data/nmaprun2.xml | 118 ++++++++++++++++++++++
tests/managers/network/__init__.py | 0
tests/managers/network/label.py | 202 +++++++++++++++++++++++++++++++++++++
tests/managers/network/tool_run.py | 48 +++++++++
tests/models/network/__init__.py | 0
tests/models/network/nmap.py | 32 ++++++
tests/models/network/rdns.py | 23 +++++
tests/models/network/tool_run.py | 8 ++
10 files changed, 500 insertions(+)
create mode 100644 tests/data/nmaprun1.xml
create mode 100644 tests/data/nmaprun2.xml
create mode 100644 tests/managers/network/__init__.py
create mode 100644 tests/managers/network/label.py
create mode 100644 tests/managers/network/tool_run.py
create mode 100644 tests/models/network/__init__.py
create mode 100644 tests/models/network/nmap.py
create mode 100644 tests/models/network/rdns.py
create mode 100644 tests/models/network/tool_run.py
(limited to 'tests')
diff --git a/tests/conftest.py b/tests/conftest.py
index 30ed1c7..2482269 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -13,6 +13,7 @@ pytest_plugins = [
"test_utils.managers.conftest",
"test_utils.managers.contest.conftest",
"test_utils.managers.ledger.conftest",
+ "test_utils.managers.network.conftest",
"test_utils.managers.upk.conftest",
# -- Models
"test_utils.models.conftest",
diff --git a/tests/data/nmaprun1.xml b/tests/data/nmaprun1.xml
new file mode 100644
index 0000000..c5fed6c
--- /dev/null
+++ b/tests/data/nmaprun1.xml
@@ -0,0 +1,68 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/data/nmaprun2.xml b/tests/data/nmaprun2.xml
new file mode 100644
index 0000000..932896c
--- /dev/null
+++ b/tests/data/nmaprun2.xml
@@ -0,0 +1,118 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ cpe:/a:openbsd:openssh:6.6.1p1
+ cpe:/o:linux:linux_kernel
+
+
+
+
+
+
+
+
+
+
+ cpe:/a:apache:http_server:2.4.7
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ cpe:/o:linux:linux_kernel:2.6
+
+
+ cpe:/o:linux:linux_kernel:3
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/managers/network/__init__.py b/tests/managers/network/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/managers/network/label.py b/tests/managers/network/label.py
new file mode 100644
index 0000000..5b9a790
--- /dev/null
+++ b/tests/managers/network/label.py
@@ -0,0 +1,202 @@
+import ipaddress
+
+import faker
+import pytest
+from psycopg.errors import UniqueViolation
+from pydantic import ValidationError
+
+from generalresearch.managers.network.label import IPLabelManager
+from generalresearch.models.network.label import (
+ IPLabel,
+ IPLabelKind,
+ IPLabelSource,
+ IPLabelMetadata,
+)
+from generalresearch.models.thl.ipinfo import normalize_ip
+
+fake = faker.Faker()
+
+
+@pytest.fixture
+def ip_label(utc_now) -> IPLabel:
+ ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False)
+ return IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ metadata=IPLabelMetadata(services=["RDP"])
+ )
+
+
+def test_model(utc_now):
+ ip = fake.ipv4_public()
+ lbl = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ )
+ assert lbl.ip.prefixlen == 32
+ print(f"{lbl.ip=}")
+
+ ip = ipaddress.IPv4Network((ip, 24), strict=False)
+ lbl = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ )
+ print(f"{lbl.ip=}")
+
+ with pytest.raises(ValidationError, match="IPv6 network must be /64 or larger"):
+ IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=fake.ipv6(),
+ )
+
+ ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False)
+ lbl = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ )
+ print(f"{lbl.ip=}")
+
+ ip = ipaddress.IPv6Network((ip.network_address, 48), strict=False)
+ lbl = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip,
+ )
+ print(f"{lbl.ip=}")
+
+
+def test_create(iplabel_manager: IPLabelManager, ip_label: IPLabel):
+ iplabel_manager.create(ip_label)
+
+ with pytest.raises(
+ UniqueViolation, match="duplicate key value violates unique constraint"
+ ):
+ iplabel_manager.create(ip_label)
+
+
+def test_filter(iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago):
+ res = iplabel_manager.filter(ips=[ip_label.ip])
+ assert len(res) == 0
+
+ iplabel_manager.create(ip_label)
+ res = iplabel_manager.filter(ips=[ip_label.ip])
+ assert len(res) == 1
+
+ out = res[0]
+ assert out == ip_label
+
+ res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago)
+ assert len(res) == 1
+
+ ip_label2 = ip_label.model_copy()
+ ip_label2.ip = fake.ipv4_public()
+ iplabel_manager.create(ip_label2)
+ res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip])
+ assert len(res) == 2
+
+
+def test_filter_network(
+ iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago
+):
+ print(ip_label)
+ ip_label = ip_label.model_copy()
+ ip_label.ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False)
+
+ iplabel_manager.create(ip_label)
+ res = iplabel_manager.filter(ips=[ip_label.ip])
+ assert len(res) == 1
+
+ out = res[0]
+ assert out == ip_label
+
+ res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago)
+ assert len(res) == 1
+
+ ip_label2 = ip_label.model_copy()
+ ip_label2.ip = fake.ipv4_public()
+ iplabel_manager.create(ip_label2)
+ res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip])
+ assert len(res) == 2
+
+
+def test_network(iplabel_manager: IPLabelManager, utc_now):
+ # This is a fully-specific /128 ipv6 address.
+ # e.g. '51b7:b38d:8717:6c5b:cd3e:f5c3:3aba:17d'
+ ip = fake.ipv6()
+ # Generally, we'd want to annotate the /64 network
+ # e.g. '51b7:b38d:8717:6c5b::/64'
+ ip_64 = ipaddress.IPv6Network((ip, 64), strict=False)
+
+ label = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip_64,
+ )
+ iplabel_manager.create(label)
+
+ # If I query for the /128 directly, I won't find it
+ res = iplabel_manager.filter(ips=[ip])
+ assert len(res) == 0
+
+ # If I query for the /64 network I will
+ res = iplabel_manager.filter(ips=[ip_64])
+ assert len(res) == 1
+
+ # Or, I can query for the /128 ip IN a network
+ res = iplabel_manager.filter(ip_in_network=ip)
+ assert len(res) == 1
+
+
+def test_label_cidr_and_ipinfo(
+ iplabel_manager: IPLabelManager, ip_information_factory, ip_geoname, utc_now
+):
+ # We have network_iplabel.ip as a cidr col and
+ # thl_ipinformation.ip as a inet col. Make sure we can join appropriately
+ ip = fake.ipv6()
+ ip_information_factory(ip=ip, geoname=ip_geoname)
+ # We normalize for storage into ipinfo table
+ ip_norm, prefix = normalize_ip(ip)
+
+ # Test with a larger network
+ ip_48 = ipaddress.IPv6Network((ip, 48), strict=False)
+ print(f"{ip=}")
+ print(f"{ip_norm=}")
+ print(f"{ip_48=}")
+ label = IPLabel(
+ label_kind=IPLabelKind.VPN,
+ labeled_at=utc_now,
+ source=IPLabelSource.INTERNAL_USE,
+ provider="GeoNodE",
+ created_at=utc_now,
+ ip=ip_48,
+ )
+ iplabel_manager.create(label)
+
+ res = iplabel_manager.test_join(ip_norm)
+ print(res)
diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py
new file mode 100644
index 0000000..a598a71
--- /dev/null
+++ b/tests/managers/network/tool_run.py
@@ -0,0 +1,48 @@
+from uuid import uuid4
+
+import faker
+
+from generalresearch.models.network.tool_run import (
+ new_tool_run_from_nmap,
+ run_dig,
+)
+fake = faker.Faker()
+
+
+def test_create_tool_run_from_nmap(nmap_run, toolrun_manager):
+ scan_group_id = uuid4().hex
+ run = new_tool_run_from_nmap(nmap_run, scan_group_id=scan_group_id)
+
+ toolrun_manager.create_portscan_run(run)
+
+ run_out = toolrun_manager.get_portscan_run(run.id)
+
+ assert run == run_out
+
+
+def test_create_tool_run_from_dig_fixture(reverse_dns_run, toolrun_manager):
+
+ toolrun_manager.create_rdns_run(reverse_dns_run)
+
+ run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)
+
+ assert reverse_dns_run == run_out
+
+
+def test_run_dig(toolrun_manager):
+ reverse_dns_run = run_dig(ip="65.19.129.53")
+
+ toolrun_manager.create_rdns_run(reverse_dns_run)
+
+ run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)
+
+ assert reverse_dns_run == run_out
+
+def test_run_dig_empty(toolrun_manager):
+ reverse_dns_run = run_dig(ip=fake.ipv6())
+
+ toolrun_manager.create_rdns_run(reverse_dns_run)
+
+ run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)
+
+ assert reverse_dns_run == run_out
\ No newline at end of file
diff --git a/tests/models/network/__init__.py b/tests/models/network/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/models/network/nmap.py b/tests/models/network/nmap.py
new file mode 100644
index 0000000..4fc7014
--- /dev/null
+++ b/tests/models/network/nmap.py
@@ -0,0 +1,32 @@
+import os
+
+import pytest
+
+from generalresearch.models.network.xml_parser import NmapXmlParser
+
+
+@pytest.fixture
+def nmap_xml_str(request) -> str:
+ fp = os.path.join(request.config.rootpath, "data/nmaprun1.xml")
+ with open(fp, "r") as f:
+ data = f.read()
+ return data
+
+
+@pytest.fixture
+def nmap_xml_str2(request) -> str:
+ fp = os.path.join(request.config.rootpath, "data/nmaprun2.xml")
+ with open(fp, "r") as f:
+ data = f.read()
+ return data
+
+
+def test_nmap_xml_parser(nmap_xml_str, nmap_xml_str2):
+ p = NmapXmlParser()
+ n = p.parse_xml(nmap_xml_str)
+ assert n.tcp_open_ports == [61232]
+ assert len(n.trace.hops) == 18
+
+ n = p.parse_xml(nmap_xml_str2)
+ assert n.tcp_open_ports == [22, 80, 9929, 31337]
+ assert n.trace is None
diff --git a/tests/models/network/rdns.py b/tests/models/network/rdns.py
new file mode 100644
index 0000000..9167749
--- /dev/null
+++ b/tests/models/network/rdns.py
@@ -0,0 +1,23 @@
+from generalresearch.models.network.rdns import dig_rdns
+import faker
+
+fake = faker.Faker()
+
+
+def test_dig_rdns():
+ # Actually runs dig -x. Idk how stable this is
+ ip = "45.33.32.156"
+ rdns_result = dig_rdns(ip)
+ assert rdns_result.primary_hostname == "scanme.nmap.org"
+ assert rdns_result.primary_org == "nmap"
+
+ ip = "65.19.129.53"
+ rdns_result = dig_rdns(ip)
+ assert rdns_result.primary_hostname == "in1-smtp.grlengine.com"
+ assert rdns_result.primary_org == "grlengine"
+
+ ip = fake.ipv6()
+ rdns_result = dig_rdns(ip)
+ assert rdns_result.primary_hostname is None
+ assert rdns_result.primary_org is None
+ print(rdns_result.model_dump_postgres())
diff --git a/tests/models/network/tool_run.py b/tests/models/network/tool_run.py
new file mode 100644
index 0000000..c643503
--- /dev/null
+++ b/tests/models/network/tool_run.py
@@ -0,0 +1,8 @@
+from uuid import uuid4
+
+from generalresearch.models.network.tool_run import new_tool_run_from_nmap
+
+
+def test_new_tool_run_from_nmap(nmap_run):
+ scan_group_id = uuid4().hex
+ run, scan = new_tool_run_from_nmap(nmap_run, scan_group_id=scan_group_id)
--
cgit v1.2.3