From a68a9eb9873c7502c2b7bddb55c4eb61689a48a2 Mon Sep 17 00:00:00 2001 From: stuppie Date: Mon, 9 Mar 2026 18:42:22 -0600 Subject: add IPLabel, NmapRun, RDNSResult, ToolRun, model/managers/tests. nmap xml parser. + test. work in progress --- tests/managers/network/__init__.py | 0 tests/managers/network/label.py | 202 +++++++++++++++++++++++++++++++++++++ tests/managers/network/tool_run.py | 48 +++++++++ 3 files changed, 250 insertions(+) create mode 100644 tests/managers/network/__init__.py create mode 100644 tests/managers/network/label.py create mode 100644 tests/managers/network/tool_run.py (limited to 'tests/managers/network') diff --git a/tests/managers/network/__init__.py b/tests/managers/network/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/managers/network/label.py b/tests/managers/network/label.py new file mode 100644 index 0000000..5b9a790 --- /dev/null +++ b/tests/managers/network/label.py @@ -0,0 +1,202 @@ +import ipaddress + +import faker +import pytest +from psycopg.errors import UniqueViolation +from pydantic import ValidationError + +from generalresearch.managers.network.label import IPLabelManager +from generalresearch.models.network.label import ( + IPLabel, + IPLabelKind, + IPLabelSource, + IPLabelMetadata, +) +from generalresearch.models.thl.ipinfo import normalize_ip + +fake = faker.Faker() + + +@pytest.fixture +def ip_label(utc_now) -> IPLabel: + ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) + return IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + metadata=IPLabelMetadata(services=["RDP"]) + ) + + +def test_model(utc_now): + ip = fake.ipv4_public() + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + assert lbl.ip.prefixlen == 32 + print(f"{lbl.ip=}") + + ip = ipaddress.IPv4Network((ip, 24), strict=False) + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + print(f"{lbl.ip=}") + + with pytest.raises(ValidationError, match="IPv6 network must be /64 or larger"): + IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=fake.ipv6(), + ) + + ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + print(f"{lbl.ip=}") + + ip = ipaddress.IPv6Network((ip.network_address, 48), strict=False) + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + print(f"{lbl.ip=}") + + +def test_create(iplabel_manager: IPLabelManager, ip_label: IPLabel): + iplabel_manager.create(ip_label) + + with pytest.raises( + UniqueViolation, match="duplicate key value violates unique constraint" + ): + iplabel_manager.create(ip_label) + + +def test_filter(iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago): + res = iplabel_manager.filter(ips=[ip_label.ip]) + assert len(res) == 0 + + iplabel_manager.create(ip_label) + res = iplabel_manager.filter(ips=[ip_label.ip]) + assert len(res) == 1 + + out = res[0] + assert out == ip_label + + res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) + assert len(res) == 1 + + ip_label2 = ip_label.model_copy() + ip_label2.ip = fake.ipv4_public() + iplabel_manager.create(ip_label2) + res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) + assert len(res) == 2 + + +def test_filter_network( + iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago +): + print(ip_label) + ip_label = ip_label.model_copy() + ip_label.ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) + + iplabel_manager.create(ip_label) + res = iplabel_manager.filter(ips=[ip_label.ip]) + assert len(res) == 1 + + out = res[0] + assert out == ip_label + + res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) + assert len(res) == 1 + + ip_label2 = ip_label.model_copy() + ip_label2.ip = fake.ipv4_public() + iplabel_manager.create(ip_label2) + res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) + assert len(res) == 2 + + +def test_network(iplabel_manager: IPLabelManager, utc_now): + # This is a fully-specific /128 ipv6 address. + # e.g. '51b7:b38d:8717:6c5b:cd3e:f5c3:3aba:17d' + ip = fake.ipv6() + # Generally, we'd want to annotate the /64 network + # e.g. '51b7:b38d:8717:6c5b::/64' + ip_64 = ipaddress.IPv6Network((ip, 64), strict=False) + + label = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip_64, + ) + iplabel_manager.create(label) + + # If I query for the /128 directly, I won't find it + res = iplabel_manager.filter(ips=[ip]) + assert len(res) == 0 + + # If I query for the /64 network I will + res = iplabel_manager.filter(ips=[ip_64]) + assert len(res) == 1 + + # Or, I can query for the /128 ip IN a network + res = iplabel_manager.filter(ip_in_network=ip) + assert len(res) == 1 + + +def test_label_cidr_and_ipinfo( + iplabel_manager: IPLabelManager, ip_information_factory, ip_geoname, utc_now +): + # We have network_iplabel.ip as a cidr col and + # thl_ipinformation.ip as a inet col. Make sure we can join appropriately + ip = fake.ipv6() + ip_information_factory(ip=ip, geoname=ip_geoname) + # We normalize for storage into ipinfo table + ip_norm, prefix = normalize_ip(ip) + + # Test with a larger network + ip_48 = ipaddress.IPv6Network((ip, 48), strict=False) + print(f"{ip=}") + print(f"{ip_norm=}") + print(f"{ip_48=}") + label = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip_48, + ) + iplabel_manager.create(label) + + res = iplabel_manager.test_join(ip_norm) + print(res) diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py new file mode 100644 index 0000000..a598a71 --- /dev/null +++ b/tests/managers/network/tool_run.py @@ -0,0 +1,48 @@ +from uuid import uuid4 + +import faker + +from generalresearch.models.network.tool_run import ( + new_tool_run_from_nmap, + run_dig, +) +fake = faker.Faker() + + +def test_create_tool_run_from_nmap(nmap_run, toolrun_manager): + scan_group_id = uuid4().hex + run = new_tool_run_from_nmap(nmap_run, scan_group_id=scan_group_id) + + toolrun_manager.create_portscan_run(run) + + run_out = toolrun_manager.get_portscan_run(run.id) + + assert run == run_out + + +def test_create_tool_run_from_dig_fixture(reverse_dns_run, toolrun_manager): + + toolrun_manager.create_rdns_run(reverse_dns_run) + + run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) + + assert reverse_dns_run == run_out + + +def test_run_dig(toolrun_manager): + reverse_dns_run = run_dig(ip="65.19.129.53") + + toolrun_manager.create_rdns_run(reverse_dns_run) + + run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) + + assert reverse_dns_run == run_out + +def test_run_dig_empty(toolrun_manager): + reverse_dns_run = run_dig(ip=fake.ipv6()) + + toolrun_manager.create_rdns_run(reverse_dns_run) + + run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) + + assert reverse_dns_run == run_out \ No newline at end of file -- cgit v1.2.3 From 06653612b730d85509d264d28d136857b6a9bbe0 Mon Sep 17 00:00:00 2001 From: stuppie Date: Tue, 10 Mar 2026 17:26:27 -0600 Subject: mtr manager and tool run + test --- generalresearch/managers/network/mtr.py | 47 ++++++ generalresearch/managers/network/tool_run.py | 29 +++- generalresearch/models/network/mtr.py | 70 ++++++--- generalresearch/models/network/rdns.py | 2 +- generalresearch/models/network/tool_run.py | 42 ++++++ generalresearch/thl_django/network/models.py | 13 +- test_utils/conftest.py | 1 + tests/data/mtr_fatbeam.json | 206 +++++++++++++++++++++++++++ tests/managers/network/tool_run.py | 63 +++++++- 9 files changed, 441 insertions(+), 32 deletions(-) create mode 100644 generalresearch/managers/network/mtr.py create mode 100644 tests/data/mtr_fatbeam.json (limited to 'tests/managers/network') diff --git a/generalresearch/managers/network/mtr.py b/generalresearch/managers/network/mtr.py new file mode 100644 index 0000000..496d3f0 --- /dev/null +++ b/generalresearch/managers/network/mtr.py @@ -0,0 +1,47 @@ +from typing import Optional + +from psycopg import Cursor + +from generalresearch.managers.base import PostgresManager +from generalresearch.models.network.tool_run import MtrRun + + +class MtrManager(PostgresManager): + + def _create(self, run: MtrRun, c: Optional[Cursor] = None) -> None: + """ + Do not use this directly. Must only be used in the context of a toolrun + """ + query = """ + INSERT INTO network_mtr ( + run_id, source_ip, facility_id, + protocol, port, parsed + ) + VALUES ( + %(run_id)s, %(source_ip)s, %(facility_id)s, + %(protocol)s, %(port)s, %(parsed)s + ); + """ + params = run.model_dump_postgres() + + query_hops = """ + INSERT INTO network_mtrhop ( + hop, ip, domain, asn, mtr_run_id + ) VALUES ( + %(hop)s, %(ip)s, %(domain)s, + %(asn)s, %(mtr_run_id)s + ) + """ + mtr_run = run.parsed + params_hops = [h.model_dump_postgres(run_id=run.id) for h in mtr_run.hops] + + if c: + c.execute(query, params) + if params_hops: + c.executemany(query_hops, params_hops) + else: + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, params) + if params_hops: + c.executemany(query_hops, params_hops) diff --git a/generalresearch/managers/network/tool_run.py b/generalresearch/managers/network/tool_run.py index 75c2e73..6280221 100644 --- a/generalresearch/managers/network/tool_run.py +++ b/generalresearch/managers/network/tool_run.py @@ -4,9 +4,15 @@ from psycopg import Cursor, sql from generalresearch.managers.base import PostgresManager, Permission from generalresearch.models.network.rdns import RDNSResult -from generalresearch.models.network.tool_run import ToolRun, PortScanRun, RDnsRun +from generalresearch.models.network.tool_run import ( + ToolRun, + PortScanRun, + RDnsRun, + MtrRun, +) from generalresearch.managers.network.nmap import NmapManager from generalresearch.managers.network.rdns import RdnsManager +from generalresearch.managers.network.mtr import MtrManager from generalresearch.pg_helper import PostgresConfig @@ -19,8 +25,9 @@ class ToolRunManager(PostgresManager): super().__init__(pg_config=pg_config, permissions=permissions) self.nmap_manager = NmapManager(self.pg_config) self.rdns_manager = RdnsManager(self.pg_config) + self.mtr_manager = MtrManager(self.pg_config) - def create_tool_run(self, run: PortScanRun | RDnsRun, c: Cursor): + def create_tool_run(self, run: PortScanRun | RDnsRun | MtrRun, c: Cursor): query = sql.SQL( """ INSERT INTO network_toolrun ( @@ -88,3 +95,21 @@ class ToolRunManager(PostgresManager): ) res["parsed"] = parsed return RDnsRun.model_validate(res) + + def create_mtr_run(self, run: MtrRun) -> MtrRun: + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + self.create_tool_run(run, c) + self.mtr_manager._create(run, c=c) + return run + + def get_mtr_run(self, id: int) -> MtrRun: + query = """ + SELECT tr.*, mtr.parsed, mtr.source_ip, mtr.facility_id + FROM network_toolrun tr + JOIN network_mtr mtr ON tr.id = mtr.run_id + WHERE id = %(id)s + """ + params = {"id": id} + res = self.pg_config.execute_sql_query(query, params)[0] + return MtrRun.model_validate(res) diff --git a/generalresearch/models/network/mtr.py b/generalresearch/models/network/mtr.py index 2e994d4..4b040de 100644 --- a/generalresearch/models/network/mtr.py +++ b/generalresearch/models/network/mtr.py @@ -6,7 +6,14 @@ from ipaddress import ip_address from typing import List, Optional, Dict import tldextract -from pydantic import Field, field_validator, BaseModel, ConfigDict, model_validator +from pydantic import ( + Field, + field_validator, + BaseModel, + ConfigDict, + model_validator, + computed_field, +) from generalresearch.models.network.definitions import IPProtocol, get_ip_kind, IPKind @@ -16,7 +23,7 @@ class MTRHop(BaseModel): hop: int = Field(alias="count") host: str - asn: Optional[str] = Field(default=None, alias="ASN") + asn: Optional[int] = Field(default=None, alias="ASN") loss_pct: float = Field(alias="Loss%") sent: int = Field(alias="Snt") @@ -27,15 +34,19 @@ class MTRHop(BaseModel): worst_ms: float = Field(alias="Wrst") stdev_ms: float = Field(alias="StDev") - hostname: Optional[str] = None + hostname: Optional[str] = Field( + default=None, examples=["fixed-187-191-8-145.totalplay.net"] + ) ip: Optional[str] = None - @field_validator("asn") + @field_validator("asn", mode="before") @classmethod - def normalize_asn(cls, v): - if v == "AS???": + def normalize_asn(cls, v: str): + if v is None or v == "AS???": return None - return v + if type(v) is int: + return v + return int(v.replace("AS", "")) @model_validator(mode="after") def parse_host(self): @@ -72,11 +83,27 @@ class MTRHop(BaseModel): return False return self.stdev_ms > self.avg_ms or self.worst_ms > self.best_ms * 10 + @computed_field(examples=["totalplay.net"]) @cached_property def domain(self) -> Optional[str]: if self.hostname: return tldextract.extract(self.hostname).top_domain_under_public_suffix + def model_dump_postgres(self, run_id: int): + # Writes for the network_mtrhop table + d = {"mtr_run_id": run_id} + data = self.model_dump( + mode="json", + include={ + "hop", + "ip", + "domain", + "asn", + }, + ) + d.update(data) + return d + class MTRReport(BaseModel): model_config = ConfigDict(populate_by_name=True) @@ -97,8 +124,20 @@ class MTRReport(BaseModel): hops: List[MTRHop] = Field() + def model_dump_postgres(self): + # Writes for the network_mtr table + d = self.model_dump( + mode="json", + include={"port"}, + ) + d["protocol"] = self.protocol.to_number() + d["parsed"] = self.model_dump_json(indent=0) + return d + def print_report(self) -> None: - print(f"MTR Report → {self.destination} {self.protocol.name} {self.port or ''}\n") + print( + f"MTR Report → {self.destination} {self.protocol.name} {self.port or ''}\n" + ) host_max_len = max(len(h.host) for h in self.hops) header = ( @@ -198,8 +237,8 @@ def run_mtr( ) raw = proc.stdout.strip() data = parse_raw_output(raw) - data['port'] = port - data['protocol'] = protocol + data["port"] = port + data["protocol"] = protocol return MTRReport.model_validate(data) @@ -208,14 +247,3 @@ def parse_raw_output(raw: str) -> Dict: data.update(data.pop("mtr")) data["hops"] = data.pop("hubs") return data - - -def load_example(): - s = open( - "/home/gstupp/projects/generalresearch/generalresearch/models/network/mtr_fatbeam.json", - "r", - ).read() - data = parse_raw_output(s) - data['port'] = 443 - data['protocol'] = IPProtocol.TCP - return MTRReport.model_validate(data) diff --git a/generalresearch/models/network/rdns.py b/generalresearch/models/network/rdns.py index ac63414..e00a32d 100644 --- a/generalresearch/models/network/rdns.py +++ b/generalresearch/models/network/rdns.py @@ -40,7 +40,7 @@ class RDNSResult(BaseModel): def hostname_count(self) -> int: return len(self.hostnames) - @computed_field(examples=["totalplay"]) + @computed_field(examples=["totalplay.net"]) @cached_property def primary_domain(self) -> Optional[str]: if self.primary_hostname: diff --git a/generalresearch/models/network/tool_run.py b/generalresearch/models/network/tool_run.py index fba5dcb..2588890 100644 --- a/generalresearch/models/network/tool_run.py +++ b/generalresearch/models/network/tool_run.py @@ -17,6 +17,12 @@ from generalresearch.models.network.rdns import ( dig_rdns, get_dig_rdns_command, ) +from generalresearch.models.network.mtr import ( + MTRReport, + get_mtr_version, + run_mtr, + get_mtr_command, +) from generalresearch.models.network.tool_utils import ToolRunCommand @@ -90,6 +96,20 @@ class RDnsRun(ToolRun): return d +class MtrRun(ToolRun): + facility_id: int = Field(default=1) + source_ip: IPvAnyAddressStr = Field() + parsed: MTRReport = Field() + + def model_dump_postgres(self): + d = super().model_dump_postgres() + d["run_id"] = self.id + d["source_ip"] = self.source_ip + d["facility_id"] = self.facility_id + d.update(self.parsed.model_dump_postgres()) + return d + + def new_tool_run_from_nmap( nmap_run: NmapRun, scan_group_id: Optional[UUIDStr] = None ) -> PortScanRun: @@ -129,3 +149,25 @@ def run_dig(ip: str, scan_group_id: Optional[UUIDStr] = None) -> RDnsRun: config=ToolRunCommand.from_raw_command(raw_command), parsed=rdns_result, ) + + +def mtr_tool_run(ip: str, scan_group_id: Optional[UUIDStr] = None) -> MtrRun: + started_at = datetime.now(tz=timezone.utc) + tool_version = get_mtr_version() + result = run_mtr(ip) + finished_at = datetime.now(tz=timezone.utc) + raw_command = " ".join(get_mtr_command(ip)) + + return MtrRun( + tool_name=ToolName.MTR, + tool_class=ToolClass.TRACEROUTE, + tool_version=tool_version, + status=Status.SUCCESS, + ip=ip, + started_at=started_at, + finished_at=finished_at, + raw_command=raw_command, + scan_group_id=scan_group_id or uuid4().hex, + config=ToolRunCommand.from_raw_command(raw_command), + parsed=result, + ) diff --git a/generalresearch/thl_django/network/models.py b/generalresearch/thl_django/network/models.py index d50a7b1..7d4d8de 100644 --- a/generalresearch/thl_django/network/models.py +++ b/generalresearch/thl_django/network/models.py @@ -208,6 +208,9 @@ class MTR(models.Model): # nullable b/c ICMP doesn't use ports port = models.PositiveIntegerField(null=True) + # Full parsed output + parsed = models.JSONField() + class Meta: db_table = "network_mtr" @@ -219,9 +222,8 @@ class MTRHop(models.Model): related_name="hops", ) - hop_number = models.PositiveSmallIntegerField() - - responder_ip = models.GenericIPAddressField(null=True) + hop = models.PositiveSmallIntegerField() + ip = models.GenericIPAddressField(null=True) domain = models.CharField(max_length=50, null=True) asn = models.PositiveIntegerField(null=True) @@ -230,13 +232,12 @@ class MTRHop(models.Model): db_table = "network_mtrhop" constraints = [ models.UniqueConstraint( - fields=["mtr_run", "hop_number"], + fields=["mtr_run", "hop"], name="unique_hop_per_run", ) ] indexes = [ - models.Index(fields=["mtr_run", "hop_number"]), - models.Index(fields=["responder_ip"]), + models.Index(fields=["ip"]), models.Index(fields=["asn"]), models.Index(fields=["domain"]), ] diff --git a/test_utils/conftest.py b/test_utils/conftest.py index 54fb682..187ff58 100644 --- a/test_utils/conftest.py +++ b/test_utils/conftest.py @@ -38,6 +38,7 @@ def env_file_path(pytestconfig: Config) -> str: @pytest.fixture(scope="session") def settings(env_file_path: str) -> "GRLBaseSettings": from generalresearch.config import GRLBaseSettings + print(f"{env_file_path=}") s = GRLBaseSettings(_env_file=env_file_path) diff --git a/tests/data/mtr_fatbeam.json b/tests/data/mtr_fatbeam.json new file mode 100644 index 0000000..6e27eb1 --- /dev/null +++ b/tests/data/mtr_fatbeam.json @@ -0,0 +1,206 @@ +{ + "report": { + "mtr": { + "src": "gstupp-ThinkPad-X1-Carbon-Gen-11", + "dst": "167.150.6.80", + "tos": 0, + "tests": 10, + "psize": "64", + "bitpattern": "0x00" + }, + "hubs": [ + { + "count": 1, + "host": "_gateway (172.20.20.1)", + "ASN": "AS???", + "Loss%": 0.0, + "Snt": 10, + "Last": 2.408, + "Avg": 16.157, + "Best": 2.408, + "Wrst": 69.531, + "StDev": 20.69 + }, + { + "count": 2, + "host": "172.16.20.1", + "ASN": "AS???", + "Loss%": 0.0, + "Snt": 10, + "Last": 3.411, + "Avg": 16.906, + "Best": 2.613, + "Wrst": 90.7, + "StDev": 27.547 + }, + { + "count": 3, + "host": "192.168.1.254", + "ASN": "AS???", + "Loss%": 0.0, + "Snt": 10, + "Last": 17.012, + "Avg": 9.812, + "Best": 3.061, + "Wrst": 25.728, + "StDev": 8.908 + }, + { + "count": 4, + "host": "ipdsl-jal-ptovallarta-19-l0.uninet.net.mx (201.154.95.117)", + "ASN": "AS???", + "Loss%": 0.0, + "Snt": 10, + "Last": 6.954, + "Avg": 10.216, + "Best": 6.177, + "Wrst": 16.151, + "StDev": 3.343 + }, + { + "count": 5, + "host": "bb-la-onewilshire-29-ae32_0.uninet.net.mx (189.246.202.49)", + "ASN": "AS???", + "Loss%": 0.0, + "Snt": 10, + "Last": 52.557, + "Avg": 54.174, + "Best": 45.681, + "Wrst": 71.387, + "StDev": 8.011 + }, + { + "count": 6, + "host": "ae91.edge7.LosAngeles1.Level3.net (4.7.28.197)", + "ASN": "AS3356", + "Loss%": 0.0, + "Snt": 10, + "Last": 1079.2, + "Avg": 875.97, + "Best": 47.78, + "Wrst": 4150.5, + "StDev": 1345.7 + }, + { + "count": 7, + "host": "???", + "ASN": "AS???", + "Loss%": 100.0, + "Snt": 10, + "Last": 0.0, + "Avg": 0.0, + "Best": 0.0, + "Wrst": 0.0, + "StDev": 0.0 + }, + { + "count": 8, + "host": "ae10.cr1.lax10.us.zip.zayo.com (64.125.28.224)", + "ASN": "AS6461", + "Loss%": 70.0, + "Snt": 10, + "Last": 1186.5, + "Avg": 2189.8, + "Best": 1186.5, + "Wrst": 3202.8, + "StDev": 1008.2 + }, + { + "count": 9, + "host": "ae16.cr1.sjc1.us.zip.zayo.com (64.125.21.171)", + "ASN": "AS6461", + "Loss%": 0.0, + "Snt": 10, + "Last": 92.819, + "Avg": 414.75, + "Best": 90.799, + "Wrst": 2140.8, + "StDev": 690.96 + }, + { + "count": 10, + "host": "ae27.cs3.sjc7.us.zip.zayo.com (64.125.18.28)", + "ASN": "AS6461", + "Loss%": 90.0, + "Snt": 10, + "Last": 5234.8, + "Avg": 5234.8, + "Best": 5234.8, + "Wrst": 5234.8, + "StDev": 0.0 + }, + { + "count": 11, + "host": "???", + "ASN": "AS???", + "Loss%": 100.0, + "Snt": 10, + "Last": 0.0, + "Avg": 0.0, + "Best": 0.0, + "Wrst": 0.0, + "StDev": 0.0 + }, + { + "count": 12, + "host": "ae8.cr1.sea1.us.zip.zayo.com (64.125.28.193)", + "ASN": "AS6461", + "Loss%": 0.0, + "Snt": 10, + "Last": 93.389, + "Avg": 1238.6, + "Best": 91.537, + "Wrst": 5223.9, + "StDev": 1644.1 + }, + { + "count": 13, + "host": "ae7.ter2.sea1.us.zip.zayo.com (64.125.19.197)", + "ASN": "AS6461", + "Loss%": 0.0, + "Snt": 10, + "Last": 91.212, + "Avg": 112.17, + "Best": 90.979, + "Wrst": 178.26, + "StDev": 30.086 + }, + { + "count": 14, + "host": "208.185.33.178.IDIA-369396-ZYO.zip.zayo.com (208.185.33.178)", + "ASN": "AS6461", + "Loss%": 0.0, + "Snt": 10, + "Last": 103.95, + "Avg": 104.46, + "Best": 90.349, + "Wrst": 136.62, + "StDev": 15.726 + }, + { + "count": 15, + "host": "168.245.215.250", + "ASN": "AS55039", + "Loss%": 0.0, + "Snt": 10, + "Last": 85.672, + "Avg": 95.289, + "Best": 84.352, + "Wrst": 156.16, + "StDev": 21.621 + }, + { + "count": 16, + "host": "???", + "ASN": "AS???", + "Loss%": 100.0, + "Snt": 10, + "Last": 0.0, + "Avg": 0.0, + "Best": 0.0, + "Wrst": 0.0, + "StDev": 0.0 + } + ] + } +} diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py index a598a71..0f9388f 100644 --- a/tests/managers/network/tool_run.py +++ b/tests/managers/network/tool_run.py @@ -1,11 +1,27 @@ +import os +from datetime import datetime, timezone from uuid import uuid4 import faker - +import pytest + +from generalresearch.models.network.definitions import IPProtocol +from generalresearch.models.network.mtr import ( + get_mtr_version, + parse_raw_output, + MTRReport, + get_mtr_command, +) from generalresearch.models.network.tool_run import ( new_tool_run_from_nmap, run_dig, + MtrRun, + ToolName, + ToolClass, + Status, ) +from generalresearch.models.network.tool_utils import ToolRunCommand + fake = faker.Faker() @@ -38,6 +54,7 @@ def test_run_dig(toolrun_manager): assert reverse_dns_run == run_out + def test_run_dig_empty(toolrun_manager): reverse_dns_run = run_dig(ip=fake.ipv6()) @@ -45,4 +62,46 @@ def test_run_dig_empty(toolrun_manager): run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) - assert reverse_dns_run == run_out \ No newline at end of file + assert reverse_dns_run == run_out + + +@pytest.fixture(scope="session") +def mtr_report(request) -> MTRReport: + fp = os.path.join(request.config.rootpath, "data/mtr_fatbeam.json") + with open(fp, "r") as f: + s = f.read() + data = parse_raw_output(s) + data["port"] = 443 + data["protocol"] = IPProtocol.TCP + return MTRReport.model_validate(data) + + +def test_create_tool_run_from_mtr(toolrun_manager, mtr_report): + started_at = datetime.now(tz=timezone.utc) + tool_version = get_mtr_version() + + ip = mtr_report.destination + + finished_at = datetime.now(tz=timezone.utc) + raw_command = " ".join(get_mtr_command(ip)) + + run = MtrRun( + tool_name=ToolName.MTR, + tool_class=ToolClass.TRACEROUTE, + tool_version=tool_version, + status=Status.SUCCESS, + ip=ip, + started_at=started_at, + finished_at=finished_at, + raw_command=raw_command, + scan_group_id=uuid4().hex, + config=ToolRunCommand.from_raw_command(raw_command), + parsed=mtr_report, + source_ip="1.1.1.1" + ) + + toolrun_manager.create_mtr_run(run) + + run_out = toolrun_manager.get_mtr_run(run.id) + + assert run == run_out -- cgit v1.2.3 From b0306293ef52816998a463fbfe4c5b97d00b9b65 Mon Sep 17 00:00:00 2001 From: stuppie Date: Wed, 11 Mar 2026 18:17:09 -0600 Subject: network: completely rename and reorganize everything with consistent naming --- generalresearch/managers/network/label.py | 5 +- generalresearch/managers/network/mtr.py | 16 +- generalresearch/managers/network/nmap.py | 18 +- generalresearch/managers/network/rdns.py | 13 +- generalresearch/managers/network/tool_run.py | 44 +- generalresearch/models/network/mtr.py | 249 ----------- generalresearch/models/network/mtr/__init__.py | 0 generalresearch/models/network/mtr/command.py | 75 ++++ generalresearch/models/network/mtr/execute.py | 52 +++ generalresearch/models/network/mtr/features.py | 146 +++++++ generalresearch/models/network/mtr/parser.py | 18 + generalresearch/models/network/mtr/result.py | 167 ++++++++ generalresearch/models/network/nmap.py | 457 --------------------- generalresearch/models/network/nmap/__init__.py | 0 generalresearch/models/network/nmap/command.py | 25 ++ generalresearch/models/network/nmap/execute.py | 30 ++ generalresearch/models/network/nmap/parser.py | 412 +++++++++++++++++++ generalresearch/models/network/nmap/result.py | 431 +++++++++++++++++++ generalresearch/models/network/rdns.py | 101 ----- generalresearch/models/network/rdns/__init__.py | 0 generalresearch/models/network/rdns/command.py | 33 ++ generalresearch/models/network/rdns/execute.py | 41 ++ generalresearch/models/network/rdns/parser.py | 21 + generalresearch/models/network/rdns/result.py | 51 +++ generalresearch/models/network/tool_run.py | 103 +---- generalresearch/models/network/tool_run_command.py | 9 + generalresearch/models/network/tool_utils.py | 69 ---- generalresearch/models/network/utils.py | 5 + generalresearch/models/network/xml_parser.py | 408 ------------------ test_utils/conftest.py | 12 +- test_utils/managers/network/conftest.py | 127 ++++-- tests/managers/network/tool_run.py | 90 +--- tests/models/network/nmap.py | 32 -- tests/models/network/nmap_parser.py | 22 + tests/models/network/rdns.py | 64 ++- tests/models/network/tool_run.py | 8 - 36 files changed, 1755 insertions(+), 1599 deletions(-) delete mode 100644 generalresearch/models/network/mtr.py create mode 100644 generalresearch/models/network/mtr/__init__.py create mode 100644 generalresearch/models/network/mtr/command.py create mode 100644 generalresearch/models/network/mtr/execute.py create mode 100644 generalresearch/models/network/mtr/features.py create mode 100644 generalresearch/models/network/mtr/parser.py create mode 100644 generalresearch/models/network/mtr/result.py delete mode 100644 generalresearch/models/network/nmap.py create mode 100644 generalresearch/models/network/nmap/__init__.py create mode 100644 generalresearch/models/network/nmap/command.py create mode 100644 generalresearch/models/network/nmap/execute.py create mode 100644 generalresearch/models/network/nmap/parser.py create mode 100644 generalresearch/models/network/nmap/result.py delete mode 100644 generalresearch/models/network/rdns.py create mode 100644 generalresearch/models/network/rdns/__init__.py create mode 100644 generalresearch/models/network/rdns/command.py create mode 100644 generalresearch/models/network/rdns/execute.py create mode 100644 generalresearch/models/network/rdns/parser.py create mode 100644 generalresearch/models/network/rdns/result.py create mode 100644 generalresearch/models/network/tool_run_command.py delete mode 100644 generalresearch/models/network/tool_utils.py create mode 100644 generalresearch/models/network/utils.py delete mode 100644 generalresearch/models/network/xml_parser.py delete mode 100644 tests/models/network/nmap.py create mode 100644 tests/models/network/nmap_parser.py delete mode 100644 tests/models/network/tool_run.py (limited to 'tests/managers/network') diff --git a/generalresearch/managers/network/label.py b/generalresearch/managers/network/label.py index 0405716..65c63e5 100644 --- a/generalresearch/managers/network/label.py +++ b/generalresearch/managers/network/label.py @@ -1,6 +1,7 @@ from datetime import datetime, timezone, timedelta from typing import Collection, Optional, List +from psycopg import sql from pydantic import TypeAdapter, IPvAnyNetwork from generalresearch.managers.base import PostgresManager @@ -14,7 +15,8 @@ from generalresearch.models.network.label import IPLabel, IPLabelKind, IPLabelSo class IPLabelManager(PostgresManager): def create(self, ip_label: IPLabel) -> IPLabel: - query = """ + query = sql.SQL( + """ INSERT INTO network_iplabel ( ip, labeled_at, created_at, label_kind, source, confidence, @@ -24,6 +26,7 @@ class IPLabelManager(PostgresManager): %(label_kind)s, %(source)s, %(confidence)s, %(provider)s, %(metadata)s ) RETURNING id;""" + ) params = ip_label.model_dump_postgres() with self.pg_config.make_connection() as conn: with conn.cursor() as c: diff --git a/generalresearch/managers/network/mtr.py b/generalresearch/managers/network/mtr.py index 496d3f0..35e4871 100644 --- a/generalresearch/managers/network/mtr.py +++ b/generalresearch/managers/network/mtr.py @@ -1,18 +1,19 @@ from typing import Optional -from psycopg import Cursor +from psycopg import Cursor, sql from generalresearch.managers.base import PostgresManager -from generalresearch.models.network.tool_run import MtrRun +from generalresearch.models.network.tool_run import MTRRun -class MtrManager(PostgresManager): +class MTRRunManager(PostgresManager): - def _create(self, run: MtrRun, c: Optional[Cursor] = None) -> None: + def _create(self, run: MTRRun, c: Optional[Cursor] = None) -> None: """ Do not use this directly. Must only be used in the context of a toolrun """ - query = """ + query = sql.SQL( + """ INSERT INTO network_mtr ( run_id, source_ip, facility_id, protocol, port, parsed @@ -22,9 +23,11 @@ class MtrManager(PostgresManager): %(protocol)s, %(port)s, %(parsed)s ); """ + ) params = run.model_dump_postgres() - query_hops = """ + query_hops = sql.SQL( + """ INSERT INTO network_mtrhop ( hop, ip, domain, asn, mtr_run_id ) VALUES ( @@ -32,6 +35,7 @@ class MtrManager(PostgresManager): %(asn)s, %(mtr_run_id)s ) """ + ) mtr_run = run.parsed params_hops = [h.model_dump_postgres(run_id=run.id) for h in mtr_run.hops] diff --git a/generalresearch/managers/network/nmap.py b/generalresearch/managers/network/nmap.py index 9cbc283..0995a32 100644 --- a/generalresearch/managers/network/nmap.py +++ b/generalresearch/managers/network/nmap.py @@ -1,19 +1,20 @@ from typing import Optional -from psycopg import Cursor +from psycopg import Cursor, sql from generalresearch.managers.base import PostgresManager -from generalresearch.models.network.tool_run import PortScanRun +from generalresearch.models.network.tool_run import NmapRun -class NmapManager(PostgresManager): +class NmapRunManager(PostgresManager): - def _create(self, run: PortScanRun, c: Optional[Cursor] = None) -> None: + def _create(self, run: NmapRun, c: Optional[Cursor] = None) -> None: """ - Insert a PortScan + PortScanPorts from a Pydantic NmapRun. + Insert a PortScan + PortScanPorts from a Pydantic NmapResult. Do not use this directly. Must only be used in the context of a toolrun """ - query = """ + query = sql.SQL( + """ INSERT INTO network_portscan ( run_id, xml_version, host_state, host_state_reason, latency_ms, distance, @@ -29,9 +30,11 @@ class NmapManager(PostgresManager): %(started_at)s, %(ip)s ); """ + ) params = run.model_dump_postgres() - query_ports = """ + query_ports = sql.SQL( + """ INSERT INTO network_portscanport ( port_scan_id, protocol, port, state, reason, reason_ttl, @@ -42,6 +45,7 @@ class NmapManager(PostgresManager): %(service_name)s ) """ + ) nmap_run = run.parsed params_ports = [p.model_dump_postgres(run_id=run.id) for p in nmap_run.ports] diff --git a/generalresearch/managers/network/rdns.py b/generalresearch/managers/network/rdns.py index 0b9b7b6..3543180 100644 --- a/generalresearch/managers/network/rdns.py +++ b/generalresearch/managers/network/rdns.py @@ -3,12 +3,12 @@ from typing import Optional from psycopg import Cursor from generalresearch.managers.base import PostgresManager -from generalresearch.models.network.tool_run import RDnsRun +from generalresearch.models.network.tool_run import RDNSRun -class RdnsManager(PostgresManager): +class RDNSRunManager(PostgresManager): - def _create(self, run: RDnsRun, c: Optional[Cursor] = None) -> None: + def _create(self, run: RDNSRun, c: Optional[Cursor] = None) -> None: """ Do not use this directly. Must only be used in the context of a toolrun """ @@ -23,4 +23,9 @@ class RdnsManager(PostgresManager): ); """ params = run.model_dump_postgres() - c.execute(query, params) \ No newline at end of file + if c: + c.execute(query, params) + else: + with self.pg_config.make_connection() as conn: + with conn.cursor() as c: + c.execute(query, params) diff --git a/generalresearch/managers/network/tool_run.py b/generalresearch/managers/network/tool_run.py index 6280221..33853a0 100644 --- a/generalresearch/managers/network/tool_run.py +++ b/generalresearch/managers/network/tool_run.py @@ -3,16 +3,12 @@ from typing import Collection from psycopg import Cursor, sql from generalresearch.managers.base import PostgresManager, Permission -from generalresearch.models.network.rdns import RDNSResult -from generalresearch.models.network.tool_run import ( - ToolRun, - PortScanRun, - RDnsRun, - MtrRun, -) -from generalresearch.managers.network.nmap import NmapManager -from generalresearch.managers.network.rdns import RdnsManager -from generalresearch.managers.network.mtr import MtrManager + +from generalresearch.managers.network.nmap import NmapRunManager +from generalresearch.managers.network.rdns import RDNSRunManager +from generalresearch.managers.network.mtr import MTRRunManager +from generalresearch.models.network.rdns.result import RDNSResult +from generalresearch.models.network.tool_run import NmapRun, RDNSRun, MTRRun from generalresearch.pg_helper import PostgresConfig @@ -23,11 +19,11 @@ class ToolRunManager(PostgresManager): permissions: Collection[Permission] = None, ): super().__init__(pg_config=pg_config, permissions=permissions) - self.nmap_manager = NmapManager(self.pg_config) - self.rdns_manager = RdnsManager(self.pg_config) - self.mtr_manager = MtrManager(self.pg_config) + self.nmap_manager = NmapRunManager(self.pg_config) + self.rdns_manager = RDNSRunManager(self.pg_config) + self.mtr_manager = MTRRunManager(self.pg_config) - def create_tool_run(self, run: PortScanRun | RDnsRun | MtrRun, c: Cursor): + def create_tool_run(self, run: NmapRun | RDNSRun | MTRRun, c: Cursor): query = sql.SQL( """ INSERT INTO network_toolrun ( @@ -50,9 +46,9 @@ class ToolRunManager(PostgresManager): run.id = run_id return None - def create_portscan_run(self, run: PortScanRun) -> PortScanRun: + def create_nmap_run(self, run: NmapRun) -> NmapRun: """ - Insert a PortScan + PortScanPorts from a Pydantic NmapRun. + Insert a PortScan + PortScanPorts from a Pydantic NmapResult. """ with self.pg_config.make_connection() as conn: with conn.cursor() as c: @@ -60,7 +56,7 @@ class ToolRunManager(PostgresManager): self.nmap_manager._create(run, c=c) return run - def get_portscan_run(self, id: int) -> PortScanRun: + def get_nmap_run(self, id: int) -> NmapRun: query = """ SELECT tr.*, np.parsed FROM network_toolrun tr @@ -69,9 +65,9 @@ class ToolRunManager(PostgresManager): """ params = {"id": id} res = self.pg_config.execute_sql_query(query, params)[0] - return PortScanRun.model_validate(res) + return NmapRun.model_validate(res) - def create_rdns_run(self, run: RDnsRun) -> RDnsRun: + def create_rdns_run(self, run: RDNSRun) -> RDNSRun: """ Insert a RDnsRun + RDNSResult """ @@ -81,7 +77,7 @@ class ToolRunManager(PostgresManager): self.rdns_manager._create(run, c=c) return run - def get_rdns_run(self, id: int) -> RDnsRun: + def get_rdns_run(self, id: int) -> RDNSRun: query = """ SELECT tr.*, hostnames FROM network_toolrun tr @@ -94,16 +90,16 @@ class ToolRunManager(PostgresManager): {"ip": res["ip"], "hostnames": res["hostnames"]} ) res["parsed"] = parsed - return RDnsRun.model_validate(res) + return RDNSRun.model_validate(res) - def create_mtr_run(self, run: MtrRun) -> MtrRun: + def create_mtr_run(self, run: MTRRun) -> MTRRun: with self.pg_config.make_connection() as conn: with conn.cursor() as c: self.create_tool_run(run, c) self.mtr_manager._create(run, c=c) return run - def get_mtr_run(self, id: int) -> MtrRun: + def get_mtr_run(self, id: int) -> MTRRun: query = """ SELECT tr.*, mtr.parsed, mtr.source_ip, mtr.facility_id FROM network_toolrun tr @@ -112,4 +108,4 @@ class ToolRunManager(PostgresManager): """ params = {"id": id} res = self.pg_config.execute_sql_query(query, params)[0] - return MtrRun.model_validate(res) + return MTRRun.model_validate(res) diff --git a/generalresearch/models/network/mtr.py b/generalresearch/models/network/mtr.py deleted file mode 100644 index 4b040de..0000000 --- a/generalresearch/models/network/mtr.py +++ /dev/null @@ -1,249 +0,0 @@ -import json -import re -import subprocess -from functools import cached_property -from ipaddress import ip_address -from typing import List, Optional, Dict - -import tldextract -from pydantic import ( - Field, - field_validator, - BaseModel, - ConfigDict, - model_validator, - computed_field, -) - -from generalresearch.models.network.definitions import IPProtocol, get_ip_kind, IPKind - - -class MTRHop(BaseModel): - model_config = ConfigDict(populate_by_name=True) - - hop: int = Field(alias="count") - host: str - asn: Optional[int] = Field(default=None, alias="ASN") - - loss_pct: float = Field(alias="Loss%") - sent: int = Field(alias="Snt") - - last_ms: float = Field(alias="Last") - avg_ms: float = Field(alias="Avg") - best_ms: float = Field(alias="Best") - worst_ms: float = Field(alias="Wrst") - stdev_ms: float = Field(alias="StDev") - - hostname: Optional[str] = Field( - default=None, examples=["fixed-187-191-8-145.totalplay.net"] - ) - ip: Optional[str] = None - - @field_validator("asn", mode="before") - @classmethod - def normalize_asn(cls, v: str): - if v is None or v == "AS???": - return None - if type(v) is int: - return v - return int(v.replace("AS", "")) - - @model_validator(mode="after") - def parse_host(self): - host = self.host.strip() - - # hostname (ip) - m = HOST_RE.match(host) - if m: - self.hostname = m.group("hostname") - self.ip = m.group("ip") - return self - - # ip only - try: - ip_address(host) - self.ip = host - self.hostname = None - return self - except ValueError: - pass - - # hostname only - self.hostname = host - self.ip = None - return self - - @cached_property - def ip_kind(self) -> Optional[IPKind]: - return get_ip_kind(self.ip) - - @cached_property - def icmp_rate_limited(self): - if self.avg_ms == 0: - return False - return self.stdev_ms > self.avg_ms or self.worst_ms > self.best_ms * 10 - - @computed_field(examples=["totalplay.net"]) - @cached_property - def domain(self) -> Optional[str]: - if self.hostname: - return tldextract.extract(self.hostname).top_domain_under_public_suffix - - def model_dump_postgres(self, run_id: int): - # Writes for the network_mtrhop table - d = {"mtr_run_id": run_id} - data = self.model_dump( - mode="json", - include={ - "hop", - "ip", - "domain", - "asn", - }, - ) - d.update(data) - return d - - -class MTRReport(BaseModel): - model_config = ConfigDict(populate_by_name=True) - - source: str = Field(description="Hostname of the system running mtr.", alias="src") - destination: str = Field( - description="Destination hostname or IP being traced.", alias="dst" - ) - tos: int = Field(description="IP Type-of-Service (TOS) value used for probes.") - tests: int = Field(description="Number of probes sent per hop.") - psize: int = Field(description="Probe packet size in bytes.") - bitpattern: str = Field(description="Payload byte pattern used in probes (hex).") - - # Protocol used for the traceroute - protocol: IPProtocol = Field() - # The target port number for TCP/SCTP/UDP traces - port: Optional[int] = Field() - - hops: List[MTRHop] = Field() - - def model_dump_postgres(self): - # Writes for the network_mtr table - d = self.model_dump( - mode="json", - include={"port"}, - ) - d["protocol"] = self.protocol.to_number() - d["parsed"] = self.model_dump_json(indent=0) - return d - - def print_report(self) -> None: - print( - f"MTR Report → {self.destination} {self.protocol.name} {self.port or ''}\n" - ) - host_max_len = max(len(h.host) for h in self.hops) - - header = ( - f"{'Hop':>3} " - f"{'Host':<{host_max_len}} " - f"{'Kind':<10} " - f"{'ASN':<8} " - f"{'Loss%':>6} {'Sent':>5} " - f"{'Last':>7} {'Avg':>7} {'Best':>7} {'Worst':>7} {'StDev':>7}" - ) - print(header) - print("-" * len(header)) - - for hop in self.hops: - print( - f"{hop.hop:>3} " - f"{hop.host:<{host_max_len}} " - f"{hop.ip_kind or '???':<10} " - f"{hop.asn or '???':<8} " - f"{hop.loss_pct:6.1f} " - f"{hop.sent:5d} " - f"{hop.last_ms:7.1f} " - f"{hop.avg_ms:7.1f} " - f"{hop.best_ms:7.1f} " - f"{hop.worst_ms:7.1f} " - f"{hop.stdev_ms:7.1f}" - ) - - -HOST_RE = re.compile(r"^(?P.+?) \((?P[^)]+)\)$") - -SUPPORTED_PROTOCOLS = { - IPProtocol.TCP, - IPProtocol.UDP, - IPProtocol.SCTP, - IPProtocol.ICMP, -} -PROTOCOLS_W_PORT = {IPProtocol.TCP, IPProtocol.UDP, IPProtocol.SCTP} - - -def get_mtr_command( - ip: str, - protocol: Optional[IPProtocol] = None, - port: Optional[int] = None, - report_cycles: int = 10, -) -> List[str]: - # https://manpages.ubuntu.com/manpages/focal/man8/mtr.8.html - # e.g. "mtr -r -c 2 -b -z -j -T -P 443 74.139.70.149" - args = ["mtr", "--report", "--show-ips", "--aslookup", "--json"] - if report_cycles is not None: - args.extend(["-c", str(int(report_cycles))]) - if port is not None: - if protocol is None: - protocol = IPProtocol.TCP - assert protocol in PROTOCOLS_W_PORT, "port only allowed for TCP/SCTP/UDP traces" - args.extend(["--port", str(int(port))]) - if protocol: - assert protocol in SUPPORTED_PROTOCOLS, f"unsupported protocol: {protocol}" - # default is ICMP (no args) - arg_map = { - IPProtocol.TCP: "--tcp", - IPProtocol.UDP: "--udp", - IPProtocol.SCTP: "--sctp", - } - if protocol in arg_map: - args.append(arg_map[protocol]) - args.append(ip) - return args - - -def get_mtr_version() -> str: - proc = subprocess.run( - ["mtr", "-v"], - capture_output=True, - text=True, - check=False, - ) - # e.g. mtr 0.95 - ver_str = proc.stdout.strip() - return ver_str.split(" ", 1)[1] - - -def run_mtr( - ip: str, - protocol: Optional[IPProtocol] = None, - port: Optional[int] = None, - report_cycles: int = 10, -) -> MTRReport: - args = get_mtr_command( - ip=ip, protocol=protocol, port=port, report_cycles=report_cycles - ) - proc = subprocess.run( - args, - capture_output=True, - text=True, - check=False, - ) - raw = proc.stdout.strip() - data = parse_raw_output(raw) - data["port"] = port - data["protocol"] = protocol - return MTRReport.model_validate(data) - - -def parse_raw_output(raw: str) -> Dict: - data = json.loads(raw)["report"] - data.update(data.pop("mtr")) - data["hops"] = data.pop("hubs") - return data diff --git a/generalresearch/models/network/mtr/__init__.py b/generalresearch/models/network/mtr/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/generalresearch/models/network/mtr/command.py b/generalresearch/models/network/mtr/command.py new file mode 100644 index 0000000..e3ab903 --- /dev/null +++ b/generalresearch/models/network/mtr/command.py @@ -0,0 +1,75 @@ +import subprocess +from typing import List, Optional + +from generalresearch.models.network.definitions import IPProtocol +from generalresearch.models.network.mtr.parser import parse_mtr_output +from generalresearch.models.network.mtr.result import MTRResult + +SUPPORTED_PROTOCOLS = { + IPProtocol.TCP, + IPProtocol.UDP, + IPProtocol.SCTP, + IPProtocol.ICMP, +} +PROTOCOLS_W_PORT = {IPProtocol.TCP, IPProtocol.UDP, IPProtocol.SCTP} + + +def build_mtr_command( + ip: str, + protocol: Optional[IPProtocol] = None, + port: Optional[int] = None, + report_cycles: int = 10, +) -> str: + # https://manpages.ubuntu.com/manpages/focal/man8/mtr.8.html + # e.g. "mtr -r -c 2 -b -z -j -T -P 443 74.139.70.149" + args = ["mtr", "--report", "--show-ips", "--aslookup", "--json"] + if report_cycles is not None: + args.extend(["-c", str(int(report_cycles))]) + if port is not None: + if protocol is None: + protocol = IPProtocol.TCP + assert protocol in PROTOCOLS_W_PORT, "port only allowed for TCP/SCTP/UDP traces" + args.extend(["--port", str(int(port))]) + if protocol: + assert protocol in SUPPORTED_PROTOCOLS, f"unsupported protocol: {protocol}" + # default is ICMP (no args) + arg_map = { + IPProtocol.TCP: "--tcp", + IPProtocol.UDP: "--udp", + IPProtocol.SCTP: "--sctp", + } + if protocol in arg_map: + args.append(arg_map[protocol]) + args.append(ip) + return " ".join(args) + + +def get_mtr_version() -> str: + proc = subprocess.run( + ["mtr", "-v"], + capture_output=True, + text=True, + check=False, + ) + # e.g. mtr 0.95 + ver_str = proc.stdout.strip() + return ver_str.split(" ", 1)[1] + + +def run_mtr( + ip: str, + protocol: Optional[IPProtocol] = None, + port: Optional[int] = None, + report_cycles: int = 10, +) -> MTRResult: + args = build_mtr_command( + ip=ip, protocol=protocol, port=port, report_cycles=report_cycles + ) + proc = subprocess.run( + args.split(" "), + capture_output=True, + text=True, + check=False, + ) + raw = proc.stdout.strip() + return parse_mtr_output(raw, protocol=protocol, port=port) diff --git a/generalresearch/models/network/mtr/execute.py b/generalresearch/models/network/mtr/execute.py new file mode 100644 index 0000000..bd556bc --- /dev/null +++ b/generalresearch/models/network/mtr/execute.py @@ -0,0 +1,52 @@ +from datetime import datetime, timezone +from typing import Optional +from uuid import uuid4 + +from generalresearch.models.custom_types import UUIDStr +from generalresearch.models.network.definitions import IPProtocol +from generalresearch.models.network.mtr.command import ( + run_mtr, + get_mtr_version, + build_mtr_command, +) +from generalresearch.models.network.tool_run import MTRRun, ToolName, ToolClass, Status +from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.utils import get_source_ip + + +def execute_mtr( + ip: str, + scan_group_id: Optional[UUIDStr] = None, + protocol: Optional[IPProtocol] = None, + port: Optional[int] = None, + report_cycles: int = 10, +) -> MTRRun: + started_at = datetime.now(tz=timezone.utc) + tool_version = get_mtr_version() + result = run_mtr(ip, protocol=protocol, port=port, report_cycles=report_cycles) + finished_at = datetime.now(tz=timezone.utc) + raw_command = build_mtr_command(ip) + config = ToolRunCommand( + command="mtr", + options={ + "protocol": protocol, + "port": port, + "report_cycles": report_cycles, + }, + ) + + return MTRRun( + tool_name=ToolName.MTR, + tool_class=ToolClass.TRACEROUTE, + tool_version=tool_version, + status=Status.SUCCESS, + ip=ip, + started_at=started_at, + finished_at=finished_at, + raw_command=raw_command, + scan_group_id=scan_group_id or uuid4().hex, + config=config, + parsed=result, + source_ip=get_source_ip(), + facility_id=1, + ) diff --git a/generalresearch/models/network/mtr/features.py b/generalresearch/models/network/mtr/features.py new file mode 100644 index 0000000..e7f2ff1 --- /dev/null +++ b/generalresearch/models/network/mtr/features.py @@ -0,0 +1,146 @@ +from typing import List, Dict + +from pydantic import BaseModel, ConfigDict, Field + +from generalresearch.models.network.definitions import IPKind +from generalresearch.models.network.mtr import MTRHop + + +class MTRFeatures(BaseModel): + model_config = ConfigDict() + + hop_count: int = Field() + + public_hop_count: int + private_hop_count: int + + unique_asn_count: int + asn_transition_count: int + + missing_hop_count: int + missing_hop_ratio: float + + # typical for mobile (vs residential) + private_hops_after_public: int + + has_cgnat: bool + + +def trim_local_hops(hops: List[MTRHop]) -> List[MTRHop]: + start = 0 + for i, h in enumerate(hops): + if h.ip_kind == IPKind.PUBLIC: + start = i + break + return hops[start:] + + +def extract_mtr_features(hops: List[MTRHop]) -> Dict[str, float | int | bool | None]: + features: Dict[str, float | int | bool | None] = {} + + if not hops: + return {} + + hops = trim_local_hops(hops) + + features["hop_count"] = len(hops) + + private_hops = 0 + public_hops = 0 + for h in hops: + if not h.ip: + continue + if h.ip_kind == IPKind.PUBLIC: + public_hops += 1 + else: + private_hops += 1 + features["private_hop_count"] = private_hops + features["public_hop_count"] = public_hops + + # ----------------------- + # ASN structure + # ----------------------- + + asns = [h.asn for h in hops if h.asn] + + features["unique_asn_count"] = len(set(asns)) + + asn_changes = 0 + for a, b in zip(asns, asns[1:]): + if a != b: + asn_changes += 1 + + features["asn_transition_count"] = asn_changes + + # ----------------------- + # Missing hops + # ----------------------- + + missing_hops = sum(1 for h in hops if h.ip is None) + + features["missing_hop_count"] = missing_hops + features["missing_hop_ratio"] = missing_hops / len(hops) + + # ----------------------- + # Packet loss + # ----------------------- + + lossy_hops = sum(1 for h in hops if h.loss_pct > 0) + + features["lossy_hop_count"] = lossy_hops + features["max_loss_pct"] = max(h.loss_pct for h in hops) + + # ----------------------- + # Latency stats + # ----------------------- + + avg_rtts = [h.avg_ms for h in hops if h.avg_ms > 0] + + if avg_rtts: + features["destination_rtt"] = avg_rtts[-1] + features["mean_rtt"] = sum(avg_rtts) / len(avg_rtts) + features["max_rtt"] = max(avg_rtts) + else: + features["destination_rtt"] = None + features["mean_rtt"] = None + features["max_rtt"] = None + + # ----------------------- + # RTT jumps + # ----------------------- + + rtt_jumps = [] + + for a, b in zip(hops, hops[1:]): + if a.avg_ms > 0 and b.avg_ms > 0: + rtt_jumps.append(b.avg_ms - a.avg_ms) + + if rtt_jumps: + features["max_rtt_jump"] = max(rtt_jumps) + features["mean_rtt_jump"] = sum(rtt_jumps) / len(rtt_jumps) + else: + features["max_rtt_jump"] = None + features["mean_rtt_jump"] = None + + # ----------------------- + # Jitter + # ----------------------- + + stdevs = [h.stdev_ms for h in hops if h.stdev_ms > 0] + + if stdevs: + features["max_jitter"] = max(stdevs) + features["mean_jitter"] = sum(stdevs) / len(stdevs) + else: + features["max_jitter"] = None + features["mean_jitter"] = None + + # ----------------------- + # Route completion + # ----------------------- + + last = hops[-1] + + features["destination_reached"] = last.ip is not None and last.loss_pct < 100 + + return features diff --git a/generalresearch/models/network/mtr/parser.py b/generalresearch/models/network/mtr/parser.py new file mode 100644 index 0000000..dc108d9 --- /dev/null +++ b/generalresearch/models/network/mtr/parser.py @@ -0,0 +1,18 @@ +import json +from typing import Dict + +from generalresearch.models.network.mtr.result import MTRResult + + +def parse_mtr_output(raw: str, port, protocol) -> MTRResult: + data = parse_mtr_raw_output(raw) + data["port"] = port + data["protocol"] = protocol + return MTRResult.model_validate(data) + + +def parse_mtr_raw_output(raw: str) -> Dict: + data = json.loads(raw)["report"] + data.update(data.pop("mtr")) + data["hops"] = data.pop("hubs") + return data diff --git a/generalresearch/models/network/mtr/result.py b/generalresearch/models/network/mtr/result.py new file mode 100644 index 0000000..62f92ab --- /dev/null +++ b/generalresearch/models/network/mtr/result.py @@ -0,0 +1,167 @@ +import re +from functools import cached_property +from ipaddress import ip_address +from typing import List, Optional + +import tldextract +from pydantic import ( + Field, + field_validator, + BaseModel, + ConfigDict, + model_validator, + computed_field, +) + +from generalresearch.models.network.definitions import IPProtocol, get_ip_kind, IPKind + +HOST_RE = re.compile(r"^(?P.+?) \((?P[^)]+)\)$") + + +class MTRHop(BaseModel): + model_config = ConfigDict(populate_by_name=True) + + hop: int = Field(alias="count") + host: str + asn: Optional[int] = Field(default=None, alias="ASN") + + loss_pct: float = Field(alias="Loss%") + sent: int = Field(alias="Snt") + + last_ms: float = Field(alias="Last") + avg_ms: float = Field(alias="Avg") + best_ms: float = Field(alias="Best") + worst_ms: float = Field(alias="Wrst") + stdev_ms: float = Field(alias="StDev") + + hostname: Optional[str] = Field( + default=None, examples=["fixed-187-191-8-145.totalplay.net"] + ) + ip: Optional[str] = None + + @field_validator("asn", mode="before") + @classmethod + def normalize_asn(cls, v: str): + if v is None or v == "AS???": + return None + if type(v) is int: + return v + return int(v.replace("AS", "")) + + @model_validator(mode="after") + def parse_host(self): + host = self.host.strip() + + # hostname (ip) + m = HOST_RE.match(host) + if m: + self.hostname = m.group("hostname") + self.ip = m.group("ip") + return self + + # ip only + try: + ip_address(host) + self.ip = host + self.hostname = None + return self + except ValueError: + pass + + # hostname only + self.hostname = host + self.ip = None + return self + + @cached_property + def ip_kind(self) -> Optional[IPKind]: + return get_ip_kind(self.ip) + + @cached_property + def icmp_rate_limited(self): + if self.avg_ms == 0: + return False + return self.stdev_ms > self.avg_ms or self.worst_ms > self.best_ms * 10 + + @computed_field(examples=["totalplay.net"]) + @cached_property + def domain(self) -> Optional[str]: + if self.hostname: + return tldextract.extract(self.hostname).top_domain_under_public_suffix + + def model_dump_postgres(self, run_id: int): + # Writes for the network_mtrhop table + d = {"mtr_run_id": run_id} + data = self.model_dump( + mode="json", + include={ + "hop", + "ip", + "domain", + "asn", + }, + ) + d.update(data) + return d + + +class MTRResult(BaseModel): + model_config = ConfigDict(populate_by_name=True) + + source: str = Field(description="Hostname of the system running mtr.", alias="src") + destination: str = Field( + description="Destination hostname or IP being traced.", alias="dst" + ) + tos: int = Field(description="IP Type-of-Service (TOS) value used for probes.") + tests: int = Field(description="Number of probes sent per hop.") + psize: int = Field(description="Probe packet size in bytes.") + bitpattern: str = Field(description="Payload byte pattern used in probes (hex).") + + # Protocol used for the traceroute + protocol: IPProtocol = Field() + # The target port number for TCP/SCTP/UDP traces + port: Optional[int] = Field() + + hops: List[MTRHop] = Field() + + def model_dump_postgres(self): + # Writes for the network_mtr table + d = self.model_dump( + mode="json", + include={"port"}, + ) + d["protocol"] = self.protocol.to_number() + d["parsed"] = self.model_dump_json(indent=0) + return d + + def print_report(self) -> None: + print( + f"MTR Report → {self.destination} {self.protocol.name} {self.port or ''}\n" + ) + host_max_len = max(len(h.host) for h in self.hops) + + header = ( + f"{'Hop':>3} " + f"{'Host':<{host_max_len}} " + f"{'Kind':<10} " + f"{'ASN':<8} " + f"{'Loss%':>6} {'Sent':>5} " + f"{'Last':>7} {'Avg':>7} {'Best':>7} {'Worst':>7} {'StDev':>7}" + ) + print(header) + print("-" * len(header)) + + for hop in self.hops: + print( + f"{hop.hop:>3} " + f"{hop.host:<{host_max_len}} " + f"{hop.ip_kind or '???':<10} " + f"{hop.asn or '???':<8} " + f"{hop.loss_pct:6.1f} " + f"{hop.sent:5d} " + f"{hop.last_ms:7.1f} " + f"{hop.avg_ms:7.1f} " + f"{hop.best_ms:7.1f} " + f"{hop.worst_ms:7.1f} " + f"{hop.stdev_ms:7.1f}" + ) diff --git a/generalresearch/models/network/nmap.py b/generalresearch/models/network/nmap.py deleted file mode 100644 index 4656a93..0000000 --- a/generalresearch/models/network/nmap.py +++ /dev/null @@ -1,457 +0,0 @@ -import json -import subprocess -from datetime import timedelta -from enum import StrEnum -from functools import cached_property -from typing import Dict, Any, Literal, List, Optional, Tuple, Set - -from pydantic import computed_field, BaseModel, Field - -from generalresearch.models.custom_types import AwareDatetimeISO, IPvAnyAddressStr -from generalresearch.models.network.definitions import IPProtocol - - -class PortState(StrEnum): - OPEN = "open" - CLOSED = "closed" - FILTERED = "filtered" - UNFILTERED = "unfiltered" - OPEN_FILTERED = "open|filtered" - CLOSED_FILTERED = "closed|filtered" - # Added by me, does not get returned. Used for book-keeping - NOT_SCANNED = "not_scanned" - - -class PortStateReason(StrEnum): - SYN_ACK = "syn-ack" - RESET = "reset" - CONN_REFUSED = "conn-refused" - NO_RESPONSE = "no-response" - SYN = "syn" - FIN = "fin" - - ICMP_NET_UNREACH = "net-unreach" - ICMP_HOST_UNREACH = "host-unreach" - ICMP_PROTO_UNREACH = "proto-unreach" - ICMP_PORT_UNREACH = "port-unreach" - - ADMIN_PROHIBITED = "admin-prohibited" - HOST_PROHIBITED = "host-prohibited" - NET_PROHIBITED = "net-prohibited" - - ECHO_REPLY = "echo-reply" - TIME_EXCEEDED = "time-exceeded" - - -class NmapScanType(StrEnum): - SYN = "syn" - CONNECT = "connect" - ACK = "ack" - WINDOW = "window" - MAIMON = "maimon" - FIN = "fin" - NULL = "null" - XMAS = "xmas" - UDP = "udp" - SCTP_INIT = "sctpinit" - SCTP_COOKIE_ECHO = "sctpcookieecho" - - -class NmapHostState(StrEnum): - UP = "up" - DOWN = "down" - UNKNOWN = "unknown" - - -class NmapHostStatusReason(StrEnum): - USER_SET = "user-set" - SYN_ACK = "syn-ack" - RESET = "reset" - ECHO_REPLY = "echo-reply" - ARP_RESPONSE = "arp-response" - NO_RESPONSE = "no-response" - NET_UNREACH = "net-unreach" - HOST_UNREACH = "host-unreach" - PROTO_UNREACH = "proto-unreach" - PORT_UNREACH = "port-unreach" - ADMIN_PROHIBITED = "admin-prohibited" - LOCALHOST_RESPONSE = "localhost-response" - - -class NmapOSClass(BaseModel): - vendor: str = None - osfamily: str = None - osgen: Optional[str] = None - accuracy: int = None - cpe: Optional[List[str]] = None - - -class NmapOSMatch(BaseModel): - name: str - accuracy: int - classes: List[NmapOSClass] = Field(default_factory=list) - - @property - def best_class(self) -> Optional[NmapOSClass]: - if not self.classes: - return None - return max(self.classes, key=lambda m: m.accuracy) - - -class NmapScript(BaseModel): - """ - - """ - - id: str - output: Optional[str] = None - elements: Dict[str, Any] = Field(default_factory=dict) - - -class NmapService(BaseModel): - # - name: Optional[str] = None - product: Optional[str] = None - version: Optional[str] = None - extrainfo: Optional[str] = None - method: Optional[str] = None - conf: Optional[int] = None - cpe: List[str] = Field(default_factory=list) - - def model_dump_postgres(self): - d = self.model_dump(mode="json") - d["service_name"] = self.name - return d - - -class NmapPort(BaseModel): - port: int = Field() - protocol: IPProtocol = Field() - # Closed ports will not have a NmapPort record - state: PortState = Field() - reason: Optional[PortStateReason] = Field(default=None) - reason_ttl: Optional[int] = Field(default=None) - - service: Optional[NmapService] = None - scripts: List[NmapScript] = Field(default_factory=list) - - def model_dump_postgres(self, run_id: int): - # Writes for the network_portscanport table - d = {"port_scan_id": run_id} - data = self.model_dump( - mode="json", - include={ - "port", - "state", - "reason", - "reason_ttl", - }, - ) - d.update(data) - d["protocol"] = self.protocol.to_number() - if self.service: - d.update(self.service.model_dump_postgres()) - return d - - -class NmapHostScript(BaseModel): - id: str = Field() - output: Optional[str] = Field(default=None) - - -class NmapTraceHop(BaseModel): - """ - One hop observed during Nmap's traceroute. - - Example XML: - - """ - - ttl: int = Field() - - ipaddr: Optional[str] = Field( - default=None, - description="IP address of the responding router or host", - ) - - rtt_ms: Optional[float] = Field( - default=None, - description="Round-trip time in milliseconds for the probe reaching this hop.", - ) - - host: Optional[str] = Field( - default=None, - description="Reverse DNS hostname for the hop if Nmap resolved one.", - ) - - -class NmapTrace(BaseModel): - """ - Traceroute information collected by Nmap. - - Nmap performs a single traceroute per host using probes matching the scan - type (typically TCP) directed at a chosen destination port. - - Example XML: - - - ... - - """ - - port: Optional[int] = Field( - default=None, - description="Destination port used for traceroute probes (may be absent depending on scan type).", - ) - protocol: Optional[IPProtocol] = Field( - default=None, - description="Transport protocol used for the traceroute probes (tcp, udp, etc.).", - ) - - hops: List[NmapTraceHop] = Field( - default_factory=list, - description="Ordered list of hops observed during the traceroute.", - ) - - @property - def destination(self) -> Optional[NmapTraceHop]: - return self.hops[-1] if self.hops else None - - -class NmapHostname(BaseModel): - # - name: str - type: Optional[Literal["PTR", "user"]] = None - - -class NmapPortStats(BaseModel): - """ - This is counts across all protocols scanned (tcp/udp) - """ - - open: int = 0 - closed: int = 0 - filtered: int = 0 - unfiltered: int = 0 - open_filtered: int = 0 - closed_filtered: int = 0 - - -class NmapScanInfo(BaseModel): - """ - We could have multiple protocols in one run. - - - """ - - type: NmapScanType = Field() - protocol: IPProtocol = Field() - num_services: int = Field() - services: str = Field() - - @cached_property - def port_set(self) -> Set[int]: - """ - Expand the Nmap services string into a set of port numbers. - Example: - "22-25,80,443" -> {22,23,24,25,80,443} - """ - ports: Set[int] = set() - for part in self.services.split(","): - if "-" in part: - start, end = part.split("-", 1) - ports.update(range(int(start), int(end) + 1)) - else: - ports.add(int(part)) - return ports - - -class NmapRun(BaseModel): - """ - A Nmap Run. Expects that we've only scanned ONE host. - """ - - command_line: str = Field() - started_at: AwareDatetimeISO = Field() - version: str = Field() - xmloutputversion: Literal["1.04"] = Field() - - scan_infos: List[NmapScanInfo] = Field(min_length=1) - - # comes from - finished_at: Optional[AwareDatetimeISO] = Field(default=None) - exit_status: Optional[Literal["success", "error"]] = Field(default=None) - - ##### - # Everything below here is from within the *single* host we've scanned - ##### - - # - host_state: NmapHostState = Field() - host_state_reason: NmapHostStatusReason = Field() - host_state_reason_ttl: Optional[int] = None - - #
- target_ip: IPvAnyAddressStr = Field() - - hostnames: List[NmapHostname] = Field() - - ports: List[NmapPort] = [] - port_stats: NmapPortStats = Field() - - # - uptime_seconds: Optional[int] = Field(default=None) - # - distance: Optional[int] = Field(description="approx number of hops", default=None) - - # - tcp_sequence_index: Optional[int] = None - tcp_sequence_difficulty: Optional[str] = None - - # - ipid_sequence_class: Optional[str] = None - - # - tcp_timestamp_class: Optional[str] = None - - # - srtt_us: Optional[int] = Field( - default=None, description="smoothed RTT estimate (microseconds µs)" - ) - rttvar_us: Optional[int] = Field( - default=None, description="RTT variance (microseconds µs)" - ) - timeout_us: Optional[int] = Field( - default=None, description="probe timeout (microseconds µs)" - ) - - os_matches: Optional[List[NmapOSMatch]] = Field(default=None) - - host_scripts: List[NmapHostScript] = Field(default_factory=list) - - trace: Optional[NmapTrace] = Field(default=None) - - raw_xml: Optional[str] = None - - @computed_field - @property - def last_boot(self) -> Optional[AwareDatetimeISO]: - if self.uptime_seconds: - return self.started_at - timedelta(seconds=self.uptime_seconds) - - @property - def scan_info_tcp(self): - return next( - filter(lambda x: x.protocol == IPProtocol.TCP, self.scan_infos), None - ) - - @property - def scan_info_udp(self): - return next( - filter(lambda x: x.protocol == IPProtocol.UDP, self.scan_infos), None - ) - - @property - def latency_ms(self) -> Optional[float]: - return self.srtt_us / 1000 if self.srtt_us is not None else None - - @property - def best_os_match(self) -> Optional[NmapOSMatch]: - if not self.os_matches: - return None - return max(self.os_matches, key=lambda m: m.accuracy) - - def filter_ports(self, protocol: IPProtocol, state: PortState) -> List[NmapPort]: - return [p for p in self.ports if p.protocol == protocol and p.state == state] - - @property - def tcp_open_ports(self) -> List[int]: - """ - Returns a list of open TCP port numbers. - """ - return [ - p.port - for p in self.filter_ports(protocol=IPProtocol.TCP, state=PortState.OPEN) - ] - - @property - def udp_open_ports(self) -> List[int]: - """ - Returns a list of open UDP port numbers. - """ - return [ - p.port - for p in self.filter_ports(protocol=IPProtocol.UDP, state=PortState.OPEN) - ] - - @cached_property - def _port_index(self) -> Dict[Tuple[IPProtocol, int], NmapPort]: - return {(p.protocol, p.port): p for p in self.ports} - - def get_port_state( - self, port: int, protocol: IPProtocol = IPProtocol.TCP - ) -> PortState: - # Explicit (only if scanned and not closed) - if (protocol, port) in self._port_index: - return self._port_index[(protocol, port)].state - - # Check if we even scanned it - scaninfo = next((s for s in self.scan_infos if s.protocol == protocol), None) - if scaninfo and port in scaninfo.port_set: - return PortState.CLOSED - - # We didn't scan it - return PortState.NOT_SCANNED - - def model_dump_postgres(self): - # Writes for the network_portscan table - d = dict() - data = self.model_dump( - mode="json", - include={ - "started_at", - "host_state", - "host_state_reason", - "distance", - "uptime_seconds", - "raw_xml", - }, - ) - d.update(data) - d["ip"] = self.target_ip - d["xml_version"] = self.xmloutputversion - d["latency_ms"] = self.latency_ms - d["last_boot"] = self.last_boot - d["parsed"] = self.model_dump_json(indent=0) - d["open_tcp_ports"] = json.dumps(self.tcp_open_ports) - return d - - -def get_nmap_command(ip: str, top_ports: Optional[int] = 1000) -> List[str]: - # e.g. "nmap -Pn -T4 -A --top-ports 1000 -oX - scanme.nmap.org" - # https://linux.die.net/man/1/nmap - args = ["nmap", "-Pn", "-T4", "-A", "--top-ports", str(int(top_ports)), "-oX", "-"] - args.append(ip) - return args - - -def run_nmap(ip: str, top_ports: Optional[int] = 1000) -> NmapRun: - from generalresearch.models.network.xml_parser import NmapXmlParser - - p = NmapXmlParser() - - args = get_nmap_command(ip=ip, top_ports=top_ports) - proc = subprocess.run( - args, - capture_output=True, - text=True, - check=False, - ) - raw = proc.stdout.strip() - n = p.parse_xml(raw) - return n diff --git a/generalresearch/models/network/nmap/__init__.py b/generalresearch/models/network/nmap/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/generalresearch/models/network/nmap/command.py b/generalresearch/models/network/nmap/command.py new file mode 100644 index 0000000..dfa55de --- /dev/null +++ b/generalresearch/models/network/nmap/command.py @@ -0,0 +1,25 @@ +import subprocess +from typing import Optional, List + +from generalresearch.models.network.nmap.parser import parse_nmap_xml +from generalresearch.models.network.nmap.result import NmapResult + + +def build_nmap_command(ip: str, top_ports: Optional[int] = 1000) -> List[str]: + # e.g. "nmap -Pn -T4 -A --top-ports 1000 -oX - scanme.nmap.org" + # https://linux.die.net/man/1/nmap + args = ["nmap", "-Pn", "-T4", "-A", "--top-ports", str(int(top_ports)), "-oX", "-"] + args.append(ip) + return args + + +def run_nmap(ip: str, top_ports: Optional[int] = 1000) -> NmapResult: + args = build_nmap_command(ip=ip, top_ports=top_ports) + proc = subprocess.run( + args, + capture_output=True, + text=True, + check=False, + ) + raw = proc.stdout.strip() + return parse_nmap_xml(raw) diff --git a/generalresearch/models/network/nmap/execute.py b/generalresearch/models/network/nmap/execute.py new file mode 100644 index 0000000..fc1e2fa --- /dev/null +++ b/generalresearch/models/network/nmap/execute.py @@ -0,0 +1,30 @@ +from typing import Optional +from uuid import uuid4 + +from generalresearch.models.custom_types import UUIDStr +from generalresearch.models.network.nmap.command import run_nmap +from generalresearch.models.network.tool_run import NmapRun, ToolName, ToolClass, Status +from generalresearch.models.network.tool_run_command import ToolRunCommand + + +def execute_nmap( + ip: str, top_ports: Optional[int] = 1000, scan_group_id: Optional[UUIDStr] = None +): + result = run_nmap(ip=ip, top_ports=top_ports) + assert result.exit_status == "success" + assert result.target_ip == ip + + run = NmapRun( + tool_name=ToolName.NMAP, + tool_class=ToolClass.PORT_SCAN, + tool_version=result.version, + status=Status.SUCCESS, + ip=ip, + started_at=result.started_at, + finished_at=result.finished_at, + raw_command=result.command_line, + scan_group_id=scan_group_id or uuid4().hex, + config=ToolRunCommand(command="nmap", options={'top_ports': top_ports}), + parsed=result, + ) + return run \ No newline at end of file diff --git a/generalresearch/models/network/nmap/parser.py b/generalresearch/models/network/nmap/parser.py new file mode 100644 index 0000000..5a441bb --- /dev/null +++ b/generalresearch/models/network/nmap/parser.py @@ -0,0 +1,412 @@ +import xml.etree.cElementTree as ET +from datetime import datetime, timezone +from typing import List, Dict, Any, Tuple, Optional + +from generalresearch.models.network.definitions import IPProtocol +from generalresearch.models.network.nmap.result import ( + NmapHostname, + NmapResult, + NmapPort, + PortState, + PortStateReason, + NmapService, + NmapScript, + NmapPortStats, + NmapScanType, + NmapHostState, + NmapHostStatusReason, + NmapHostScript, + NmapOSMatch, + NmapOSClass, + NmapTrace, + NmapTraceHop, + NmapScanInfo, +) + + +class NmapParserException(Exception): + def __init__(self, msg): + self.msg = msg + + def __str__(self): + return self.msg + + +class NmapXmlParser: + """ + Example: https://nmap.org/book/output-formats-xml-output.html + Full DTD: https://nmap.org/book/nmap-dtd.html + """ + + @classmethod + def parse_xml(cls, nmap_data: str) -> NmapResult: + """ + Expects a full nmap scan report. + """ + + try: + root = ET.fromstring(nmap_data) + except Exception as e: + emsg = "Wrong XML structure: cannot parse data: {0}".format(e) + raise NmapParserException(emsg) + + if root.tag != "nmaprun": + raise NmapParserException("Unpexpected data structure for XML " "root node") + return cls._parse_xml_nmaprun(root) + + @classmethod + def _parse_xml_nmaprun(cls, root: ET.Element) -> NmapResult: + """ + This method parses out a full nmap scan report from its XML root + node: . We expect there is only 1 host in this report! + + :param root: Element from xml.ElementTree (top of XML the document) + """ + cls._validate_nmap_root(root) + host_count = len(root.findall(".//host")) + assert host_count == 1, f"Expected 1 host, got {host_count}" + + xml_str = ET.tostring(root, encoding="unicode").replace("\n", "") + nmap_data = {"raw_xml": xml_str} + nmap_data.update(cls._parse_nmaprun(root)) + + nmap_data["scan_infos"] = [ + cls._parse_scaninfo(scaninfo_el) + for scaninfo_el in root.findall(".//scaninfo") + ] + + nmap_data.update(cls._parse_runstats(root)) + + nmap_data.update(cls._parse_xml_host(root.find(".//host"))) + + return NmapResult.model_validate(nmap_data) + + @classmethod + def _validate_nmap_root(cls, root: ET.Element) -> None: + allowed = { + "scaninfo", + "host", + "runstats", + "verbose", + "debugging", + "taskprogress", + } + + found = {child.tag for child in root} + unexpected = found - allowed + if unexpected: + raise ValueError( + f"Unexpected top-level tags in nmap XML: {sorted(unexpected)}" + ) + + @classmethod + def _parse_scaninfo(cls, scaninfo_el: ET.Element) -> NmapScanInfo: + data = dict() + data["type"] = NmapScanType(scaninfo_el.attrib["type"]) + data["protocol"] = IPProtocol(scaninfo_el.attrib["protocol"]) + data["num_services"] = scaninfo_el.attrib["numservices"] + data["services"] = scaninfo_el.attrib["services"] + return NmapScanInfo.model_validate(data) + + @classmethod + def _parse_runstats(cls, root: ET.Element) -> Dict: + runstats = root.find("runstats") + if runstats is None: + return {} + + finished = runstats.find("finished") + if finished is None: + return {} + + finished_at = None + ts = finished.attrib.get("time") + if ts: + finished_at = datetime.fromtimestamp(int(ts), tz=timezone.utc) + + return { + "finished_at": finished_at, + "exit_status": finished.attrib.get("exit"), + } + + @classmethod + def _parse_nmaprun(cls, nmaprun_el: ET.Element) -> Dict: + nmap_data = dict() + nmaprun = dict(nmaprun_el.attrib) + nmap_data["command_line"] = nmaprun["args"] + nmap_data["started_at"] = datetime.fromtimestamp( + float(nmaprun["start"]), tz=timezone.utc + ) + nmap_data["version"] = nmaprun["version"] + nmap_data["xmloutputversion"] = nmaprun["xmloutputversion"] + return nmap_data + + @classmethod + def _parse_xml_host(cls, host_el: ET.Element) -> Dict: + """ + Receives a XML tag representing a scanned host with + its services. + """ + data = dict() + + # + status_el = host_el.find("status") + data["host_state"] = NmapHostState(status_el.attrib["state"]) + data["host_state_reason"] = NmapHostStatusReason(status_el.attrib["reason"]) + host_state_reason_ttl = status_el.attrib.get("reason_ttl") + if host_state_reason_ttl: + data["host_state_reason_ttl"] = int(host_state_reason_ttl) + + #
+ address_el = host_el.find("address") + data["target_ip"] = address_el.attrib["addr"] + + data["hostnames"] = cls._parse_hostnames(host_el.find("hostnames")) + + data["ports"], data["port_stats"] = cls._parse_xml_ports(host_el.find("ports")) + + uptime = host_el.find("uptime") + if uptime is not None: + data["uptime_seconds"] = int(uptime.attrib["seconds"]) + + distance = host_el.find("distance") + if distance is not None: + data["distance"] = int(distance.attrib["value"]) + + tcpsequence = host_el.find("tcpsequence") + if tcpsequence is not None: + data["tcp_sequence_index"] = int(tcpsequence.attrib["index"]) + data["tcp_sequence_difficulty"] = tcpsequence.attrib["difficulty"] + ipidsequence = host_el.find("ipidsequence") + if ipidsequence is not None: + data["ipid_sequence_class"] = ipidsequence.attrib["class"] + tcptssequence = host_el.find("tcptssequence") + if tcptssequence is not None: + data["tcp_timestamp_class"] = tcptssequence.attrib["class"] + + times_elem = host_el.find("times") + if times_elem is not None: + data.update( + { + "srtt_us": int(times_elem.attrib.get("srtt", 0)) or None, + "rttvar_us": int(times_elem.attrib.get("rttvar", 0)) or None, + "timeout_us": int(times_elem.attrib.get("to", 0)) or None, + } + ) + + hostscripts_el = host_el.find("hostscript") + if hostscripts_el is not None: + data["host_scripts"] = [ + NmapHostScript(id=el.attrib["id"], output=el.attrib.get("output")) + for el in hostscripts_el.findall("script") + ] + + data["os_matches"] = cls._parse_os_matches(host_el) + + data["trace"] = cls._parse_trace(host_el) + + return data + + @classmethod + def _parse_os_matches(cls, host_el: ET.Element) -> List[NmapOSMatch] | None: + os_elem = host_el.find("os") + if os_elem is None: + return None + + matches: List[NmapOSMatch] = [] + + for m in os_elem.findall("osmatch"): + classes: List[NmapOSClass] = [] + + for c in m.findall("osclass"): + cpes = [e.text.strip() for e in c.findall("cpe") if e.text] + + classes.append( + NmapOSClass( + vendor=c.attrib.get("vendor"), + osfamily=c.attrib.get("osfamily"), + osgen=c.attrib.get("osgen"), + accuracy=( + int(c.attrib["accuracy"]) + if "accuracy" in c.attrib + else None + ), + cpe=cpes or None, + ) + ) + + matches.append( + NmapOSMatch( + name=m.attrib["name"], + accuracy=int(m.attrib["accuracy"]), + classes=classes, + ) + ) + + return matches or None + + @classmethod + def _parse_hostnames(cls, hostnames_el: ET.Element) -> List[NmapHostname]: + """ + Parses the hostnames element. + e.g. + + + """ + return [ + cls._parse_hostname(hname) for hname in hostnames_el.findall("hostname") + ] + + @classmethod + def _parse_hostname(cls, hostname_el: ET.Element) -> NmapHostname: + """ + Parses the hostname element. + e.g. + + :param hostname_el: XML tag from a nmap scan + """ + return NmapHostname.model_validate(dict(hostname_el.attrib)) + + @classmethod + def _parse_xml_ports( + cls, ports_elem: ET.Element + ) -> Tuple[List[NmapPort], NmapPortStats]: + """ + Parses the list of scanned services from a targeted host. + """ + ports: List[NmapPort] = [] + stats = NmapPortStats() + + # handle extraports first + for e in ports_elem.findall("extraports"): + state = PortState(e.attrib["state"]) + count = int(e.attrib["count"]) + + key = state.value.replace("|", "_") + setattr(stats, key, getattr(stats, key) + count) + + for port_elem in ports_elem.findall("port"): + port = cls._parse_xml_port(port_elem) + ports.append(port) + key = port.state.value.replace("|", "_") + setattr(stats, key, getattr(stats, key) + 1) + return ports, stats + + @classmethod + def _parse_xml_service(cls, service_elem: ET.Element) -> NmapService: + svc = { + "name": service_elem.attrib.get("name"), + "product": service_elem.attrib.get("product"), + "version": service_elem.attrib.get("version"), + "extrainfo": service_elem.attrib.get("extrainfo"), + "method": service_elem.attrib.get("method"), + "conf": ( + int(service_elem.attrib["conf"]) + if "conf" in service_elem.attrib + else None + ), + "cpe": [e.text.strip() for e in service_elem.findall("cpe")], + } + + return NmapService.model_validate(svc) + + @classmethod + def _parse_xml_script(cls, script_elem: ET.Element) -> NmapScript: + output = script_elem.attrib.get("output") + if output: + output = output.strip() + script = { + "id": script_elem.attrib["id"], + "output": output, + } + + elements: Dict[str, Any] = {} + + # handle value + for elem in script_elem.findall(".//elem"): + key = elem.attrib.get("key") + if key: + elements[key.strip()] = elem.text.strip() + + script["elements"] = elements + return NmapScript.model_validate(script) + + @classmethod + def _parse_xml_port(cls, port_elem: ET.Element) -> NmapPort: + """ + + + + + + """ + state_elem = port_elem.find("state") + + port = { + "port": int(port_elem.attrib["portid"]), + "protocol": port_elem.attrib["protocol"], + "state": PortState(state_elem.attrib["state"]), + "reason": ( + PortStateReason(state_elem.attrib["reason"]) + if "reason" in state_elem.attrib + else None + ), + "reason_ttl": ( + int(state_elem.attrib["reason_ttl"]) + if "reason_ttl" in state_elem.attrib + else None + ), + } + + service_elem = port_elem.find("service") + if service_elem is not None: + port["service"] = cls._parse_xml_service(service_elem) + + port["scripts"] = [] + for script_elem in port_elem.findall("script"): + port["scripts"].append(cls._parse_xml_script(script_elem)) + + return NmapPort.model_validate(port) + + @classmethod + def _parse_trace(cls, host_elem: ET.Element) -> Optional[NmapTrace]: + trace_elem = host_elem.find("trace") + if trace_elem is None: + return None + + port_attr = trace_elem.attrib.get("port") + proto_attr = trace_elem.attrib.get("proto") + + hops: List[NmapTraceHop] = [] + + for hop_elem in trace_elem.findall("hop"): + ttl = hop_elem.attrib.get("ttl") + if ttl is None: + continue # ttl is required by the DTD but guard anyway + + rtt = hop_elem.attrib.get("rtt") + ipaddr = hop_elem.attrib.get("ipaddr") + host = hop_elem.attrib.get("host") + + hops.append( + NmapTraceHop( + ttl=int(ttl), + ipaddr=ipaddr, + rtt_ms=float(rtt) if rtt is not None else None, + host=host, + ) + ) + + return NmapTrace( + port=int(port_attr) if port_attr is not None else None, + protocol=IPProtocol(proto_attr) if proto_attr is not None else None, + hops=hops, + ) + + +def parse_nmap_xml(raw): + return NmapXmlParser.parse_xml(raw) diff --git a/generalresearch/models/network/nmap/result.py b/generalresearch/models/network/nmap/result.py new file mode 100644 index 0000000..635db06 --- /dev/null +++ b/generalresearch/models/network/nmap/result.py @@ -0,0 +1,431 @@ +import json +from datetime import timedelta +from enum import StrEnum +from functools import cached_property +from typing import Dict, Any, Literal, List, Optional, Tuple, Set + +from pydantic import computed_field, BaseModel, Field + +from generalresearch.models.custom_types import AwareDatetimeISO, IPvAnyAddressStr +from generalresearch.models.network.definitions import IPProtocol + + +class PortState(StrEnum): + OPEN = "open" + CLOSED = "closed" + FILTERED = "filtered" + UNFILTERED = "unfiltered" + OPEN_FILTERED = "open|filtered" + CLOSED_FILTERED = "closed|filtered" + # Added by me, does not get returned. Used for book-keeping + NOT_SCANNED = "not_scanned" + + +class PortStateReason(StrEnum): + SYN_ACK = "syn-ack" + RESET = "reset" + CONN_REFUSED = "conn-refused" + NO_RESPONSE = "no-response" + SYN = "syn" + FIN = "fin" + + ICMP_NET_UNREACH = "net-unreach" + ICMP_HOST_UNREACH = "host-unreach" + ICMP_PROTO_UNREACH = "proto-unreach" + ICMP_PORT_UNREACH = "port-unreach" + + ADMIN_PROHIBITED = "admin-prohibited" + HOST_PROHIBITED = "host-prohibited" + NET_PROHIBITED = "net-prohibited" + + ECHO_REPLY = "echo-reply" + TIME_EXCEEDED = "time-exceeded" + + +class NmapScanType(StrEnum): + SYN = "syn" + CONNECT = "connect" + ACK = "ack" + WINDOW = "window" + MAIMON = "maimon" + FIN = "fin" + NULL = "null" + XMAS = "xmas" + UDP = "udp" + SCTP_INIT = "sctpinit" + SCTP_COOKIE_ECHO = "sctpcookieecho" + + +class NmapHostState(StrEnum): + UP = "up" + DOWN = "down" + UNKNOWN = "unknown" + + +class NmapHostStatusReason(StrEnum): + USER_SET = "user-set" + SYN_ACK = "syn-ack" + RESET = "reset" + ECHO_REPLY = "echo-reply" + ARP_RESPONSE = "arp-response" + NO_RESPONSE = "no-response" + NET_UNREACH = "net-unreach" + HOST_UNREACH = "host-unreach" + PROTO_UNREACH = "proto-unreach" + PORT_UNREACH = "port-unreach" + ADMIN_PROHIBITED = "admin-prohibited" + LOCALHOST_RESPONSE = "localhost-response" + + +class NmapOSClass(BaseModel): + vendor: str = None + osfamily: str = None + osgen: Optional[str] = None + accuracy: int = None + cpe: Optional[List[str]] = None + + +class NmapOSMatch(BaseModel): + name: str + accuracy: int + classes: List[NmapOSClass] = Field(default_factory=list) + + @property + def best_class(self) -> Optional[NmapOSClass]: + if not self.classes: + return None + return max(self.classes, key=lambda m: m.accuracy) + + +class NmapScript(BaseModel): + """ + + """ + + id: str + output: Optional[str] = None + elements: Dict[str, Any] = Field(default_factory=dict) + + +class NmapService(BaseModel): + # + name: Optional[str] = None + product: Optional[str] = None + version: Optional[str] = None + extrainfo: Optional[str] = None + method: Optional[str] = None + conf: Optional[int] = None + cpe: List[str] = Field(default_factory=list) + + def model_dump_postgres(self): + d = self.model_dump(mode="json") + d["service_name"] = self.name + return d + + +class NmapPort(BaseModel): + port: int = Field() + protocol: IPProtocol = Field() + # Closed ports will not have a NmapPort record + state: PortState = Field() + reason: Optional[PortStateReason] = Field(default=None) + reason_ttl: Optional[int] = Field(default=None) + + service: Optional[NmapService] = None + scripts: List[NmapScript] = Field(default_factory=list) + + def model_dump_postgres(self, run_id: int): + # Writes for the network_portscanport table + d = {"port_scan_id": run_id} + data = self.model_dump( + mode="json", + include={ + "port", + "state", + "reason", + "reason_ttl", + }, + ) + d.update(data) + d["protocol"] = self.protocol.to_number() + if self.service: + d.update(self.service.model_dump_postgres()) + return d + + +class NmapHostScript(BaseModel): + id: str = Field() + output: Optional[str] = Field(default=None) + + +class NmapTraceHop(BaseModel): + """ + One hop observed during Nmap's traceroute. + + Example XML: + + """ + + ttl: int = Field() + + ipaddr: Optional[str] = Field( + default=None, + description="IP address of the responding router or host", + ) + + rtt_ms: Optional[float] = Field( + default=None, + description="Round-trip time in milliseconds for the probe reaching this hop.", + ) + + host: Optional[str] = Field( + default=None, + description="Reverse DNS hostname for the hop if Nmap resolved one.", + ) + + +class NmapTrace(BaseModel): + """ + Traceroute information collected by Nmap. + + Nmap performs a single traceroute per host using probes matching the scan + type (typically TCP) directed at a chosen destination port. + + Example XML: + + + ... + + """ + + port: Optional[int] = Field( + default=None, + description="Destination port used for traceroute probes (may be absent depending on scan type).", + ) + protocol: Optional[IPProtocol] = Field( + default=None, + description="Transport protocol used for the traceroute probes (tcp, udp, etc.).", + ) + + hops: List[NmapTraceHop] = Field( + default_factory=list, + description="Ordered list of hops observed during the traceroute.", + ) + + @property + def destination(self) -> Optional[NmapTraceHop]: + return self.hops[-1] if self.hops else None + + +class NmapHostname(BaseModel): + # + name: str + type: Optional[Literal["PTR", "user"]] = None + + +class NmapPortStats(BaseModel): + """ + This is counts across all protocols scanned (tcp/udp) + """ + + open: int = 0 + closed: int = 0 + filtered: int = 0 + unfiltered: int = 0 + open_filtered: int = 0 + closed_filtered: int = 0 + + +class NmapScanInfo(BaseModel): + """ + We could have multiple protocols in one run. + + + """ + + type: NmapScanType = Field() + protocol: IPProtocol = Field() + num_services: int = Field() + services: str = Field() + + @cached_property + def port_set(self) -> Set[int]: + """ + Expand the Nmap services string into a set of port numbers. + Example: + "22-25,80,443" -> {22,23,24,25,80,443} + """ + ports: Set[int] = set() + for part in self.services.split(","): + if "-" in part: + start, end = part.split("-", 1) + ports.update(range(int(start), int(end) + 1)) + else: + ports.add(int(part)) + return ports + + +class NmapResult(BaseModel): + """ + A Nmap Run. Expects that we've only scanned ONE host. + """ + + command_line: str = Field() + started_at: AwareDatetimeISO = Field() + version: str = Field() + xmloutputversion: Literal["1.04"] = Field() + + scan_infos: List[NmapScanInfo] = Field(min_length=1) + + # comes from + finished_at: Optional[AwareDatetimeISO] = Field(default=None) + exit_status: Optional[Literal["success", "error"]] = Field(default=None) + + ##### + # Everything below here is from within the *single* host we've scanned + ##### + + # + host_state: NmapHostState = Field() + host_state_reason: NmapHostStatusReason = Field() + host_state_reason_ttl: Optional[int] = None + + #
+ target_ip: IPvAnyAddressStr = Field() + + hostnames: List[NmapHostname] = Field() + + ports: List[NmapPort] = [] + port_stats: NmapPortStats = Field() + + # + uptime_seconds: Optional[int] = Field(default=None) + # + distance: Optional[int] = Field(description="approx number of hops", default=None) + + # + tcp_sequence_index: Optional[int] = None + tcp_sequence_difficulty: Optional[str] = None + + # + ipid_sequence_class: Optional[str] = None + + # + tcp_timestamp_class: Optional[str] = None + + # + srtt_us: Optional[int] = Field( + default=None, description="smoothed RTT estimate (microseconds µs)" + ) + rttvar_us: Optional[int] = Field( + default=None, description="RTT variance (microseconds µs)" + ) + timeout_us: Optional[int] = Field( + default=None, description="probe timeout (microseconds µs)" + ) + + os_matches: Optional[List[NmapOSMatch]] = Field(default=None) + + host_scripts: List[NmapHostScript] = Field(default_factory=list) + + trace: Optional[NmapTrace] = Field(default=None) + + raw_xml: Optional[str] = None + + @computed_field + @property + def last_boot(self) -> Optional[AwareDatetimeISO]: + if self.uptime_seconds: + return self.started_at - timedelta(seconds=self.uptime_seconds) + + @property + def scan_info_tcp(self): + return next( + filter(lambda x: x.protocol == IPProtocol.TCP, self.scan_infos), None + ) + + @property + def scan_info_udp(self): + return next( + filter(lambda x: x.protocol == IPProtocol.UDP, self.scan_infos), None + ) + + @property + def latency_ms(self) -> Optional[float]: + return self.srtt_us / 1000 if self.srtt_us is not None else None + + @property + def best_os_match(self) -> Optional[NmapOSMatch]: + if not self.os_matches: + return None + return max(self.os_matches, key=lambda m: m.accuracy) + + def filter_ports(self, protocol: IPProtocol, state: PortState) -> List[NmapPort]: + return [p for p in self.ports if p.protocol == protocol and p.state == state] + + @property + def tcp_open_ports(self) -> List[int]: + """ + Returns a list of open TCP port numbers. + """ + return [ + p.port + for p in self.filter_ports(protocol=IPProtocol.TCP, state=PortState.OPEN) + ] + + @property + def udp_open_ports(self) -> List[int]: + """ + Returns a list of open UDP port numbers. + """ + return [ + p.port + for p in self.filter_ports(protocol=IPProtocol.UDP, state=PortState.OPEN) + ] + + @cached_property + def _port_index(self) -> Dict[Tuple[IPProtocol, int], NmapPort]: + return {(p.protocol, p.port): p for p in self.ports} + + def get_port_state( + self, port: int, protocol: IPProtocol = IPProtocol.TCP + ) -> PortState: + # Explicit (only if scanned and not closed) + if (protocol, port) in self._port_index: + return self._port_index[(protocol, port)].state + + # Check if we even scanned it + scaninfo = next((s for s in self.scan_infos if s.protocol == protocol), None) + if scaninfo and port in scaninfo.port_set: + return PortState.CLOSED + + # We didn't scan it + return PortState.NOT_SCANNED + + def model_dump_postgres(self): + # Writes for the network_portscan table + d = dict() + data = self.model_dump( + mode="json", + include={ + "started_at", + "host_state", + "host_state_reason", + "distance", + "uptime_seconds", + "raw_xml", + }, + ) + d.update(data) + d["ip"] = self.target_ip + d["xml_version"] = self.xmloutputversion + d["latency_ms"] = self.latency_ms + d["last_boot"] = self.last_boot + d["parsed"] = self.model_dump_json(indent=0) + d["open_tcp_ports"] = json.dumps(self.tcp_open_ports) + return d diff --git a/generalresearch/models/network/rdns.py b/generalresearch/models/network/rdns.py deleted file mode 100644 index e00a32d..0000000 --- a/generalresearch/models/network/rdns.py +++ /dev/null @@ -1,101 +0,0 @@ -import ipaddress -import json -from functools import cached_property - -from pydantic import BaseModel, Field, model_validator, computed_field -from typing import Optional, List - -from typing_extensions import Self - -from generalresearch.models.custom_types import IPvAnyAddressStr -import subprocess -import re -from typing import List -import ipaddress -import tldextract - - -class RDNSResult(BaseModel): - - ip: IPvAnyAddressStr = Field() - - hostnames: List[str] = Field(default_factory=list) - - @model_validator(mode="after") - def validate_hostname_prop(self): - assert len(self.hostnames) == self.hostname_count - if self.hostnames: - assert self.hostnames[0] == self.primary_hostname - assert self.primary_domain in self.primary_hostname - return self - - @computed_field(examples=["fixed-187-191-8-145.totalplay.net"]) - @cached_property - def primary_hostname(self) -> Optional[str]: - if self.hostnames: - return self.hostnames[0] - - @computed_field(examples=[1]) - @cached_property - def hostname_count(self) -> int: - return len(self.hostnames) - - @computed_field(examples=["totalplay.net"]) - @cached_property - def primary_domain(self) -> Optional[str]: - if self.primary_hostname: - return tldextract.extract(self.primary_hostname).top_domain_under_public_suffix - - def model_dump_postgres(self): - # Writes for the network_rdnsresult table - d = self.model_dump( - mode="json", - include={"primary_hostname", "primary_domain", "hostname_count"}, - ) - d["hostnames"] = json.dumps(self.hostnames) - return d - - @classmethod - def from_dig(cls, ip: str, raw_output: str) -> Self: - hostnames: List[str] = [] - - for line in raw_output.splitlines(): - m = PTR_RE.search(line) - if m: - hostnames.append(m.group(1)) - - return cls( - ip=ipaddress.ip_address(ip), - hostnames=hostnames, - ) - - -PTR_RE = re.compile(r"\sPTR\s+([^\s]+)\.") - - -def dig_rdns(ip: str) -> RDNSResult: - args = get_dig_rdns_command(ip).split(" ") - proc = subprocess.run( - args, - capture_output=True, - text=True, - check=False, - ) - raw = proc.stdout.strip() - return RDNSResult.from_dig(ip=ip, raw_output=raw) - - -def get_dig_rdns_command(ip: str): - return " ".join(["dig", "+noall", "+answer", "-x", ip]) - - -def get_dig_version() -> str: - proc = subprocess.run( - ["dig", "-v"], - capture_output=True, - text=True, - check=False, - ) - # e.g. DiG 9.18.39-0ubuntu0.22.04.2-Ubuntu - ver_str = proc.stderr.strip() - return ver_str.split("-", 1)[0].split(" ", 1)[1] diff --git a/generalresearch/models/network/rdns/__init__.py b/generalresearch/models/network/rdns/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/generalresearch/models/network/rdns/command.py b/generalresearch/models/network/rdns/command.py new file mode 100644 index 0000000..e9d5bfd --- /dev/null +++ b/generalresearch/models/network/rdns/command.py @@ -0,0 +1,33 @@ +import subprocess + +from generalresearch.models.network.rdns.parser import parse_rdns_output +from generalresearch.models.network.rdns.result import RDNSResult + + +def run_rdns(ip: str) -> RDNSResult: + args = build_rdns_command(ip).split(" ") + proc = subprocess.run( + args, + capture_output=True, + text=True, + check=False, + ) + raw = proc.stdout.strip() + return parse_rdns_output(ip, raw) + + +def build_rdns_command(ip: str): + # e.g. dig +noall +answer -x 1.2.3.4 + return " ".join(["dig", "+noall", "+answer", "-x", ip]) + + +def get_dig_version() -> str: + proc = subprocess.run( + ["dig", "-v"], + capture_output=True, + text=True, + check=False, + ) + # e.g. DiG 9.18.39-0ubuntu0.22.04.2-Ubuntu + ver_str = proc.stderr.strip() + return ver_str.split("-", 1)[0].split(" ", 1)[1] diff --git a/generalresearch/models/network/rdns/execute.py b/generalresearch/models/network/rdns/execute.py new file mode 100644 index 0000000..97b8bf8 --- /dev/null +++ b/generalresearch/models/network/rdns/execute.py @@ -0,0 +1,41 @@ +from datetime import datetime, timezone +from typing import Optional +from uuid import uuid4 + +from generalresearch.models.custom_types import UUIDStr +from generalresearch.models.network.rdns.command import ( + run_rdns, + get_dig_version, + build_rdns_command, +) +from generalresearch.models.network.tool_run import ( + ToolName, + ToolClass, + Status, + RDNSRun, +) +from generalresearch.models.network.tool_run_command import ToolRunCommand + + +def execute_rdns(ip: str, scan_group_id: Optional[UUIDStr] = None): + started_at = datetime.now(tz=timezone.utc) + tool_version = get_dig_version() + result = run_rdns(ip=ip) + finished_at = datetime.now(tz=timezone.utc) + raw_command = build_rdns_command(ip) + + run = RDNSRun( + tool_name=ToolName.DIG, + tool_class=ToolClass.RDNS, + tool_version=tool_version, + status=Status.SUCCESS, + ip=ip, + started_at=started_at, + finished_at=finished_at, + raw_command=raw_command, + scan_group_id=scan_group_id or uuid4().hex, + config=ToolRunCommand(command="dig", options={}), + parsed=result, + ) + + return run diff --git a/generalresearch/models/network/rdns/parser.py b/generalresearch/models/network/rdns/parser.py new file mode 100644 index 0000000..f12a6f4 --- /dev/null +++ b/generalresearch/models/network/rdns/parser.py @@ -0,0 +1,21 @@ +import ipaddress +import re +from typing import List + +from generalresearch.models.network.rdns.result import RDNSResult + +PTR_RE = re.compile(r"\sPTR\s+([^\s]+)\.") + + +def parse_rdns_output(ip, raw): + hostnames: List[str] = [] + + for line in raw.splitlines(): + m = PTR_RE.search(line) + if m: + hostnames.append(m.group(1)) + + return RDNSResult( + ip=ipaddress.ip_address(ip), + hostnames=hostnames, + ) diff --git a/generalresearch/models/network/rdns/result.py b/generalresearch/models/network/rdns/result.py new file mode 100644 index 0000000..81b4085 --- /dev/null +++ b/generalresearch/models/network/rdns/result.py @@ -0,0 +1,51 @@ +import json +from functools import cached_property +from typing import Optional, List + +import tldextract +from pydantic import BaseModel, Field, model_validator, computed_field + +from generalresearch.models.custom_types import IPvAnyAddressStr + + +class RDNSResult(BaseModel): + + ip: IPvAnyAddressStr = Field() + + hostnames: List[str] = Field(default_factory=list) + + @model_validator(mode="after") + def validate_hostname_prop(self): + assert len(self.hostnames) == self.hostname_count + if self.hostnames: + assert self.hostnames[0] == self.primary_hostname + assert self.primary_domain in self.primary_hostname + return self + + @computed_field(examples=["fixed-187-191-8-145.totalplay.net"]) + @cached_property + def primary_hostname(self) -> Optional[str]: + if self.hostnames: + return self.hostnames[0] + + @computed_field(examples=[1]) + @cached_property + def hostname_count(self) -> int: + return len(self.hostnames) + + @computed_field(examples=["totalplay.net"]) + @cached_property + def primary_domain(self) -> Optional[str]: + if self.primary_hostname: + return tldextract.extract( + self.primary_hostname + ).top_domain_under_public_suffix + + def model_dump_postgres(self): + # Writes for the network_rdnsresult table + d = self.model_dump( + mode="json", + include={"primary_hostname", "primary_domain", "hostname_count"}, + ) + d["hostnames"] = json.dumps(self.hostnames) + return d diff --git a/generalresearch/models/network/tool_run.py b/generalresearch/models/network/tool_run.py index 2588890..36e6950 100644 --- a/generalresearch/models/network/tool_run.py +++ b/generalresearch/models/network/tool_run.py @@ -1,6 +1,5 @@ -from datetime import datetime, timezone from enum import StrEnum -from typing import Optional, Tuple +from typing import Optional, Literal from uuid import uuid4 from pydantic import BaseModel, Field, PositiveInt @@ -10,20 +9,10 @@ from generalresearch.models.custom_types import ( IPvAnyAddressStr, UUIDStr, ) -from generalresearch.models.network.nmap import NmapRun -from generalresearch.models.network.rdns import ( - RDNSResult, - get_dig_version, - dig_rdns, - get_dig_rdns_command, -) -from generalresearch.models.network.mtr import ( - MTRReport, - get_mtr_version, - run_mtr, - get_mtr_command, -) -from generalresearch.models.network.tool_utils import ToolRunCommand +from generalresearch.models.network.mtr.result import MTRResult +from generalresearch.models.network.nmap.result import NmapResult +from generalresearch.models.network.rdns.result import RDNSResult +from generalresearch.models.network.tool_run_command import ToolRunCommand class ToolClass(StrEnum): @@ -76,8 +65,11 @@ class ToolRun(BaseModel): return d -class PortScanRun(ToolRun): - parsed: NmapRun = Field() +class NmapRun(ToolRun): + tool_class: Literal[ToolClass.PORT_SCAN] = Field(default=ToolClass.PORT_SCAN) + tool_name: Literal[ToolName.NMAP] = Field(default=ToolName.NMAP) + + parsed: NmapResult = Field() def model_dump_postgres(self): d = super().model_dump_postgres() @@ -86,7 +78,10 @@ class PortScanRun(ToolRun): return d -class RDnsRun(ToolRun): +class RDNSRun(ToolRun): + tool_class: Literal[ToolClass.RDNS] = Field(default=ToolClass.RDNS) + tool_name: Literal[ToolName.DIG] = Field(default=ToolName.DIG) + parsed: RDNSResult = Field() def model_dump_postgres(self): @@ -96,10 +91,13 @@ class RDnsRun(ToolRun): return d -class MtrRun(ToolRun): +class MTRRun(ToolRun): + tool_class: Literal[ToolClass.TRACEROUTE] = Field(default=ToolClass.TRACEROUTE) + tool_name: Literal[ToolName.MTR] = Field(default=ToolName.MTR) + facility_id: int = Field(default=1) source_ip: IPvAnyAddressStr = Field() - parsed: MTRReport = Field() + parsed: MTRResult = Field() def model_dump_postgres(self): d = super().model_dump_postgres() @@ -108,66 +106,3 @@ class MtrRun(ToolRun): d["facility_id"] = self.facility_id d.update(self.parsed.model_dump_postgres()) return d - - -def new_tool_run_from_nmap( - nmap_run: NmapRun, scan_group_id: Optional[UUIDStr] = None -) -> PortScanRun: - assert nmap_run.exit_status == "success" - return PortScanRun( - tool_name=ToolName.NMAP, - tool_class=ToolClass.PORT_SCAN, - tool_version=nmap_run.version, - status=Status.SUCCESS, - ip=nmap_run.target_ip, - started_at=nmap_run.started_at, - finished_at=nmap_run.finished_at, - raw_command=nmap_run.command_line, - scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand.from_raw_command(nmap_run.command_line), - parsed=nmap_run, - ) - - -def run_dig(ip: str, scan_group_id: Optional[UUIDStr] = None) -> RDnsRun: - started_at = datetime.now(tz=timezone.utc) - tool_version = get_dig_version() - rdns_result = dig_rdns(ip) - finished_at = datetime.now(tz=timezone.utc) - raw_command = get_dig_rdns_command(ip) - - return RDnsRun( - tool_name=ToolName.DIG, - tool_class=ToolClass.RDNS, - tool_version=tool_version, - status=Status.SUCCESS, - ip=ip, - started_at=started_at, - finished_at=finished_at, - raw_command=raw_command, - scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand.from_raw_command(raw_command), - parsed=rdns_result, - ) - - -def mtr_tool_run(ip: str, scan_group_id: Optional[UUIDStr] = None) -> MtrRun: - started_at = datetime.now(tz=timezone.utc) - tool_version = get_mtr_version() - result = run_mtr(ip) - finished_at = datetime.now(tz=timezone.utc) - raw_command = " ".join(get_mtr_command(ip)) - - return MtrRun( - tool_name=ToolName.MTR, - tool_class=ToolClass.TRACEROUTE, - tool_version=tool_version, - status=Status.SUCCESS, - ip=ip, - started_at=started_at, - finished_at=finished_at, - raw_command=raw_command, - scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand.from_raw_command(raw_command), - parsed=result, - ) diff --git a/generalresearch/models/network/tool_run_command.py b/generalresearch/models/network/tool_run_command.py new file mode 100644 index 0000000..e3d94df --- /dev/null +++ b/generalresearch/models/network/tool_run_command.py @@ -0,0 +1,9 @@ +from typing import Dict + +from pydantic import BaseModel, Field + + +class ToolRunCommand(BaseModel): + # todo: expand with arguments specific for each tool + command: str = Field() + options: Dict[str, str | int] = Field(default_factory=dict) diff --git a/generalresearch/models/network/tool_utils.py b/generalresearch/models/network/tool_utils.py deleted file mode 100644 index 83d988d..0000000 --- a/generalresearch/models/network/tool_utils.py +++ /dev/null @@ -1,69 +0,0 @@ -import shlex -from typing import Dict, List - -from pydantic import BaseModel -from typing_extensions import Self - -""" -e.g.: "nmap -Pn -sV -p 80,443 --reason --max-retries=3 1.2.3.4" -{'command': 'nmap', - 'options': {'p': '80,443', 'max-retries': '3'}, - 'flags': ['Pn', 'sV', 'reason'], - 'positionals': ['1.2.3.4']} -""" - - -class ToolRunCommand(BaseModel): - command: str - options: Dict[str, str] - flags: List[str] - positionals: List[str] - - @classmethod - def from_raw_command(cls, s: str) -> Self: - return cls.model_validate(parse_command(s)) - - -def parse_command(cmd: str): - tokens = shlex.split(cmd) - - result = { - "command": tokens[0], - "options": {}, - "flags": [], - "positionals": [], - } - - i = 1 - while i < len(tokens): - tok = tokens[i] - - # --key=value - if tok.startswith("--") and "=" in tok: - k, v = tok[2:].split("=", 1) - result["options"][k] = v - - # --key value - elif tok.startswith("--"): - key = tok[2:] - if i + 1 < len(tokens) and not tokens[i + 1].startswith("-"): - result["options"][key] = tokens[i + 1] - i += 1 - else: - result["flags"].append(key) - - # short flag or short flag with arg - elif tok.startswith("-"): - if i + 1 < len(tokens) and not tokens[i + 1].startswith("-"): - result["options"][tok[1:]] = tokens[i + 1] - i += 1 - else: - result["flags"].append(tok[1:]) - - else: - result["positionals"].append(tok) - - i += 1 - - result["flags"] = sorted(result["flags"]) - return result diff --git a/generalresearch/models/network/utils.py b/generalresearch/models/network/utils.py new file mode 100644 index 0000000..fee9b80 --- /dev/null +++ b/generalresearch/models/network/utils.py @@ -0,0 +1,5 @@ +import requests + + +def get_source_ip(): + return requests.get("https://icanhazip.com?").text.strip() diff --git a/generalresearch/models/network/xml_parser.py b/generalresearch/models/network/xml_parser.py deleted file mode 100644 index 349bc94..0000000 --- a/generalresearch/models/network/xml_parser.py +++ /dev/null @@ -1,408 +0,0 @@ -import xml.etree.cElementTree as ET -from datetime import datetime, timezone -from typing import List, Dict, Any, Tuple, Optional - -from generalresearch.models.network.definitions import IPProtocol -from generalresearch.models.network.nmap import ( - NmapHostname, - NmapRun, - NmapPort, - PortState, - PortStateReason, - NmapService, - NmapScript, - NmapPortStats, - NmapScanType, - NmapHostState, - NmapHostStatusReason, - NmapHostScript, - NmapOSMatch, - NmapOSClass, - NmapTrace, - NmapTraceHop, - NmapScanInfo, -) - - -class NmapParserException(Exception): - def __init__(self, msg): - self.msg = msg - - def __str__(self): - return self.msg - - -class NmapXmlParser: - """ - Example: https://nmap.org/book/output-formats-xml-output.html - Full DTD: https://nmap.org/book/nmap-dtd.html - """ - - @classmethod - def parse_xml(cls, nmap_data: str) -> NmapRun: - """ - Expects a full nmap scan report. - """ - - try: - root = ET.fromstring(nmap_data) - except Exception as e: - emsg = "Wrong XML structure: cannot parse data: {0}".format(e) - raise NmapParserException(emsg) - - if root.tag != "nmaprun": - raise NmapParserException("Unpexpected data structure for XML " "root node") - return cls._parse_xml_nmaprun(root) - - @classmethod - def _parse_xml_nmaprun(cls, root: ET.Element) -> NmapRun: - """ - This method parses out a full nmap scan report from its XML root - node: . We expect there is only 1 host in this report! - - :param root: Element from xml.ElementTree (top of XML the document) - """ - cls._validate_nmap_root(root) - host_count = len(root.findall(".//host")) - assert host_count == 1, f"Expected 1 host, got {host_count}" - - xml_str = ET.tostring(root, encoding="unicode").replace("\n", "") - nmap_data = {"raw_xml": xml_str} - nmap_data.update(cls._parse_nmaprun(root)) - - nmap_data["scan_infos"] = [ - cls._parse_scaninfo(scaninfo_el) - for scaninfo_el in root.findall(".//scaninfo") - ] - - nmap_data.update(cls._parse_runstats(root)) - - nmap_data.update(cls._parse_xml_host(root.find(".//host"))) - - return NmapRun.model_validate(nmap_data) - - @classmethod - def _validate_nmap_root(cls, root: ET.Element) -> None: - allowed = { - "scaninfo", - "host", - "runstats", - "verbose", - "debugging", - "taskprogress", - } - - found = {child.tag for child in root} - unexpected = found - allowed - if unexpected: - raise ValueError( - f"Unexpected top-level tags in nmap XML: {sorted(unexpected)}" - ) - - @classmethod - def _parse_scaninfo(cls, scaninfo_el: ET.Element) -> NmapScanInfo: - data = dict() - data["type"] = NmapScanType(scaninfo_el.attrib["type"]) - data["protocol"] = IPProtocol(scaninfo_el.attrib["protocol"]) - data["num_services"] = scaninfo_el.attrib["numservices"] - data["services"] = scaninfo_el.attrib["services"] - return NmapScanInfo.model_validate(data) - - @classmethod - def _parse_runstats(cls, root: ET.Element) -> Dict: - runstats = root.find("runstats") - if runstats is None: - return {} - - finished = runstats.find("finished") - if finished is None: - return {} - - finished_at = None - ts = finished.attrib.get("time") - if ts: - finished_at = datetime.fromtimestamp(int(ts), tz=timezone.utc) - - return { - "finished_at": finished_at, - "exit_status": finished.attrib.get("exit"), - } - - @classmethod - def _parse_nmaprun(cls, nmaprun_el: ET.Element) -> Dict: - nmap_data = dict() - nmaprun = dict(nmaprun_el.attrib) - nmap_data["command_line"] = nmaprun["args"] - nmap_data["started_at"] = datetime.fromtimestamp( - float(nmaprun["start"]), tz=timezone.utc - ) - nmap_data["version"] = nmaprun["version"] - nmap_data["xmloutputversion"] = nmaprun["xmloutputversion"] - return nmap_data - - @classmethod - def _parse_xml_host(cls, host_el: ET.Element) -> Dict: - """ - Receives a XML tag representing a scanned host with - its services. - """ - data = dict() - - # - status_el = host_el.find("status") - data["host_state"] = NmapHostState(status_el.attrib["state"]) - data["host_state_reason"] = NmapHostStatusReason(status_el.attrib["reason"]) - host_state_reason_ttl = status_el.attrib.get("reason_ttl") - if host_state_reason_ttl: - data["host_state_reason_ttl"] = int(host_state_reason_ttl) - - #
- address_el = host_el.find("address") - data["target_ip"] = address_el.attrib["addr"] - - data["hostnames"] = cls._parse_hostnames(host_el.find("hostnames")) - - data["ports"], data["port_stats"] = cls._parse_xml_ports(host_el.find("ports")) - - uptime = host_el.find("uptime") - if uptime is not None: - data["uptime_seconds"] = int(uptime.attrib["seconds"]) - - distance = host_el.find("distance") - if distance is not None: - data["distance"] = int(distance.attrib["value"]) - - tcpsequence = host_el.find("tcpsequence") - if tcpsequence is not None: - data["tcp_sequence_index"] = int(tcpsequence.attrib["index"]) - data["tcp_sequence_difficulty"] = tcpsequence.attrib["difficulty"] - ipidsequence = host_el.find("ipidsequence") - if ipidsequence is not None: - data["ipid_sequence_class"] = ipidsequence.attrib["class"] - tcptssequence = host_el.find("tcptssequence") - if tcptssequence is not None: - data["tcp_timestamp_class"] = tcptssequence.attrib["class"] - - times_elem = host_el.find("times") - if times_elem is not None: - data.update( - { - "srtt_us": int(times_elem.attrib.get("srtt", 0)) or None, - "rttvar_us": int(times_elem.attrib.get("rttvar", 0)) or None, - "timeout_us": int(times_elem.attrib.get("to", 0)) or None, - } - ) - - hostscripts_el = host_el.find("hostscript") - if hostscripts_el is not None: - data["host_scripts"] = [ - NmapHostScript(id=el.attrib["id"], output=el.attrib.get("output")) - for el in hostscripts_el.findall("script") - ] - - data["os_matches"] = cls._parse_os_matches(host_el) - - data["trace"] = cls._parse_trace(host_el) - - return data - - @classmethod - def _parse_os_matches(cls, host_el: ET.Element) -> List[NmapOSMatch] | None: - os_elem = host_el.find("os") - if os_elem is None: - return None - - matches: List[NmapOSMatch] = [] - - for m in os_elem.findall("osmatch"): - classes: List[NmapOSClass] = [] - - for c in m.findall("osclass"): - cpes = [e.text.strip() for e in c.findall("cpe") if e.text] - - classes.append( - NmapOSClass( - vendor=c.attrib.get("vendor"), - osfamily=c.attrib.get("osfamily"), - osgen=c.attrib.get("osgen"), - accuracy=( - int(c.attrib["accuracy"]) - if "accuracy" in c.attrib - else None - ), - cpe=cpes or None, - ) - ) - - matches.append( - NmapOSMatch( - name=m.attrib["name"], - accuracy=int(m.attrib["accuracy"]), - classes=classes, - ) - ) - - return matches or None - - @classmethod - def _parse_hostnames(cls, hostnames_el: ET.Element) -> List[NmapHostname]: - """ - Parses the hostnames element. - e.g. - - - """ - return [ - cls._parse_hostname(hname) for hname in hostnames_el.findall("hostname") - ] - - @classmethod - def _parse_hostname(cls, hostname_el: ET.Element) -> NmapHostname: - """ - Parses the hostname element. - e.g. - - :param hostname_el: XML tag from a nmap scan - """ - return NmapHostname.model_validate(dict(hostname_el.attrib)) - - @classmethod - def _parse_xml_ports( - cls, ports_elem: ET.Element - ) -> Tuple[List[NmapPort], NmapPortStats]: - """ - Parses the list of scanned services from a targeted host. - """ - ports: List[NmapPort] = [] - stats = NmapPortStats() - - # handle extraports first - for e in ports_elem.findall("extraports"): - state = PortState(e.attrib["state"]) - count = int(e.attrib["count"]) - - key = state.value.replace("|", "_") - setattr(stats, key, getattr(stats, key) + count) - - for port_elem in ports_elem.findall("port"): - port = cls._parse_xml_port(port_elem) - ports.append(port) - key = port.state.value.replace("|", "_") - setattr(stats, key, getattr(stats, key) + 1) - return ports, stats - - @classmethod - def _parse_xml_service(cls, service_elem: ET.Element) -> NmapService: - svc = { - "name": service_elem.attrib.get("name"), - "product": service_elem.attrib.get("product"), - "version": service_elem.attrib.get("version"), - "extrainfo": service_elem.attrib.get("extrainfo"), - "method": service_elem.attrib.get("method"), - "conf": ( - int(service_elem.attrib["conf"]) - if "conf" in service_elem.attrib - else None - ), - "cpe": [e.text.strip() for e in service_elem.findall("cpe")], - } - - return NmapService.model_validate(svc) - - @classmethod - def _parse_xml_script(cls, script_elem: ET.Element) -> NmapScript: - output = script_elem.attrib.get("output") - if output: - output = output.strip() - script = { - "id": script_elem.attrib["id"], - "output": output, - } - - elements: Dict[str, Any] = {} - - # handle value - for elem in script_elem.findall(".//elem"): - key = elem.attrib.get("key") - if key: - elements[key.strip()] = elem.text.strip() - - script["elements"] = elements - return NmapScript.model_validate(script) - - @classmethod - def _parse_xml_port(cls, port_elem: ET.Element) -> NmapPort: - """ - - - - - - """ - state_elem = port_elem.find("state") - - port = { - "port": int(port_elem.attrib["portid"]), - "protocol": port_elem.attrib["protocol"], - "state": PortState(state_elem.attrib["state"]), - "reason": ( - PortStateReason(state_elem.attrib["reason"]) - if "reason" in state_elem.attrib - else None - ), - "reason_ttl": ( - int(state_elem.attrib["reason_ttl"]) - if "reason_ttl" in state_elem.attrib - else None - ), - } - - service_elem = port_elem.find("service") - if service_elem is not None: - port["service"] = cls._parse_xml_service(service_elem) - - port["scripts"] = [] - for script_elem in port_elem.findall("script"): - port["scripts"].append(cls._parse_xml_script(script_elem)) - - return NmapPort.model_validate(port) - - @classmethod - def _parse_trace(cls, host_elem: ET.Element) -> Optional[NmapTrace]: - trace_elem = host_elem.find("trace") - if trace_elem is None: - return None - - port_attr = trace_elem.attrib.get("port") - proto_attr = trace_elem.attrib.get("proto") - - hops: List[NmapTraceHop] = [] - - for hop_elem in trace_elem.findall("hop"): - ttl = hop_elem.attrib.get("ttl") - if ttl is None: - continue # ttl is required by the DTD but guard anyway - - rtt = hop_elem.attrib.get("rtt") - ipaddr = hop_elem.attrib.get("ipaddr") - host = hop_elem.attrib.get("host") - - hops.append( - NmapTraceHop( - ttl=int(ttl), - ipaddr=ipaddr, - rtt_ms=float(rtt) if rtt is not None else None, - host=host, - ) - ) - - return NmapTrace( - port=int(port_attr) if port_attr is not None else None, - protocol=IPProtocol(proto_attr) if proto_attr is not None else None, - hops=hops, - ) diff --git a/test_utils/conftest.py b/test_utils/conftest.py index 187ff58..0e712bb 100644 --- a/test_utils/conftest.py +++ b/test_utils/conftest.py @@ -4,6 +4,7 @@ from os.path import join as pjoin from pathlib import Path from typing import TYPE_CHECKING, Callable from uuid import uuid4 +from datetime import datetime, timedelta, timezone import pytest import redis @@ -17,8 +18,6 @@ from generalresearch.redis_helper import RedisConfig from generalresearch.sql_helper import SqlHelper if TYPE_CHECKING: - from datetime import datetime - from generalresearch.config import GRLBaseSettings from generalresearch.currency import USDCent from generalresearch.models.thl.session import Status @@ -38,6 +37,7 @@ def env_file_path(pytestconfig: Config) -> str: @pytest.fixture(scope="session") def settings(env_file_path: str) -> "GRLBaseSettings": from generalresearch.config import GRLBaseSettings + print(f"{env_file_path=}") s = GRLBaseSettings(_env_file=env_file_path) @@ -203,16 +203,12 @@ def wall_status(request) -> "Status": @pytest.fixture -def utc_now() -> "datetime": - from datetime import datetime, timezone - +def utc_now() -> datetime: return datetime.now(tz=timezone.utc) @pytest.fixture -def utc_hour_ago() -> "datetime": - from datetime import datetime, timedelta, timezone - +def utc_hour_ago() -> datetime: return datetime.now(tz=timezone.utc) - timedelta(hours=1) diff --git a/test_utils/managers/network/conftest.py b/test_utils/managers/network/conftest.py index 70fda4e..f6a4078 100644 --- a/test_utils/managers/network/conftest.py +++ b/test_utils/managers/network/conftest.py @@ -1,40 +1,31 @@ import os -from datetime import datetime, timezone -from typing import Callable, TYPE_CHECKING +from datetime import timedelta, datetime, timezone from uuid import uuid4 import pytest from generalresearch.managers.network.label import IPLabelManager -from generalresearch.managers.network.nmap import NmapManager from generalresearch.managers.network.tool_run import ToolRunManager -from generalresearch.models.network.rdns import ( - RDNSResult, - get_dig_version, - get_dig_rdns_command, -) -from generalresearch.models.network.tool_run import ( - RDnsRun, - ToolName, - ToolClass, - Status, -) -from generalresearch.models.network.tool_utils import ToolRunCommand -from generalresearch.models.network.xml_parser import NmapXmlParser +from generalresearch.models.network.definitions import IPProtocol +from generalresearch.models.network.mtr.command import build_mtr_command +from generalresearch.models.network.mtr.parser import parse_mtr_output +from generalresearch.models.network.nmap.parser import parse_nmap_xml +from generalresearch.models.network.rdns.command import build_rdns_command +from generalresearch.models.network.rdns.parser import parse_rdns_output +from generalresearch.models.network.tool_run import NmapRun, Status, RDNSRun, MTRRun +from generalresearch.models.network.tool_run_command import ToolRunCommand @pytest.fixture(scope="session") -def iplabel_manager(thl_web_rw) -> IPLabelManager: - assert "/unittest-" in thl_web_rw.dsn.path - - return IPLabelManager(pg_config=thl_web_rw) +def scan_group_id(): + return uuid4().hex @pytest.fixture(scope="session") -def nmap_manager(thl_web_rw) -> NmapManager: +def iplabel_manager(thl_web_rw) -> IPLabelManager: assert "/unittest-" in thl_web_rw.dsn.path - return NmapManager(pg_config=thl_web_rw) + return IPLabelManager(pg_config=thl_web_rw) @pytest.fixture(scope="session") @@ -45,7 +36,7 @@ def toolrun_manager(thl_web_rw) -> ToolRunManager: @pytest.fixture(scope="session") -def nmap_xml_str(request) -> str: +def nmap_raw_output(request) -> str: fp = os.path.join(request.config.rootpath, "data/nmaprun1.xml") with open(fp, "r") as f: data = f.read() @@ -53,34 +44,84 @@ def nmap_xml_str(request) -> str: @pytest.fixture(scope="session") -def nmap_run(nmap_xml_str): - return NmapXmlParser.parse_xml(nmap_xml_str) +def nmap_result(nmap_raw_output): + return parse_nmap_xml(nmap_raw_output) @pytest.fixture(scope="session") -def raw_dig_output(): +def nmap_run(nmap_result, scan_group_id): + r = nmap_result + return NmapRun( + tool_version=r.version, + status=Status.SUCCESS, + ip=r.target_ip, + started_at=r.started_at, + finished_at=r.finished_at, + raw_command=r.command_line, + scan_group_id=scan_group_id, + config=ToolRunCommand(command="nmap"), + parsed=r, + ) + + +@pytest.fixture(scope="session") +def dig_raw_output(): return "156.32.33.45.in-addr.arpa. 300 IN PTR scanme.nmap.org." @pytest.fixture(scope="session") -def reverse_dns_run(raw_dig_output): +def rdns_result(dig_raw_output): + return parse_rdns_output(ip="45.33.32.156", raw=dig_raw_output) + + +@pytest.fixture(scope="session") +def rdns_run(rdns_result, scan_group_id): + r = rdns_result ip = "45.33.32.156" - rdns_result = RDNSResult.from_dig(ip=ip, raw_output=raw_dig_output) - scan_group_id = uuid4().hex - started_at = datetime.now(tz=timezone.utc) - tool_version = get_dig_version() - finished_at = datetime.now(tz=timezone.utc) - raw_command = get_dig_rdns_command(ip) - return RDnsRun( - tool_name=ToolName.DIG, - tool_class=ToolClass.RDNS, - tool_version=tool_version, + utc_now = datetime.now(tz=timezone.utc) + return RDNSRun( + tool_version="1.2.3", status=Status.SUCCESS, ip=ip, - started_at=started_at, - finished_at=finished_at, - raw_command=raw_command, - scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand.from_raw_command(raw_command), - parsed=rdns_result, + started_at=utc_now, + finished_at=utc_now + timedelta(seconds=1), + raw_command=build_rdns_command(ip=ip), + scan_group_id=scan_group_id, + config=ToolRunCommand(command="dig"), + parsed=r, + ) + + +@pytest.fixture(scope="session") +def mtr_raw_output(request): + fp = os.path.join(request.config.rootpath, "data/mtr_fatbeam.json") + with open(fp, "r") as f: + data = f.read() + return data + + +@pytest.fixture(scope="session") +def mtr_result(mtr_raw_output): + return parse_mtr_output(mtr_raw_output, port=443, protocol=IPProtocol.TCP) + + +@pytest.fixture(scope="session") +def mtr_run(mtr_result, scan_group_id): + r = mtr_result + utc_now = datetime.now(tz=timezone.utc) + + return MTRRun( + tool_version="1.2.3", + status=Status.SUCCESS, + ip=r.destination, + started_at=utc_now, + finished_at=utc_now + timedelta(seconds=1), + raw_command=build_mtr_command( + ip=r.destination, protocol=IPProtocol.TCP, port=443 + ), + scan_group_id=scan_group_id, + config=ToolRunCommand(command="mtr"), + parsed=r, + facility_id=1, + source_ip="1.2.3.4", ) diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py index 0f9388f..c05af92 100644 --- a/tests/managers/network/tool_run.py +++ b/tests/managers/network/tool_run.py @@ -6,102 +6,38 @@ import faker import pytest from generalresearch.models.network.definitions import IPProtocol -from generalresearch.models.network.mtr import ( - get_mtr_version, - parse_raw_output, - MTRReport, - get_mtr_command, -) + from generalresearch.models.network.tool_run import ( - new_tool_run_from_nmap, - run_dig, - MtrRun, ToolName, ToolClass, Status, ) -from generalresearch.models.network.tool_utils import ToolRunCommand fake = faker.Faker() -def test_create_tool_run_from_nmap(nmap_run, toolrun_manager): - scan_group_id = uuid4().hex - run = new_tool_run_from_nmap(nmap_run, scan_group_id=scan_group_id) - - toolrun_manager.create_portscan_run(run) - - run_out = toolrun_manager.get_portscan_run(run.id) - - assert run == run_out - - -def test_create_tool_run_from_dig_fixture(reverse_dns_run, toolrun_manager): - - toolrun_manager.create_rdns_run(reverse_dns_run) - - run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) - - assert reverse_dns_run == run_out - - -def test_run_dig(toolrun_manager): - reverse_dns_run = run_dig(ip="65.19.129.53") - - toolrun_manager.create_rdns_run(reverse_dns_run) - - run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) - - assert reverse_dns_run == run_out - - -def test_run_dig_empty(toolrun_manager): - reverse_dns_run = run_dig(ip=fake.ipv6()) +def test_create_tool_run_from_nmap_run(nmap_run, toolrun_manager): - toolrun_manager.create_rdns_run(reverse_dns_run) + toolrun_manager.create_nmap_run(nmap_run) - run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) + run_out = toolrun_manager.get_nmap_run(nmap_run.id) - assert reverse_dns_run == run_out + assert nmap_run == run_out -@pytest.fixture(scope="session") -def mtr_report(request) -> MTRReport: - fp = os.path.join(request.config.rootpath, "data/mtr_fatbeam.json") - with open(fp, "r") as f: - s = f.read() - data = parse_raw_output(s) - data["port"] = 443 - data["protocol"] = IPProtocol.TCP - return MTRReport.model_validate(data) +def test_create_tool_run_from_rdns_run(rdns_run, toolrun_manager): + toolrun_manager.create_rdns_run(rdns_run) -def test_create_tool_run_from_mtr(toolrun_manager, mtr_report): - started_at = datetime.now(tz=timezone.utc) - tool_version = get_mtr_version() + run_out = toolrun_manager.get_rdns_run(rdns_run.id) - ip = mtr_report.destination + assert rdns_run == run_out - finished_at = datetime.now(tz=timezone.utc) - raw_command = " ".join(get_mtr_command(ip)) - run = MtrRun( - tool_name=ToolName.MTR, - tool_class=ToolClass.TRACEROUTE, - tool_version=tool_version, - status=Status.SUCCESS, - ip=ip, - started_at=started_at, - finished_at=finished_at, - raw_command=raw_command, - scan_group_id=uuid4().hex, - config=ToolRunCommand.from_raw_command(raw_command), - parsed=mtr_report, - source_ip="1.1.1.1" - ) +def test_create_tool_run_from_mtr_run(mtr_run, toolrun_manager): - toolrun_manager.create_mtr_run(run) + toolrun_manager.create_mtr_run(mtr_run) - run_out = toolrun_manager.get_mtr_run(run.id) + run_out = toolrun_manager.get_mtr_run(mtr_run.id) - assert run == run_out + assert mtr_run == run_out diff --git a/tests/models/network/nmap.py b/tests/models/network/nmap.py deleted file mode 100644 index 4fc7014..0000000 --- a/tests/models/network/nmap.py +++ /dev/null @@ -1,32 +0,0 @@ -import os - -import pytest - -from generalresearch.models.network.xml_parser import NmapXmlParser - - -@pytest.fixture -def nmap_xml_str(request) -> str: - fp = os.path.join(request.config.rootpath, "data/nmaprun1.xml") - with open(fp, "r") as f: - data = f.read() - return data - - -@pytest.fixture -def nmap_xml_str2(request) -> str: - fp = os.path.join(request.config.rootpath, "data/nmaprun2.xml") - with open(fp, "r") as f: - data = f.read() - return data - - -def test_nmap_xml_parser(nmap_xml_str, nmap_xml_str2): - p = NmapXmlParser() - n = p.parse_xml(nmap_xml_str) - assert n.tcp_open_ports == [61232] - assert len(n.trace.hops) == 18 - - n = p.parse_xml(nmap_xml_str2) - assert n.tcp_open_ports == [22, 80, 9929, 31337] - assert n.trace is None diff --git a/tests/models/network/nmap_parser.py b/tests/models/network/nmap_parser.py new file mode 100644 index 0000000..96d7b37 --- /dev/null +++ b/tests/models/network/nmap_parser.py @@ -0,0 +1,22 @@ +import os + +import pytest + +from generalresearch.models.network.nmap.parser import parse_nmap_xml + +@pytest.fixture +def nmap_raw_output_2(request) -> str: + fp = os.path.join(request.config.rootpath, "data/nmaprun2.xml") + with open(fp, "r") as f: + data = f.read() + return data + + +def test_nmap_xml_parser(nmap_raw_output, nmap_raw_output_2): + n = parse_nmap_xml(nmap_raw_output) + assert n.tcp_open_ports == [61232] + assert len(n.trace.hops) == 18 + + n = parse_nmap_xml(nmap_raw_output_2) + assert n.tcp_open_ports == [22, 80, 9929, 31337] + assert n.trace is None diff --git a/tests/models/network/rdns.py b/tests/models/network/rdns.py index 9167749..64e8351 100644 --- a/tests/models/network/rdns.py +++ b/tests/models/network/rdns.py @@ -1,23 +1,45 @@ -from generalresearch.models.network.rdns import dig_rdns -import faker +# from generalresearch.models.network.rdns import run_rdns +# import faker +# +# fake = faker.Faker() +# +# +# def test_dig_rdns(): +# # Actually runs dig -x. Idk how stable this is +# ip = "45.33.32.156" +# rdns_result = run_rdns(ip) +# assert rdns_result.primary_hostname == "scanme.nmap.org" +# assert rdns_result.primary_org == "nmap" +# +# ip = "65.19.129.53" +# rdns_result = run_rdns(ip) +# assert rdns_result.primary_hostname == "in1-smtp.grlengine.com" +# assert rdns_result.primary_org == "grlengine" +# +# ip = fake.ipv6() +# rdns_result = run_rdns(ip) +# assert rdns_result.primary_hostname is None +# assert rdns_result.primary_org is None +# print(rdns_result.model_dump_postgres()) -fake = faker.Faker() - -def test_dig_rdns(): - # Actually runs dig -x. Idk how stable this is - ip = "45.33.32.156" - rdns_result = dig_rdns(ip) - assert rdns_result.primary_hostname == "scanme.nmap.org" - assert rdns_result.primary_org == "nmap" - - ip = "65.19.129.53" - rdns_result = dig_rdns(ip) - assert rdns_result.primary_hostname == "in1-smtp.grlengine.com" - assert rdns_result.primary_org == "grlengine" - - ip = fake.ipv6() - rdns_result = dig_rdns(ip) - assert rdns_result.primary_hostname is None - assert rdns_result.primary_org is None - print(rdns_result.model_dump_postgres()) +# +# +# def test_run_dig(toolrun_manager): +# reverse_dns_run = run_dig(ip="65.19.129.53") +# +# toolrun_manager.create_rdns_run(reverse_dns_run) +# +# run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) +# +# assert reverse_dns_run == run_out +# +# +# def test_run_dig_empty(toolrun_manager): +# reverse_dns_run = run_dig(ip=fake.ipv6()) +# +# toolrun_manager.create_rdns_run(reverse_dns_run) +# +# run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) +# +# assert reverse_dns_run == run_out \ No newline at end of file diff --git a/tests/models/network/tool_run.py b/tests/models/network/tool_run.py deleted file mode 100644 index c643503..0000000 --- a/tests/models/network/tool_run.py +++ /dev/null @@ -1,8 +0,0 @@ -from uuid import uuid4 - -from generalresearch.models.network.tool_run import new_tool_run_from_nmap - - -def test_new_tool_run_from_nmap(nmap_run): - scan_group_id = uuid4().hex - run, scan = new_tool_run_from_nmap(nmap_run, scan_group_id=scan_group_id) -- cgit v1.2.3 From d9ab70d98624e22b87dfe40cc5e18c8ad5eb44c4 Mon Sep 17 00:00:00 2001 From: stuppie Date: Thu, 12 Mar 2026 11:53:18 -0600 Subject: some tests to execute the full Toolrun pipeline for each tool. fix small bugs --- generalresearch/models/network/mtr/execute.py | 4 +- generalresearch/models/network/mtr/parser.py | 3 +- generalresearch/models/network/mtr/result.py | 4 +- generalresearch/models/network/nmap/execute.py | 2 +- generalresearch/models/network/tool_run_command.py | 4 +- tests/managers/network/tool_run.py | 18 ----- tests/models/network/mtr.py | 26 ++++++++ tests/models/network/nmap.py | 29 ++++++++ tests/models/network/rdns.py | 78 +++++++++------------- 9 files changed, 97 insertions(+), 71 deletions(-) create mode 100644 tests/models/network/mtr.py create mode 100644 tests/models/network/nmap.py (limited to 'tests/managers/network') diff --git a/generalresearch/models/network/mtr/execute.py b/generalresearch/models/network/mtr/execute.py index bd556bc..81de24f 100644 --- a/generalresearch/models/network/mtr/execute.py +++ b/generalresearch/models/network/mtr/execute.py @@ -29,8 +29,8 @@ def execute_mtr( config = ToolRunCommand( command="mtr", options={ - "protocol": protocol, - "port": port, + "protocol": result.protocol, + "port": result.port, "report_cycles": report_cycles, }, ) diff --git a/generalresearch/models/network/mtr/parser.py b/generalresearch/models/network/mtr/parser.py index dc108d9..fb0ca61 100644 --- a/generalresearch/models/network/mtr/parser.py +++ b/generalresearch/models/network/mtr/parser.py @@ -1,13 +1,14 @@ import json from typing import Dict +from generalresearch.models.network.definitions import IPProtocol from generalresearch.models.network.mtr.result import MTRResult def parse_mtr_output(raw: str, port, protocol) -> MTRResult: data = parse_mtr_raw_output(raw) data["port"] = port - data["protocol"] = protocol + data["protocol"] = protocol or IPProtocol.ICMP return MTRResult.model_validate(data) diff --git a/generalresearch/models/network/mtr/result.py b/generalresearch/models/network/mtr/result.py index 62f92ab..5c775b4 100644 --- a/generalresearch/models/network/mtr/result.py +++ b/generalresearch/models/network/mtr/result.py @@ -118,9 +118,9 @@ class MTRResult(BaseModel): bitpattern: str = Field(description="Payload byte pattern used in probes (hex).") # Protocol used for the traceroute - protocol: IPProtocol = Field() + protocol: IPProtocol = Field(default=IPProtocol.ICMP) # The target port number for TCP/SCTP/UDP traces - port: Optional[int] = Field() + port: Optional[int] = Field(default=None) hops: List[MTRHop] = Field() diff --git a/generalresearch/models/network/nmap/execute.py b/generalresearch/models/network/nmap/execute.py index fc1e2fa..68a9926 100644 --- a/generalresearch/models/network/nmap/execute.py +++ b/generalresearch/models/network/nmap/execute.py @@ -12,7 +12,7 @@ def execute_nmap( ): result = run_nmap(ip=ip, top_ports=top_ports) assert result.exit_status == "success" - assert result.target_ip == ip + assert result.target_ip == ip, f"{result.target_ip=}, {ip=}" run = NmapRun( tool_name=ToolName.NMAP, diff --git a/generalresearch/models/network/tool_run_command.py b/generalresearch/models/network/tool_run_command.py index e3d94df..5abe670 100644 --- a/generalresearch/models/network/tool_run_command.py +++ b/generalresearch/models/network/tool_run_command.py @@ -1,4 +1,4 @@ -from typing import Dict +from typing import Dict, Optional from pydantic import BaseModel, Field @@ -6,4 +6,4 @@ from pydantic import BaseModel, Field class ToolRunCommand(BaseModel): # todo: expand with arguments specific for each tool command: str = Field() - options: Dict[str, str | int] = Field(default_factory=dict) + options: Dict[str, Optional[str | int]] = Field(default_factory=dict) diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py index c05af92..a815809 100644 --- a/tests/managers/network/tool_run.py +++ b/tests/managers/network/tool_run.py @@ -1,21 +1,3 @@ -import os -from datetime import datetime, timezone -from uuid import uuid4 - -import faker -import pytest - -from generalresearch.models.network.definitions import IPProtocol - -from generalresearch.models.network.tool_run import ( - ToolName, - ToolClass, - Status, -) - -fake = faker.Faker() - - def test_create_tool_run_from_nmap_run(nmap_run, toolrun_manager): toolrun_manager.create_nmap_run(nmap_run) diff --git a/tests/models/network/mtr.py b/tests/models/network/mtr.py new file mode 100644 index 0000000..2965300 --- /dev/null +++ b/tests/models/network/mtr.py @@ -0,0 +1,26 @@ +from generalresearch.models.network.mtr.execute import execute_mtr +import faker + +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def test_execute_mtr(toolrun_manager): + ip = "65.19.129.53" + + run = execute_mtr(ip=ip, report_cycles=3) + assert run.tool_name == ToolName.MTR + assert run.tool_class == ToolClass.TRACEROUTE + assert run.ip == ip + result = run.parsed + + last_hop = result.hops[-1] + assert last_hop.asn == 6939 + assert last_hop.domain == "grlengine.com" + + last_hop_1 = result.hops[-2] + assert last_hop_1.asn == 6939 + assert last_hop_1.domain == "he.net" + + toolrun_manager.create_mtr_run(run) diff --git a/tests/models/network/nmap.py b/tests/models/network/nmap.py new file mode 100644 index 0000000..f034bf0 --- /dev/null +++ b/tests/models/network/nmap.py @@ -0,0 +1,29 @@ +import subprocess + +from generalresearch.models.network.definitions import IPProtocol +from generalresearch.models.network.nmap.execute import execute_nmap +import faker + +from generalresearch.models.network.nmap.result import PortState +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def resolve(host): + return subprocess.check_output(["dig", host, "+short"]).decode().strip() + + +def test_execute_nmap_scanme(toolrun_manager): + ip = resolve("scanme.nmap.org") + + run = execute_nmap(ip=ip, top_ports=20) + assert run.tool_name == ToolName.NMAP + assert run.tool_class == ToolClass.PORT_SCAN + assert run.ip == ip + result = run.parsed + + port22 = result._port_index[(IPProtocol.TCP, 22)] + assert port22.state == PortState.OPEN + + toolrun_manager.create_nmap_run(run) diff --git a/tests/models/network/rdns.py b/tests/models/network/rdns.py index 64e8351..e56c494 100644 --- a/tests/models/network/rdns.py +++ b/tests/models/network/rdns.py @@ -1,45 +1,33 @@ -# from generalresearch.models.network.rdns import run_rdns -# import faker -# -# fake = faker.Faker() -# -# -# def test_dig_rdns(): -# # Actually runs dig -x. Idk how stable this is -# ip = "45.33.32.156" -# rdns_result = run_rdns(ip) -# assert rdns_result.primary_hostname == "scanme.nmap.org" -# assert rdns_result.primary_org == "nmap" -# -# ip = "65.19.129.53" -# rdns_result = run_rdns(ip) -# assert rdns_result.primary_hostname == "in1-smtp.grlengine.com" -# assert rdns_result.primary_org == "grlengine" -# -# ip = fake.ipv6() -# rdns_result = run_rdns(ip) -# assert rdns_result.primary_hostname is None -# assert rdns_result.primary_org is None -# print(rdns_result.model_dump_postgres()) - - -# -# -# def test_run_dig(toolrun_manager): -# reverse_dns_run = run_dig(ip="65.19.129.53") -# -# toolrun_manager.create_rdns_run(reverse_dns_run) -# -# run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) -# -# assert reverse_dns_run == run_out -# -# -# def test_run_dig_empty(toolrun_manager): -# reverse_dns_run = run_dig(ip=fake.ipv6()) -# -# toolrun_manager.create_rdns_run(reverse_dns_run) -# -# run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) -# -# assert reverse_dns_run == run_out \ No newline at end of file +from generalresearch.models.network.rdns.execute import execute_rdns +import faker + +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def test_execute_rdns_grl(toolrun_manager): + ip = "65.19.129.53" + run = execute_rdns(ip=ip) + assert run.tool_name == ToolName.DIG + assert run.tool_class == ToolClass.RDNS + assert run.ip == ip + result = run.parsed + assert result.primary_hostname == "in1-smtp.grlengine.com" + assert result.primary_domain == "grlengine.com" + assert result.hostname_count == 1 + + toolrun_manager.create_rdns_run(run) + + +def test_execute_rdns_none(toolrun_manager): + ip = fake.ipv6() + run = execute_rdns(ip) + result = run.parsed + + assert result.primary_hostname is None + assert result.primary_domain is None + assert result.hostname_count == 0 + assert result.hostnames == [] + + toolrun_manager.create_rdns_run(run) -- cgit v1.2.3 From efe1c368b8b49a2c6b3bf2193a5b89eb5426eba3 Mon Sep 17 00:00:00 2001 From: stuppie Date: Fri, 13 Mar 2026 13:31:22 -0600 Subject: ToolRunCommand + options models to handle cmd line args for all network tools --- generalresearch/models/network/mtr/command.py | 17 +- generalresearch/models/network/mtr/execute.py | 29 +-- generalresearch/models/network/mtr/parser.py | 4 +- generalresearch/models/network/nmap/command.py | 34 +++- generalresearch/models/network/nmap/execute.py | 30 ++- generalresearch/models/network/rdns/command.py | 10 +- generalresearch/models/network/rdns/execute.py | 13 +- generalresearch/models/network/rdns/parser.py | 3 +- generalresearch/models/network/tool_run.py | 10 +- generalresearch/models/network/tool_run_command.py | 59 +++++- test_utils/managers/network/conftest.py | 38 ++-- tests/managers/network/label.py | 202 --------------------- tests/managers/network/test_label.py | 202 +++++++++++++++++++++ tests/managers/network/test_tool_run.py | 25 +++ tests/managers/network/tool_run.py | 25 --- tests/models/network/mtr.py | 26 --- tests/models/network/nmap.py | 29 --- tests/models/network/nmap_parser.py | 22 --- tests/models/network/rdns.py | 33 ---- tests/models/network/test_mtr.py | 26 +++ tests/models/network/test_nmap.py | 29 +++ tests/models/network/test_nmap_parser.py | 22 +++ tests/models/network/test_rdns.py | 33 ++++ 23 files changed, 523 insertions(+), 398 deletions(-) delete mode 100644 tests/managers/network/label.py create mode 100644 tests/managers/network/test_label.py create mode 100644 tests/managers/network/test_tool_run.py delete mode 100644 tests/managers/network/tool_run.py delete mode 100644 tests/models/network/mtr.py delete mode 100644 tests/models/network/nmap.py delete mode 100644 tests/models/network/nmap_parser.py delete mode 100644 tests/models/network/rdns.py create mode 100644 tests/models/network/test_mtr.py create mode 100644 tests/models/network/test_nmap.py create mode 100644 tests/models/network/test_nmap_parser.py create mode 100644 tests/models/network/test_rdns.py (limited to 'tests/managers/network') diff --git a/generalresearch/models/network/mtr/command.py b/generalresearch/models/network/mtr/command.py index e3ab903..f8d2d49 100644 --- a/generalresearch/models/network/mtr/command.py +++ b/generalresearch/models/network/mtr/command.py @@ -4,6 +4,7 @@ from typing import List, Optional from generalresearch.models.network.definitions import IPProtocol from generalresearch.models.network.mtr.parser import parse_mtr_output from generalresearch.models.network.mtr.result import MTRResult +from generalresearch.models.network.tool_run_command import MTRRunCommand SUPPORTED_PROTOCOLS = { IPProtocol.TCP, @@ -56,20 +57,14 @@ def get_mtr_version() -> str: return ver_str.split(" ", 1)[1] -def run_mtr( - ip: str, - protocol: Optional[IPProtocol] = None, - port: Optional[int] = None, - report_cycles: int = 10, -) -> MTRResult: - args = build_mtr_command( - ip=ip, protocol=protocol, port=port, report_cycles=report_cycles - ) +def run_mtr(config: MTRRunCommand) -> MTRResult: + cmd = config.to_command_str() + args = cmd.split(" ") proc = subprocess.run( - args.split(" "), + args, capture_output=True, text=True, check=False, ) raw = proc.stdout.strip() - return parse_mtr_output(raw, protocol=protocol, port=port) + return parse_mtr_output(raw, protocol=config.options.protocol, port=config.options.port) diff --git a/generalresearch/models/network/mtr/execute.py b/generalresearch/models/network/mtr/execute.py index 81de24f..a6fb82a 100644 --- a/generalresearch/models/network/mtr/execute.py +++ b/generalresearch/models/network/mtr/execute.py @@ -10,30 +10,33 @@ from generalresearch.models.network.mtr.command import ( build_mtr_command, ) from generalresearch.models.network.tool_run import MTRRun, ToolName, ToolClass, Status -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + MTRRunCommand, + MTRRunCommandOptions, +) from generalresearch.models.network.utils import get_source_ip def execute_mtr( ip: str, scan_group_id: Optional[UUIDStr] = None, - protocol: Optional[IPProtocol] = None, + protocol: Optional[IPProtocol] = IPProtocol.ICMP, port: Optional[int] = None, report_cycles: int = 10, ) -> MTRRun: + config = MTRRunCommand( + options=MTRRunCommandOptions( + ip=ip, + report_cycles=report_cycles, + protocol=protocol, + port=port, + ), + ) + started_at = datetime.now(tz=timezone.utc) tool_version = get_mtr_version() - result = run_mtr(ip, protocol=protocol, port=port, report_cycles=report_cycles) + result = run_mtr(config) finished_at = datetime.now(tz=timezone.utc) - raw_command = build_mtr_command(ip) - config = ToolRunCommand( - command="mtr", - options={ - "protocol": result.protocol, - "port": result.port, - "report_cycles": report_cycles, - }, - ) return MTRRun( tool_name=ToolName.MTR, @@ -43,7 +46,7 @@ def execute_mtr( ip=ip, started_at=started_at, finished_at=finished_at, - raw_command=raw_command, + raw_command=config.to_command_str(), scan_group_id=scan_group_id or uuid4().hex, config=config, parsed=result, diff --git a/generalresearch/models/network/mtr/parser.py b/generalresearch/models/network/mtr/parser.py index fb0ca61..685eeca 100644 --- a/generalresearch/models/network/mtr/parser.py +++ b/generalresearch/models/network/mtr/parser.py @@ -5,10 +5,10 @@ from generalresearch.models.network.definitions import IPProtocol from generalresearch.models.network.mtr.result import MTRResult -def parse_mtr_output(raw: str, port, protocol) -> MTRResult: +def parse_mtr_output(raw: str, port: int, protocol: IPProtocol) -> MTRResult: data = parse_mtr_raw_output(raw) data["port"] = port - data["protocol"] = protocol or IPProtocol.ICMP + data["protocol"] = protocol return MTRResult.model_validate(data) diff --git a/generalresearch/models/network/nmap/command.py b/generalresearch/models/network/nmap/command.py index dfa55de..47e0a87 100644 --- a/generalresearch/models/network/nmap/command.py +++ b/generalresearch/models/network/nmap/command.py @@ -3,18 +3,40 @@ from typing import Optional, List from generalresearch.models.network.nmap.parser import parse_nmap_xml from generalresearch.models.network.nmap.result import NmapResult +from generalresearch.models.network.tool_run_command import NmapRunCommand -def build_nmap_command(ip: str, top_ports: Optional[int] = 1000) -> List[str]: +def build_nmap_command( + ip: str, + no_ping: bool = True, + enable_advanced: bool = True, + timing: int = 4, + ports: Optional[str] = None, + top_ports: Optional[int] = None, +) -> str: # e.g. "nmap -Pn -T4 -A --top-ports 1000 -oX - scanme.nmap.org" # https://linux.die.net/man/1/nmap - args = ["nmap", "-Pn", "-T4", "-A", "--top-ports", str(int(top_ports)), "-oX", "-"] - args.append(ip) - return args + args = ["nmap"] + assert 0 <= timing <= 5 + args.append(f"-T{timing}") + if no_ping: + args.append("-Pn") + if enable_advanced: + args.append("-A") + if ports is not None: + assert top_ports is None + args.extend(["-p", ports]) + if top_ports is not None: + assert ports is None + args.extend(["--top-ports", str(top_ports)]) + args.extend(["-oX", "-", ip]) + return " ".join(args) -def run_nmap(ip: str, top_ports: Optional[int] = 1000) -> NmapResult: - args = build_nmap_command(ip=ip, top_ports=top_ports) + +def run_nmap(config: NmapRunCommand) -> NmapResult: + cmd = config.to_command_str() + args = cmd.split(" ") proc = subprocess.run( args, capture_output=True, diff --git a/generalresearch/models/network/nmap/execute.py b/generalresearch/models/network/nmap/execute.py index 68a9926..0334f50 100644 --- a/generalresearch/models/network/nmap/execute.py +++ b/generalresearch/models/network/nmap/execute.py @@ -4,15 +4,35 @@ from uuid import uuid4 from generalresearch.models.custom_types import UUIDStr from generalresearch.models.network.nmap.command import run_nmap from generalresearch.models.network.tool_run import NmapRun, ToolName, ToolClass, Status -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + NmapRunCommand, + NmapRunCommandOptions, +) def execute_nmap( - ip: str, top_ports: Optional[int] = 1000, scan_group_id: Optional[UUIDStr] = None + ip: str, + top_ports: Optional[int] = 1000, + ports: Optional[str] = None, + no_ping: bool = True, + enable_advanced: bool = True, + timing: int = 4, + scan_group_id: Optional[UUIDStr] = None, ): - result = run_nmap(ip=ip, top_ports=top_ports) + config = NmapRunCommand( + options=NmapRunCommandOptions( + top_ports=top_ports, + ports=ports, + no_ping=no_ping, + enable_advanced=enable_advanced, + timing=timing, + ip=ip, + ) + ) + result = run_nmap(config) assert result.exit_status == "success" assert result.target_ip == ip, f"{result.target_ip=}, {ip=}" + assert result.command_line == config.to_command_str() run = NmapRun( tool_name=ToolName.NMAP, @@ -24,7 +44,7 @@ def execute_nmap( finished_at=result.finished_at, raw_command=result.command_line, scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand(command="nmap", options={'top_ports': top_ports}), + config=config, parsed=result, ) - return run \ No newline at end of file + return run diff --git a/generalresearch/models/network/rdns/command.py b/generalresearch/models/network/rdns/command.py index e9d5bfd..aa48f2a 100644 --- a/generalresearch/models/network/rdns/command.py +++ b/generalresearch/models/network/rdns/command.py @@ -2,10 +2,12 @@ import subprocess from generalresearch.models.network.rdns.parser import parse_rdns_output from generalresearch.models.network.rdns.result import RDNSResult +from generalresearch.models.network.tool_run_command import RDNSRunCommand -def run_rdns(ip: str) -> RDNSResult: - args = build_rdns_command(ip).split(" ") +def run_rdns(config: RDNSRunCommand) -> RDNSResult: + cmd = config.to_command_str() + args = cmd.split(" ") proc = subprocess.run( args, capture_output=True, @@ -13,10 +15,10 @@ def run_rdns(ip: str) -> RDNSResult: check=False, ) raw = proc.stdout.strip() - return parse_rdns_output(ip, raw) + return parse_rdns_output(ip=config.options.ip, raw=raw) -def build_rdns_command(ip: str): +def build_rdns_command(ip: str) -> str: # e.g. dig +noall +answer -x 1.2.3.4 return " ".join(["dig", "+noall", "+answer", "-x", ip]) diff --git a/generalresearch/models/network/rdns/execute.py b/generalresearch/models/network/rdns/execute.py index 97b8bf8..03a5080 100644 --- a/generalresearch/models/network/rdns/execute.py +++ b/generalresearch/models/network/rdns/execute.py @@ -14,15 +14,18 @@ from generalresearch.models.network.tool_run import ( Status, RDNSRun, ) -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + RDNSRunCommand, + RDNSRunCommandOptions, +) def execute_rdns(ip: str, scan_group_id: Optional[UUIDStr] = None): started_at = datetime.now(tz=timezone.utc) tool_version = get_dig_version() - result = run_rdns(ip=ip) + config = RDNSRunCommand(options=RDNSRunCommandOptions(ip=ip)) + result = run_rdns(config) finished_at = datetime.now(tz=timezone.utc) - raw_command = build_rdns_command(ip) run = RDNSRun( tool_name=ToolName.DIG, @@ -32,9 +35,9 @@ def execute_rdns(ip: str, scan_group_id: Optional[UUIDStr] = None): ip=ip, started_at=started_at, finished_at=finished_at, - raw_command=raw_command, + raw_command=config.to_command_str(), scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand(command="dig", options={}), + config=config, parsed=result, ) diff --git a/generalresearch/models/network/rdns/parser.py b/generalresearch/models/network/rdns/parser.py index f12a6f4..231949e 100644 --- a/generalresearch/models/network/rdns/parser.py +++ b/generalresearch/models/network/rdns/parser.py @@ -2,12 +2,13 @@ import ipaddress import re from typing import List +from generalresearch.models.custom_types import IPvAnyAddressStr from generalresearch.models.network.rdns.result import RDNSResult PTR_RE = re.compile(r"\sPTR\s+([^\s]+)\.") -def parse_rdns_output(ip, raw): +def parse_rdns_output(ip: IPvAnyAddressStr, raw:str): hostnames: List[str] = [] for line in raw.splitlines(): diff --git a/generalresearch/models/network/tool_run.py b/generalresearch/models/network/tool_run.py index 36e6950..114d4b6 100644 --- a/generalresearch/models/network/tool_run.py +++ b/generalresearch/models/network/tool_run.py @@ -12,7 +12,12 @@ from generalresearch.models.custom_types import ( from generalresearch.models.network.mtr.result import MTRResult from generalresearch.models.network.nmap.result import NmapResult from generalresearch.models.network.rdns.result import RDNSResult -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + ToolRunCommand, + NmapRunCommand, + RDNSRunCommand, + MTRRunCommand, +) class ToolClass(StrEnum): @@ -68,6 +73,7 @@ class ToolRun(BaseModel): class NmapRun(ToolRun): tool_class: Literal[ToolClass.PORT_SCAN] = Field(default=ToolClass.PORT_SCAN) tool_name: Literal[ToolName.NMAP] = Field(default=ToolName.NMAP) + config: NmapRunCommand = Field() parsed: NmapResult = Field() @@ -81,6 +87,7 @@ class NmapRun(ToolRun): class RDNSRun(ToolRun): tool_class: Literal[ToolClass.RDNS] = Field(default=ToolClass.RDNS) tool_name: Literal[ToolName.DIG] = Field(default=ToolName.DIG) + config: RDNSRunCommand = Field() parsed: RDNSResult = Field() @@ -94,6 +101,7 @@ class RDNSRun(ToolRun): class MTRRun(ToolRun): tool_class: Literal[ToolClass.TRACEROUTE] = Field(default=ToolClass.TRACEROUTE) tool_name: Literal[ToolName.MTR] = Field(default=ToolName.MTR) + config: MTRRunCommand = Field() facility_id: int = Field(default=1) source_ip: IPvAnyAddressStr = Field() diff --git a/generalresearch/models/network/tool_run_command.py b/generalresearch/models/network/tool_run_command.py index 5abe670..68d2070 100644 --- a/generalresearch/models/network/tool_run_command.py +++ b/generalresearch/models/network/tool_run_command.py @@ -1,9 +1,64 @@ -from typing import Dict, Optional +from typing import Dict, Optional, Literal from pydantic import BaseModel, Field +from generalresearch.models.custom_types import IPvAnyAddressStr +from generalresearch.models.network.definitions import IPProtocol + class ToolRunCommand(BaseModel): - # todo: expand with arguments specific for each tool command: str = Field() options: Dict[str, Optional[str | int]] = Field(default_factory=dict) + + +class NmapRunCommandOptions(BaseModel): + ip: IPvAnyAddressStr + top_ports: Optional[int] = Field(default=1000) + ports: Optional[str] = Field(default=None) + no_ping: bool = Field(default=True) + enable_advanced: bool = Field(default=True) + timing: int = Field(default=4) + + +class NmapRunCommand(ToolRunCommand): + command: Literal["nmap"] = Field(default="nmap") + options: NmapRunCommandOptions = Field() + + def to_command_str(self): + from generalresearch.models.network.nmap.command import build_nmap_command + + options = self.options + return build_nmap_command(**options.model_dump()) + + +class RDNSRunCommandOptions(BaseModel): + ip: IPvAnyAddressStr + + +class RDNSRunCommand(ToolRunCommand): + command: Literal["dig"] = Field(default="dig") + options: RDNSRunCommandOptions = Field() + + def to_command_str(self): + from generalresearch.models.network.rdns.command import build_rdns_command + + options = self.options + return build_rdns_command(**options.model_dump()) + + +class MTRRunCommandOptions(BaseModel): + ip: IPvAnyAddressStr = Field() + protocol: IPProtocol = Field(default=IPProtocol.ICMP) + port: Optional[int] = Field(default=None) + report_cycles: int = Field(default=10) + + +class MTRRunCommand(ToolRunCommand): + command: Literal["mtr"] = Field(default="mtr") + options: MTRRunCommandOptions = Field() + + def to_command_str(self): + from generalresearch.models.network.mtr.command import build_mtr_command + + options = self.options + return build_mtr_command(**options.model_dump()) diff --git a/test_utils/managers/network/conftest.py b/test_utils/managers/network/conftest.py index f6a4078..979dd63 100644 --- a/test_utils/managers/network/conftest.py +++ b/test_utils/managers/network/conftest.py @@ -7,13 +7,18 @@ import pytest from generalresearch.managers.network.label import IPLabelManager from generalresearch.managers.network.tool_run import ToolRunManager from generalresearch.models.network.definitions import IPProtocol -from generalresearch.models.network.mtr.command import build_mtr_command from generalresearch.models.network.mtr.parser import parse_mtr_output from generalresearch.models.network.nmap.parser import parse_nmap_xml -from generalresearch.models.network.rdns.command import build_rdns_command from generalresearch.models.network.rdns.parser import parse_rdns_output from generalresearch.models.network.tool_run import NmapRun, Status, RDNSRun, MTRRun -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + MTRRunCommand, + MTRRunCommandOptions, + RDNSRunCommand, + NmapRunCommand, + NmapRunCommandOptions, + RDNSRunCommandOptions, +) @pytest.fixture(scope="session") @@ -51,15 +56,21 @@ def nmap_result(nmap_raw_output): @pytest.fixture(scope="session") def nmap_run(nmap_result, scan_group_id): r = nmap_result + config = NmapRunCommand( + command="nmap", + options=NmapRunCommandOptions( + ip=r.target_ip, ports="22-1000,11000,1100,3389,61232", top_ports=None + ), + ) return NmapRun( tool_version=r.version, status=Status.SUCCESS, ip=r.target_ip, started_at=r.started_at, finished_at=r.finished_at, - raw_command=r.command_line, + raw_command=config.to_command_str(), scan_group_id=scan_group_id, - config=ToolRunCommand(command="nmap"), + config=config, parsed=r, ) @@ -79,15 +90,16 @@ def rdns_run(rdns_result, scan_group_id): r = rdns_result ip = "45.33.32.156" utc_now = datetime.now(tz=timezone.utc) + config = RDNSRunCommand(command="dig", options=RDNSRunCommandOptions(ip=ip)) return RDNSRun( tool_version="1.2.3", status=Status.SUCCESS, ip=ip, started_at=utc_now, finished_at=utc_now + timedelta(seconds=1), - raw_command=build_rdns_command(ip=ip), + raw_command=config.to_command_str(), scan_group_id=scan_group_id, - config=ToolRunCommand(command="dig"), + config=config, parsed=r, ) @@ -109,6 +121,12 @@ def mtr_result(mtr_raw_output): def mtr_run(mtr_result, scan_group_id): r = mtr_result utc_now = datetime.now(tz=timezone.utc) + config = MTRRunCommand( + command="mtr", + options=MTRRunCommandOptions( + ip=r.destination, protocol=IPProtocol.TCP, port=443 + ), + ) return MTRRun( tool_version="1.2.3", @@ -116,11 +134,9 @@ def mtr_run(mtr_result, scan_group_id): ip=r.destination, started_at=utc_now, finished_at=utc_now + timedelta(seconds=1), - raw_command=build_mtr_command( - ip=r.destination, protocol=IPProtocol.TCP, port=443 - ), + raw_command=config.to_command_str(), scan_group_id=scan_group_id, - config=ToolRunCommand(command="mtr"), + config=config, parsed=r, facility_id=1, source_ip="1.2.3.4", diff --git a/tests/managers/network/label.py b/tests/managers/network/label.py deleted file mode 100644 index 5b9a790..0000000 --- a/tests/managers/network/label.py +++ /dev/null @@ -1,202 +0,0 @@ -import ipaddress - -import faker -import pytest -from psycopg.errors import UniqueViolation -from pydantic import ValidationError - -from generalresearch.managers.network.label import IPLabelManager -from generalresearch.models.network.label import ( - IPLabel, - IPLabelKind, - IPLabelSource, - IPLabelMetadata, -) -from generalresearch.models.thl.ipinfo import normalize_ip - -fake = faker.Faker() - - -@pytest.fixture -def ip_label(utc_now) -> IPLabel: - ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) - return IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - metadata=IPLabelMetadata(services=["RDP"]) - ) - - -def test_model(utc_now): - ip = fake.ipv4_public() - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - assert lbl.ip.prefixlen == 32 - print(f"{lbl.ip=}") - - ip = ipaddress.IPv4Network((ip, 24), strict=False) - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - print(f"{lbl.ip=}") - - with pytest.raises(ValidationError, match="IPv6 network must be /64 or larger"): - IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=fake.ipv6(), - ) - - ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - print(f"{lbl.ip=}") - - ip = ipaddress.IPv6Network((ip.network_address, 48), strict=False) - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - print(f"{lbl.ip=}") - - -def test_create(iplabel_manager: IPLabelManager, ip_label: IPLabel): - iplabel_manager.create(ip_label) - - with pytest.raises( - UniqueViolation, match="duplicate key value violates unique constraint" - ): - iplabel_manager.create(ip_label) - - -def test_filter(iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago): - res = iplabel_manager.filter(ips=[ip_label.ip]) - assert len(res) == 0 - - iplabel_manager.create(ip_label) - res = iplabel_manager.filter(ips=[ip_label.ip]) - assert len(res) == 1 - - out = res[0] - assert out == ip_label - - res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) - assert len(res) == 1 - - ip_label2 = ip_label.model_copy() - ip_label2.ip = fake.ipv4_public() - iplabel_manager.create(ip_label2) - res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) - assert len(res) == 2 - - -def test_filter_network( - iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago -): - print(ip_label) - ip_label = ip_label.model_copy() - ip_label.ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) - - iplabel_manager.create(ip_label) - res = iplabel_manager.filter(ips=[ip_label.ip]) - assert len(res) == 1 - - out = res[0] - assert out == ip_label - - res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) - assert len(res) == 1 - - ip_label2 = ip_label.model_copy() - ip_label2.ip = fake.ipv4_public() - iplabel_manager.create(ip_label2) - res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) - assert len(res) == 2 - - -def test_network(iplabel_manager: IPLabelManager, utc_now): - # This is a fully-specific /128 ipv6 address. - # e.g. '51b7:b38d:8717:6c5b:cd3e:f5c3:3aba:17d' - ip = fake.ipv6() - # Generally, we'd want to annotate the /64 network - # e.g. '51b7:b38d:8717:6c5b::/64' - ip_64 = ipaddress.IPv6Network((ip, 64), strict=False) - - label = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip_64, - ) - iplabel_manager.create(label) - - # If I query for the /128 directly, I won't find it - res = iplabel_manager.filter(ips=[ip]) - assert len(res) == 0 - - # If I query for the /64 network I will - res = iplabel_manager.filter(ips=[ip_64]) - assert len(res) == 1 - - # Or, I can query for the /128 ip IN a network - res = iplabel_manager.filter(ip_in_network=ip) - assert len(res) == 1 - - -def test_label_cidr_and_ipinfo( - iplabel_manager: IPLabelManager, ip_information_factory, ip_geoname, utc_now -): - # We have network_iplabel.ip as a cidr col and - # thl_ipinformation.ip as a inet col. Make sure we can join appropriately - ip = fake.ipv6() - ip_information_factory(ip=ip, geoname=ip_geoname) - # We normalize for storage into ipinfo table - ip_norm, prefix = normalize_ip(ip) - - # Test with a larger network - ip_48 = ipaddress.IPv6Network((ip, 48), strict=False) - print(f"{ip=}") - print(f"{ip_norm=}") - print(f"{ip_48=}") - label = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip_48, - ) - iplabel_manager.create(label) - - res = iplabel_manager.test_join(ip_norm) - print(res) diff --git a/tests/managers/network/test_label.py b/tests/managers/network/test_label.py new file mode 100644 index 0000000..5b9a790 --- /dev/null +++ b/tests/managers/network/test_label.py @@ -0,0 +1,202 @@ +import ipaddress + +import faker +import pytest +from psycopg.errors import UniqueViolation +from pydantic import ValidationError + +from generalresearch.managers.network.label import IPLabelManager +from generalresearch.models.network.label import ( + IPLabel, + IPLabelKind, + IPLabelSource, + IPLabelMetadata, +) +from generalresearch.models.thl.ipinfo import normalize_ip + +fake = faker.Faker() + + +@pytest.fixture +def ip_label(utc_now) -> IPLabel: + ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) + return IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + metadata=IPLabelMetadata(services=["RDP"]) + ) + + +def test_model(utc_now): + ip = fake.ipv4_public() + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + assert lbl.ip.prefixlen == 32 + print(f"{lbl.ip=}") + + ip = ipaddress.IPv4Network((ip, 24), strict=False) + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + print(f"{lbl.ip=}") + + with pytest.raises(ValidationError, match="IPv6 network must be /64 or larger"): + IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=fake.ipv6(), + ) + + ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + print(f"{lbl.ip=}") + + ip = ipaddress.IPv6Network((ip.network_address, 48), strict=False) + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + print(f"{lbl.ip=}") + + +def test_create(iplabel_manager: IPLabelManager, ip_label: IPLabel): + iplabel_manager.create(ip_label) + + with pytest.raises( + UniqueViolation, match="duplicate key value violates unique constraint" + ): + iplabel_manager.create(ip_label) + + +def test_filter(iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago): + res = iplabel_manager.filter(ips=[ip_label.ip]) + assert len(res) == 0 + + iplabel_manager.create(ip_label) + res = iplabel_manager.filter(ips=[ip_label.ip]) + assert len(res) == 1 + + out = res[0] + assert out == ip_label + + res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) + assert len(res) == 1 + + ip_label2 = ip_label.model_copy() + ip_label2.ip = fake.ipv4_public() + iplabel_manager.create(ip_label2) + res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) + assert len(res) == 2 + + +def test_filter_network( + iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago +): + print(ip_label) + ip_label = ip_label.model_copy() + ip_label.ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) + + iplabel_manager.create(ip_label) + res = iplabel_manager.filter(ips=[ip_label.ip]) + assert len(res) == 1 + + out = res[0] + assert out == ip_label + + res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) + assert len(res) == 1 + + ip_label2 = ip_label.model_copy() + ip_label2.ip = fake.ipv4_public() + iplabel_manager.create(ip_label2) + res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) + assert len(res) == 2 + + +def test_network(iplabel_manager: IPLabelManager, utc_now): + # This is a fully-specific /128 ipv6 address. + # e.g. '51b7:b38d:8717:6c5b:cd3e:f5c3:3aba:17d' + ip = fake.ipv6() + # Generally, we'd want to annotate the /64 network + # e.g. '51b7:b38d:8717:6c5b::/64' + ip_64 = ipaddress.IPv6Network((ip, 64), strict=False) + + label = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip_64, + ) + iplabel_manager.create(label) + + # If I query for the /128 directly, I won't find it + res = iplabel_manager.filter(ips=[ip]) + assert len(res) == 0 + + # If I query for the /64 network I will + res = iplabel_manager.filter(ips=[ip_64]) + assert len(res) == 1 + + # Or, I can query for the /128 ip IN a network + res = iplabel_manager.filter(ip_in_network=ip) + assert len(res) == 1 + + +def test_label_cidr_and_ipinfo( + iplabel_manager: IPLabelManager, ip_information_factory, ip_geoname, utc_now +): + # We have network_iplabel.ip as a cidr col and + # thl_ipinformation.ip as a inet col. Make sure we can join appropriately + ip = fake.ipv6() + ip_information_factory(ip=ip, geoname=ip_geoname) + # We normalize for storage into ipinfo table + ip_norm, prefix = normalize_ip(ip) + + # Test with a larger network + ip_48 = ipaddress.IPv6Network((ip, 48), strict=False) + print(f"{ip=}") + print(f"{ip_norm=}") + print(f"{ip_48=}") + label = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip_48, + ) + iplabel_manager.create(label) + + res = iplabel_manager.test_join(ip_norm) + print(res) diff --git a/tests/managers/network/test_tool_run.py b/tests/managers/network/test_tool_run.py new file mode 100644 index 0000000..a815809 --- /dev/null +++ b/tests/managers/network/test_tool_run.py @@ -0,0 +1,25 @@ +def test_create_tool_run_from_nmap_run(nmap_run, toolrun_manager): + + toolrun_manager.create_nmap_run(nmap_run) + + run_out = toolrun_manager.get_nmap_run(nmap_run.id) + + assert nmap_run == run_out + + +def test_create_tool_run_from_rdns_run(rdns_run, toolrun_manager): + + toolrun_manager.create_rdns_run(rdns_run) + + run_out = toolrun_manager.get_rdns_run(rdns_run.id) + + assert rdns_run == run_out + + +def test_create_tool_run_from_mtr_run(mtr_run, toolrun_manager): + + toolrun_manager.create_mtr_run(mtr_run) + + run_out = toolrun_manager.get_mtr_run(mtr_run.id) + + assert mtr_run == run_out diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py deleted file mode 100644 index a815809..0000000 --- a/tests/managers/network/tool_run.py +++ /dev/null @@ -1,25 +0,0 @@ -def test_create_tool_run_from_nmap_run(nmap_run, toolrun_manager): - - toolrun_manager.create_nmap_run(nmap_run) - - run_out = toolrun_manager.get_nmap_run(nmap_run.id) - - assert nmap_run == run_out - - -def test_create_tool_run_from_rdns_run(rdns_run, toolrun_manager): - - toolrun_manager.create_rdns_run(rdns_run) - - run_out = toolrun_manager.get_rdns_run(rdns_run.id) - - assert rdns_run == run_out - - -def test_create_tool_run_from_mtr_run(mtr_run, toolrun_manager): - - toolrun_manager.create_mtr_run(mtr_run) - - run_out = toolrun_manager.get_mtr_run(mtr_run.id) - - assert mtr_run == run_out diff --git a/tests/models/network/mtr.py b/tests/models/network/mtr.py deleted file mode 100644 index 2965300..0000000 --- a/tests/models/network/mtr.py +++ /dev/null @@ -1,26 +0,0 @@ -from generalresearch.models.network.mtr.execute import execute_mtr -import faker - -from generalresearch.models.network.tool_run import ToolName, ToolClass - -fake = faker.Faker() - - -def test_execute_mtr(toolrun_manager): - ip = "65.19.129.53" - - run = execute_mtr(ip=ip, report_cycles=3) - assert run.tool_name == ToolName.MTR - assert run.tool_class == ToolClass.TRACEROUTE - assert run.ip == ip - result = run.parsed - - last_hop = result.hops[-1] - assert last_hop.asn == 6939 - assert last_hop.domain == "grlengine.com" - - last_hop_1 = result.hops[-2] - assert last_hop_1.asn == 6939 - assert last_hop_1.domain == "he.net" - - toolrun_manager.create_mtr_run(run) diff --git a/tests/models/network/nmap.py b/tests/models/network/nmap.py deleted file mode 100644 index f034bf0..0000000 --- a/tests/models/network/nmap.py +++ /dev/null @@ -1,29 +0,0 @@ -import subprocess - -from generalresearch.models.network.definitions import IPProtocol -from generalresearch.models.network.nmap.execute import execute_nmap -import faker - -from generalresearch.models.network.nmap.result import PortState -from generalresearch.models.network.tool_run import ToolName, ToolClass - -fake = faker.Faker() - - -def resolve(host): - return subprocess.check_output(["dig", host, "+short"]).decode().strip() - - -def test_execute_nmap_scanme(toolrun_manager): - ip = resolve("scanme.nmap.org") - - run = execute_nmap(ip=ip, top_ports=20) - assert run.tool_name == ToolName.NMAP - assert run.tool_class == ToolClass.PORT_SCAN - assert run.ip == ip - result = run.parsed - - port22 = result._port_index[(IPProtocol.TCP, 22)] - assert port22.state == PortState.OPEN - - toolrun_manager.create_nmap_run(run) diff --git a/tests/models/network/nmap_parser.py b/tests/models/network/nmap_parser.py deleted file mode 100644 index 96d7b37..0000000 --- a/tests/models/network/nmap_parser.py +++ /dev/null @@ -1,22 +0,0 @@ -import os - -import pytest - -from generalresearch.models.network.nmap.parser import parse_nmap_xml - -@pytest.fixture -def nmap_raw_output_2(request) -> str: - fp = os.path.join(request.config.rootpath, "data/nmaprun2.xml") - with open(fp, "r") as f: - data = f.read() - return data - - -def test_nmap_xml_parser(nmap_raw_output, nmap_raw_output_2): - n = parse_nmap_xml(nmap_raw_output) - assert n.tcp_open_ports == [61232] - assert len(n.trace.hops) == 18 - - n = parse_nmap_xml(nmap_raw_output_2) - assert n.tcp_open_ports == [22, 80, 9929, 31337] - assert n.trace is None diff --git a/tests/models/network/rdns.py b/tests/models/network/rdns.py deleted file mode 100644 index e56c494..0000000 --- a/tests/models/network/rdns.py +++ /dev/null @@ -1,33 +0,0 @@ -from generalresearch.models.network.rdns.execute import execute_rdns -import faker - -from generalresearch.models.network.tool_run import ToolName, ToolClass - -fake = faker.Faker() - - -def test_execute_rdns_grl(toolrun_manager): - ip = "65.19.129.53" - run = execute_rdns(ip=ip) - assert run.tool_name == ToolName.DIG - assert run.tool_class == ToolClass.RDNS - assert run.ip == ip - result = run.parsed - assert result.primary_hostname == "in1-smtp.grlengine.com" - assert result.primary_domain == "grlengine.com" - assert result.hostname_count == 1 - - toolrun_manager.create_rdns_run(run) - - -def test_execute_rdns_none(toolrun_manager): - ip = fake.ipv6() - run = execute_rdns(ip) - result = run.parsed - - assert result.primary_hostname is None - assert result.primary_domain is None - assert result.hostname_count == 0 - assert result.hostnames == [] - - toolrun_manager.create_rdns_run(run) diff --git a/tests/models/network/test_mtr.py b/tests/models/network/test_mtr.py new file mode 100644 index 0000000..2965300 --- /dev/null +++ b/tests/models/network/test_mtr.py @@ -0,0 +1,26 @@ +from generalresearch.models.network.mtr.execute import execute_mtr +import faker + +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def test_execute_mtr(toolrun_manager): + ip = "65.19.129.53" + + run = execute_mtr(ip=ip, report_cycles=3) + assert run.tool_name == ToolName.MTR + assert run.tool_class == ToolClass.TRACEROUTE + assert run.ip == ip + result = run.parsed + + last_hop = result.hops[-1] + assert last_hop.asn == 6939 + assert last_hop.domain == "grlengine.com" + + last_hop_1 = result.hops[-2] + assert last_hop_1.asn == 6939 + assert last_hop_1.domain == "he.net" + + toolrun_manager.create_mtr_run(run) diff --git a/tests/models/network/test_nmap.py b/tests/models/network/test_nmap.py new file mode 100644 index 0000000..0be98d4 --- /dev/null +++ b/tests/models/network/test_nmap.py @@ -0,0 +1,29 @@ +import subprocess + +from generalresearch.models.network.definitions import IPProtocol +from generalresearch.models.network.nmap.execute import execute_nmap +import faker + +from generalresearch.models.network.nmap.result import PortState +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def resolve(host): + return subprocess.check_output(["dig", host, "+short"]).decode().strip() + + +def test_execute_nmap_scanme(toolrun_manager): + ip = resolve("scanme.nmap.org") + + run = execute_nmap(ip=ip, top_ports=None, ports="20-30", enable_advanced=False) + assert run.tool_name == ToolName.NMAP + assert run.tool_class == ToolClass.PORT_SCAN + assert run.ip == ip + result = run.parsed + + port22 = result._port_index[(IPProtocol.TCP, 22)] + assert port22.state == PortState.OPEN + + toolrun_manager.create_nmap_run(run) diff --git a/tests/models/network/test_nmap_parser.py b/tests/models/network/test_nmap_parser.py new file mode 100644 index 0000000..96d7b37 --- /dev/null +++ b/tests/models/network/test_nmap_parser.py @@ -0,0 +1,22 @@ +import os + +import pytest + +from generalresearch.models.network.nmap.parser import parse_nmap_xml + +@pytest.fixture +def nmap_raw_output_2(request) -> str: + fp = os.path.join(request.config.rootpath, "data/nmaprun2.xml") + with open(fp, "r") as f: + data = f.read() + return data + + +def test_nmap_xml_parser(nmap_raw_output, nmap_raw_output_2): + n = parse_nmap_xml(nmap_raw_output) + assert n.tcp_open_ports == [61232] + assert len(n.trace.hops) == 18 + + n = parse_nmap_xml(nmap_raw_output_2) + assert n.tcp_open_ports == [22, 80, 9929, 31337] + assert n.trace is None diff --git a/tests/models/network/test_rdns.py b/tests/models/network/test_rdns.py new file mode 100644 index 0000000..e56c494 --- /dev/null +++ b/tests/models/network/test_rdns.py @@ -0,0 +1,33 @@ +from generalresearch.models.network.rdns.execute import execute_rdns +import faker + +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def test_execute_rdns_grl(toolrun_manager): + ip = "65.19.129.53" + run = execute_rdns(ip=ip) + assert run.tool_name == ToolName.DIG + assert run.tool_class == ToolClass.RDNS + assert run.ip == ip + result = run.parsed + assert result.primary_hostname == "in1-smtp.grlengine.com" + assert result.primary_domain == "grlengine.com" + assert result.hostname_count == 1 + + toolrun_manager.create_rdns_run(run) + + +def test_execute_rdns_none(toolrun_manager): + ip = fake.ipv6() + run = execute_rdns(ip) + result = run.parsed + + assert result.primary_hostname is None + assert result.primary_domain is None + assert result.hostname_count == 0 + assert result.hostnames == [] + + toolrun_manager.create_rdns_run(run) -- cgit v1.2.3