From a68a9eb9873c7502c2b7bddb55c4eb61689a48a2 Mon Sep 17 00:00:00 2001
From: stuppie
Date: Mon, 9 Mar 2026 18:42:22 -0600
Subject: add IPLabel, NmapRun, RDNSResult, ToolRun, model/managers/tests. nmap
xml parser. + test. work in progress
---
generalresearch/managers/network/__init__.py | 0
generalresearch/managers/network/label.py | 144 +++++++++
generalresearch/managers/network/nmap.py | 59 ++++
generalresearch/managers/network/rdns.py | 26 ++
generalresearch/managers/network/tool_run.py | 90 ++++++
generalresearch/models/custom_types.py | 14 +-
generalresearch/models/network/__init__.py | 0
generalresearch/models/network/label.py | 126 ++++++++
generalresearch/models/network/nmap.py | 451 +++++++++++++++++++++++++++
generalresearch/models/network/rdns.py | 101 ++++++
generalresearch/models/network/tool_run.py | 131 ++++++++
generalresearch/models/network/tool_utils.py | 69 ++++
generalresearch/models/network/xml_parser.py | 405 ++++++++++++++++++++++++
pyproject.toml | 1 +
requirements.txt | 3 +
test_utils/managers/network/__init__.py | 0
test_utils/managers/network/conftest.py | 86 +++++
test_utils/models/conftest.py | 6 +-
tests/conftest.py | 1 +
tests/data/nmaprun1.xml | 68 ++++
tests/data/nmaprun2.xml | 118 +++++++
tests/managers/network/__init__.py | 0
tests/managers/network/label.py | 202 ++++++++++++
tests/managers/network/tool_run.py | 48 +++
tests/models/network/__init__.py | 0
tests/models/network/nmap.py | 32 ++
tests/models/network/rdns.py | 23 ++
tests/models/network/tool_run.py | 8 +
28 files changed, 2207 insertions(+), 5 deletions(-)
create mode 100644 generalresearch/managers/network/__init__.py
create mode 100644 generalresearch/managers/network/label.py
create mode 100644 generalresearch/managers/network/nmap.py
create mode 100644 generalresearch/managers/network/rdns.py
create mode 100644 generalresearch/managers/network/tool_run.py
create mode 100644 generalresearch/models/network/__init__.py
create mode 100644 generalresearch/models/network/label.py
create mode 100644 generalresearch/models/network/nmap.py
create mode 100644 generalresearch/models/network/rdns.py
create mode 100644 generalresearch/models/network/tool_run.py
create mode 100644 generalresearch/models/network/tool_utils.py
create mode 100644 generalresearch/models/network/xml_parser.py
create mode 100644 test_utils/managers/network/__init__.py
create mode 100644 test_utils/managers/network/conftest.py
create mode 100644 tests/data/nmaprun1.xml
create mode 100644 tests/data/nmaprun2.xml
create mode 100644 tests/managers/network/__init__.py
create mode 100644 tests/managers/network/label.py
create mode 100644 tests/managers/network/tool_run.py
create mode 100644 tests/models/network/__init__.py
create mode 100644 tests/models/network/nmap.py
create mode 100644 tests/models/network/rdns.py
create mode 100644 tests/models/network/tool_run.py
diff --git a/generalresearch/managers/network/__init__.py b/generalresearch/managers/network/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/generalresearch/managers/network/label.py b/generalresearch/managers/network/label.py
new file mode 100644
index 0000000..0405716
--- /dev/null
+++ b/generalresearch/managers/network/label.py
@@ -0,0 +1,144 @@
+from datetime import datetime, timezone, timedelta
+from typing import Collection, Optional, List
+
+from pydantic import TypeAdapter, IPvAnyNetwork
+
+from generalresearch.managers.base import PostgresManager
+from generalresearch.models.custom_types import (
+ AwareDatetimeISO,
+ IPvAnyAddressStr,
+ IPvAnyNetworkStr,
+)
+from generalresearch.models.network.label import IPLabel, IPLabelKind, IPLabelSource
+
+
class IPLabelManager(PostgresManager):
    """CRUD/query manager for IPLabel ground-truth rows (table network_iplabel)."""

    def create(self, ip_label: IPLabel) -> IPLabel:
        """Insert one IPLabel row; returns the model unchanged.

        NOTE(review): the generated primary key is fetched from RETURNING but
        IPLabel has no id field to store it on — confirm whether the pk should
        be surfaced to callers.
        """
        query = """
        INSERT INTO network_iplabel (
            ip, labeled_at, created_at,
            label_kind, source, confidence,
            provider, metadata
        ) VALUES (
            %(ip)s, %(labeled_at)s, %(created_at)s,
            %(label_kind)s, %(source)s, %(confidence)s,
            %(provider)s, %(metadata)s
        ) RETURNING id;"""
        params = ip_label.model_dump_postgres()
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                c.execute(query, params)
                pk = c.fetchone()["id"]
        return ip_label

    def make_filter_str(
        self,
        ips: Optional[Collection[IPvAnyNetworkStr]] = None,
        ip_in_network: Optional[IPvAnyAddressStr] = None,
        label_kind: Optional[IPLabelKind] = None,
        source: Optional[IPLabelSource] = None,
        labeled_at: Optional[AwareDatetimeISO] = None,
        labeled_after: Optional[AwareDatetimeISO] = None,
        labeled_before: Optional[AwareDatetimeISO] = None,
        provider: Optional[str] = None,
    ):
        """Build a WHERE clause + params dict from the given optional filters.

        Returns (filter_str, params); filter_str is "" when no filter applies.
        All datetimes must be timezone-aware UTC.
        """
        filters = []
        params = {}
        if labeled_after or labeled_before:
            # Open-ended ranges get a wide default bound on the missing side.
            time_end = labeled_before or datetime.now(tz=timezone.utc)
            time_start = labeled_after or datetime(2017, 1, 1, tzinfo=timezone.utc)
            assert time_start.tzinfo.utcoffset(time_start) == timedelta(), "must be UTC"
            assert time_end.tzinfo.utcoffset(time_end) == timedelta(), "must be UTC"
            filters.append("labeled_at BETWEEN %(time_start)s AND %(time_end)s")
            params["time_start"] = time_start
            params["time_end"] = time_end
        if labeled_at:
            assert labeled_at.tzinfo.utcoffset(labeled_at) == timedelta(), "must be UTC"
            # BUG FIX: was "labeled_at == %(labeled_at)s"; '==' is not a valid
            # Postgres comparison operator and would raise an error at execute.
            filters.append("labeled_at = %(labeled_at)s")
            params["labeled_at"] = labeled_at
        if label_kind:
            filters.append("label_kind = %(label_kind)s")
            params["label_kind"] = label_kind.value
        if source:
            filters.append("source = %(source)s")
            params["source"] = source.value
        if provider:
            filters.append("provider = %(provider)s")
            params["provider"] = provider
        if ips is not None:
            filters.append("ip = ANY(%(ips)s)")
            params["ips"] = list(ips)
        if ip_in_network:
            """
            Return matching networks.
            e.g. ip = '13f9:c462:e039:a38c::1', might return rows
            where ip = '13f9:c462:e039::/48' or '13f9:c462:e039:a38c::/64'
            """
            filters.append("ip >>= %(ip_in_network)s")
            params["ip_in_network"] = ip_in_network

        filter_str = "WHERE " + " AND ".join(filters) if filters else ""
        return filter_str, params

    def filter(
        self,
        ips: Optional[Collection[IPvAnyNetworkStr]] = None,
        ip_in_network: Optional[IPvAnyAddressStr] = None,
        label_kind: Optional[IPLabelKind] = None,
        source: Optional[IPLabelSource] = None,
        labeled_at: Optional[AwareDatetimeISO] = None,
        labeled_after: Optional[AwareDatetimeISO] = None,
        labeled_before: Optional[AwareDatetimeISO] = None,
        provider: Optional[str] = None,
    ) -> List[IPLabel]:
        """Return all IPLabel rows matching the given optional filters."""
        filter_str, params = self.make_filter_str(
            ips=ips,
            ip_in_network=ip_in_network,
            label_kind=label_kind,
            source=source,
            labeled_at=labeled_at,
            labeled_after=labeled_after,
            labeled_before=labeled_before,
            provider=provider,
        )
        query = f"""
        SELECT
            ip, labeled_at, created_at,
            label_kind, source, confidence,
            provider, metadata
        FROM network_iplabel
        {filter_str}
        """
        res = self.pg_config.execute_sql_query(query, params)
        return [IPLabel.model_validate(rec) for rec in res]

    def get_most_specific_matching_network(self, ip: IPvAnyAddressStr) -> IPvAnyNetwork:
        """Return the longest-prefix labeled network containing ``ip``, or None.

        e.g. ip = 'b5f4:dc2:f136:70d5:5b6e:9a85:c7d4:3517', might return
        'b5f4:dc2:f136:70d5::/64'
        """
        ip = TypeAdapter(IPvAnyAddressStr).validate_python(ip)

        query = """
        SELECT ip
        FROM network_iplabel
        WHERE ip >>= %(ip)s
        ORDER BY masklen(ip) DESC
        LIMIT 1;"""
        res = self.pg_config.execute_sql_query(query, {"ip": ip})
        if res:
            return IPvAnyNetwork(res[0]["ip"])

    def test_join(self, ip):
        """Exploratory join of thl_ipinformation with its covering labels."""
        query = """
        SELECT
            to_jsonb(i) AS ipinfo,
            to_jsonb(l) AS iplabel
        FROM thl_ipinformation i
        LEFT JOIN network_iplabel l
            ON l.ip >>= i.ip
        WHERE i.ip = %(ip)s
        ORDER BY masklen(l.ip) DESC;"""
        params = {"ip": ip}
        res = self.pg_config.execute_sql_query(query, params)
        return res
diff --git a/generalresearch/managers/network/nmap.py b/generalresearch/managers/network/nmap.py
new file mode 100644
index 0000000..9cbc283
--- /dev/null
+++ b/generalresearch/managers/network/nmap.py
@@ -0,0 +1,59 @@
+from typing import Optional
+
+from psycopg import Cursor
+
+from generalresearch.managers.base import PostgresManager
+from generalresearch.models.network.tool_run import PortScanRun
+
+
class NmapManager(PostgresManager):
    """Persistence for parsed nmap results (network_portscan / network_portscanport)."""

    def _create(self, run: PortScanRun, c: Optional[Cursor] = None) -> None:
        """
        Insert a PortScan + PortScanPorts from a Pydantic NmapRun.
        Do not use this directly. Must only be used in the context of a toolrun
        (run.id must already be set so port rows can reference it).
        """
        query = """
        INSERT INTO network_portscan (
            run_id, xml_version, host_state,
            host_state_reason, latency_ms, distance,
            uptime_seconds, last_boot, raw_xml,
            parsed, scan_group_id, open_tcp_ports,
            started_at, ip
        )
        VALUES (
            %(run_id)s, %(xml_version)s, %(host_state)s,
            %(host_state_reason)s, %(latency_ms)s, %(distance)s,
            %(uptime_seconds)s, %(last_boot)s, %(raw_xml)s,
            %(parsed)s, %(scan_group_id)s, %(open_tcp_ports)s,
            %(started_at)s, %(ip)s
        );
        """
        params = run.model_dump_postgres()

        query_ports = """
        INSERT INTO network_portscanport (
            port_scan_id, protocol, port,
            state, reason, reason_ttl,
            service_name
        ) VALUES (
            %(port_scan_id)s, %(protocol)s, %(port)s,
            %(state)s, %(reason)s, %(reason_ttl)s,
            %(service_name)s
        )
        """
        nmap_run = run.parsed
        params_ports = [p.model_dump_postgres(run_id=run.id) for p in nmap_run.ports]

        if c:
            # Participate in the caller's transaction.
            self._insert(c, query, params, query_ports, params_ports)
        else:
            # No enclosing cursor: open a short-lived connection of our own.
            with self.pg_config.make_connection() as conn:
                with conn.cursor() as cur:
                    self._insert(cur, query, params, query_ports, params_ports)

        return None

    @staticmethod
    def _insert(c: Cursor, query, params, query_ports, params_ports) -> None:
        """Run the portscan insert, plus the per-port inserts when any exist."""
        c.execute(query, params)
        if params_ports:
            c.executemany(query_ports, params_ports)
diff --git a/generalresearch/managers/network/rdns.py b/generalresearch/managers/network/rdns.py
new file mode 100644
index 0000000..2eed303
--- /dev/null
+++ b/generalresearch/managers/network/rdns.py
@@ -0,0 +1,26 @@
+from typing import Optional
+
+from psycopg import Cursor
+
+from generalresearch.managers.base import PostgresManager
+from generalresearch.models.network.tool_run import RDnsRun
+
+
class RdnsManager(PostgresManager):
    """Persistence for reverse-DNS results (table network_rdnsresult)."""

    def _create(self, run: RDnsRun, c: Optional[Cursor] = None) -> None:
        """
        Do not use this directly. Must only be used in the context of a toolrun
        (run.id must already be set).

        BUG FIX: ``c`` is declared Optional but was dereferenced
        unconditionally, raising AttributeError when omitted. Now falls back
        to a short-lived connection, mirroring NmapManager._create.
        """
        query = """
        INSERT INTO network_rdnsresult (
            run_id, primary_hostname, primary_org,
            hostname_count, hostnames
        )
        VALUES (
            %(run_id)s, %(primary_hostname)s, %(primary_org)s,
            %(hostname_count)s, %(hostnames)s
        );
        """
        params = run.model_dump_postgres()
        if c is not None:
            c.execute(query, params)
        else:
            with self.pg_config.make_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(query, params)
\ No newline at end of file
diff --git a/generalresearch/managers/network/tool_run.py b/generalresearch/managers/network/tool_run.py
new file mode 100644
index 0000000..75c2e73
--- /dev/null
+++ b/generalresearch/managers/network/tool_run.py
@@ -0,0 +1,90 @@
+from typing import Collection
+
+from psycopg import Cursor, sql
+
+from generalresearch.managers.base import PostgresManager, Permission
+from generalresearch.models.network.rdns import RDNSResult
+from generalresearch.models.network.tool_run import ToolRun, PortScanRun, RDnsRun
+from generalresearch.managers.network.nmap import NmapManager
+from generalresearch.managers.network.rdns import RdnsManager
+from generalresearch.pg_helper import PostgresConfig
+
+
class ToolRunManager(PostgresManager):
    """Coordinates inserts/reads of ToolRun rows and their tool-specific children."""

    def __init__(
        self,
        pg_config: PostgresConfig,
        permissions: Collection[Permission] = None,
    ):
        super().__init__(pg_config=pg_config, permissions=permissions)
        # Child managers share this manager's pg_config / transactions.
        self.nmap_manager = NmapManager(self.pg_config)
        self.rdns_manager = RdnsManager(self.pg_config)

    def create_tool_run(self, run: PortScanRun | RDnsRun, c: Cursor):
        """Insert the shared network_toolrun row and set run.id in place."""
        query = sql.SQL(
            """
            INSERT INTO network_toolrun (
                ip, scan_group_id, tool_class,
                tool_name, tool_version, started_at,
                finished_at, status, raw_command,
                config
            )
            VALUES (
                %(ip)s, %(scan_group_id)s, %(tool_class)s,
                %(tool_name)s, %(tool_version)s, %(started_at)s,
                %(finished_at)s, %(status)s, %(raw_command)s,
                %(config)s
            ) RETURNING id;
            """
        )
        params = run.model_dump_postgres()
        c.execute(query, params)
        run_id = c.fetchone()["id"]
        run.id = run_id
        return None

    def create_portscan_run(self, run: PortScanRun) -> PortScanRun:
        """
        Insert a PortScan + PortScanPorts from a Pydantic NmapRun.
        Both inserts happen on one connection/transaction.
        """
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                self.create_tool_run(run, c)
                self.nmap_manager._create(run, c=c)
        return run

    def get_portscan_run(self, id: int) -> PortScanRun:
        """Load one PortScanRun (toolrun row joined with its parsed nmap data)."""
        # BUG FIX: the WHERE column is qualified as tr.id — an unqualified
        # "id" is ambiguous once network_portscan is joined in.
        query = """
        SELECT tr.*, np.parsed
        FROM network_toolrun tr
        JOIN network_portscan np ON tr.id = np.run_id
        WHERE tr.id = %(id)s
        """
        params = {"id": id}
        res = self.pg_config.execute_sql_query(query, params)[0]
        return PortScanRun.model_validate(res)

    def create_rdns_run(self, run: RDnsRun) -> RDnsRun:
        """
        Insert a RDnsRun + RDNSResult on one connection/transaction.
        """
        with self.pg_config.make_connection() as conn:
            with conn.cursor() as c:
                self.create_tool_run(run, c)
                self.rdns_manager._create(run, c=c)
        return run

    def get_rdns_run(self, id: int) -> RDnsRun:
        """Load one RDnsRun, rebuilding its RDNSResult from the stored hostnames."""
        # BUG FIX: qualified tr.id (see get_portscan_run).
        query = """
        SELECT tr.*, hostnames
        FROM network_toolrun tr
        JOIN network_rdnsresult np ON tr.id = np.run_id
        WHERE tr.id = %(id)s
        """
        params = {"id": id}
        res = self.pg_config.execute_sql_query(query, params)[0]
        parsed = RDNSResult.model_validate(
            {"ip": res["ip"], "hostnames": res["hostnames"]}
        )
        res["parsed"] = parsed
        return RDnsRun.model_validate(res)
diff --git a/generalresearch/models/custom_types.py b/generalresearch/models/custom_types.py
index aefbbe9..ea96741 100644
--- a/generalresearch/models/custom_types.py
+++ b/generalresearch/models/custom_types.py
@@ -14,7 +14,7 @@ from pydantic import (
)
from pydantic.functional_serializers import PlainSerializer
from pydantic.functional_validators import AfterValidator, BeforeValidator
-from pydantic.networks import UrlConstraints
+from pydantic.networks import UrlConstraints, IPvAnyNetwork
from pydantic_core import Url
from typing_extensions import Annotated
@@ -121,13 +121,19 @@ HttpsUrlStr = Annotated[
BeforeValidator(lambda value: str(TypeAdapter(HttpsUrl).validate_python(value))),
]
-# Same thing as UUIDStr with IPvAnyAddress field. It is confusing that this is not a str
+# Same thing as UUIDStr with IPvAnyAddress field
IPvAnyAddressStr = Annotated[
str,
BeforeValidator(
lambda value: str(TypeAdapter(IPvAnyAddress).validate_python(value).exploded)
),
]
# Network analogue of IPvAnyAddressStr: validates input with IPvAnyNetwork,
# then stores/serializes it as a plain str (e.g. "10.0.0.0/8").
IPvAnyNetworkStr = Annotated[
    str,
    BeforeValidator(
        lambda value: str(TypeAdapter(IPvAnyNetwork).validate_python(value))
    ),
]
def coerce_int_to_str(data: Any) -> Any:
@@ -279,3 +285,7 @@ PropertyCode = Annotated[
pattern=r"^[a-z]{1,2}\:.*",
),
]
+
+
def now_utc_factory() -> datetime:
    """Default factory returning the current timezone-aware UTC time."""
    return datetime.now(timezone.utc)
diff --git a/generalresearch/models/network/__init__.py b/generalresearch/models/network/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/generalresearch/models/network/label.py b/generalresearch/models/network/label.py
new file mode 100644
index 0000000..b9a7659
--- /dev/null
+++ b/generalresearch/models/network/label.py
@@ -0,0 +1,126 @@
+from __future__ import annotations
+
+import ipaddress
+from enum import StrEnum
+from typing import Optional, List
+
+from pydantic import (
+ BaseModel,
+ Field,
+ computed_field,
+ field_validator,
+ ConfigDict,
+ IPvAnyNetwork,
+)
+
+from generalresearch.models.custom_types import (
+ AwareDatetimeISO,
+ now_utc_factory,
+)
+
+
class IPTrustClass(StrEnum):
    """Coarse trust bucket derived from an IPLabelKind (see IPLabel.trust_class)."""

    TRUSTED = "trusted"
    UNTRUSTED = "untrusted"
    # Note: use case of unknown is for e.g. Spur says this IP is a residential proxy
    # on 2026-1-1, and then has no annotation a month later. It doesn't mean
    # the IP is TRUSTED, but we want to record that Spur now doesn't claim UNTRUSTED.
    UNKNOWN = "unknown"
+
+
class IPLabelKind(StrEnum):
    """Specific label applied to an IP; grouped by the trust class it maps to."""

    # --- UNTRUSTED ---
    RESIDENTIAL_PROXY = "residential_proxy"
    DATACENTER_PROXY = "datacenter_proxy"
    ISP_PROXY = "isp_proxy"
    MOBILE_PROXY = "mobile_proxy"
    PROXY = "proxy"
    HOSTING = "hosting"
    VPN = "vpn"
    RELAY = "relay"
    TOR_EXIT = "tor_exit"
    BAD_ACTOR = "bad_actor"
    # --- TRUSTED ---
    TRUSTED_USER = "trusted_user"
    # --- UNKNOWN ---
    UNKNOWN = "unknown"
+
+
class IPLabelSource(StrEnum):
    """Where a label came from: our own usage, a vendor feed, or manual entry."""

    # We got this IP from our own use of a proxy service
    INTERNAL_USE = "internal_use"

    # An external "security" service flagged this IP
    SPUR = "spur"
    IPINFO = "ipinfo"
    MAXMIND = "maxmind"

    MANUAL = "manual"
+
+
class IPLabel(BaseModel):
    """
    Stores *ground truth* about an IP at a specific time.
    To be used for model training and evaluation.
    """

    model_config = ConfigDict(validate_assignment=True)

    # A single address is accepted too (ip_network with strict=False yields a /32 or /128).
    ip: IPvAnyNetwork = Field()

    labeled_at: AwareDatetimeISO = Field(default_factory=now_utc_factory)
    created_at: Optional[AwareDatetimeISO] = Field(default=None)

    label_kind: IPLabelKind = Field()
    source: IPLabelSource = Field()

    confidence: float = Field(default=1.0, ge=0.0, le=1.0)

    # Optionally, if this is untrusted, which service is providing the proxy/vpn service
    provider: Optional[str] = Field(
        default=None, examples=["geonode", "gecko"], max_length=128
    )

    # Forward reference to IPLabelMetadata (declared below); resolved lazily
    # via `from __future__ import annotations`.
    metadata: Optional[IPLabelMetadata] = Field(default=None)

    @field_validator("ip", mode="before")
    @classmethod
    def normalize_and_validate_network(cls, v):
        """Coerce to a network and reject IPv6 prefixes longer than /64."""
        net = ipaddress.ip_network(v, strict=False)

        if isinstance(net, ipaddress.IPv6Network):
            if net.prefixlen > 64:
                raise ValueError("IPv6 network must be /64 or larger")

        return net

    @field_validator("provider", mode="before")
    @classmethod
    def provider_format(cls, v: Optional[str]) -> Optional[str]:
        """Normalize provider names to lowercase, stripped strings."""
        if v is None:
            return v
        return v.lower().strip()

    @computed_field()
    @property
    def trust_class(self) -> IPTrustClass:
        # Derived: UNKNOWN and TRUSTED_USER map to their buckets; every other
        # kind is considered UNTRUSTED.
        if self.label_kind == IPLabelKind.UNKNOWN:
            return IPTrustClass.UNKNOWN
        if self.label_kind == IPLabelKind.TRUSTED_USER:
            return IPTrustClass.TRUSTED
        return IPTrustClass.UNTRUSTED

    def model_dump_postgres(self) -> dict:
        """Dump for insertion: JSON-mode dict, metadata re-serialized as a JSON string."""
        d = self.model_dump(mode="json")
        d["metadata"] = self.metadata.model_dump_json() if self.metadata else None
        return d
+
+
class IPLabelMetadata(BaseModel):
    """
    To be expanded. Just for storing some things from Spur for now.
    Unknown keys are preserved via ``extra="allow"``.
    """

    model_config = ConfigDict(validate_assignment=True, extra="allow")

    # BUG FIX: the field was annotated Optional but had no default, which made
    # it *required* in pydantic v2. default=None makes it genuinely optional;
    # when a list is supplied it must still be non-empty (min_length=1).
    services: Optional[List[str]] = Field(
        default=None, min_length=1, examples=[["RDP"]]
    )
diff --git a/generalresearch/models/network/nmap.py b/generalresearch/models/network/nmap.py
new file mode 100644
index 0000000..8f1720b
--- /dev/null
+++ b/generalresearch/models/network/nmap.py
@@ -0,0 +1,451 @@
+import json
+from datetime import timedelta
+from enum import StrEnum
+from functools import cached_property
+from typing import Dict, Any, Literal, List, Optional, Tuple, Set
+
+from pydantic import computed_field, BaseModel, Field
+
+from generalresearch.models.custom_types import AwareDatetimeISO, IPvAnyAddressStr
+
+
class NmapTraceProtocol(StrEnum):
    """Transport protocol nmap used for traceroute probes."""

    TCP = "tcp"
    UDP = "udp"
    SCTP = "sctp"
+
+
class PortState(StrEnum):
    """Port states as reported by nmap, plus a local NOT_SCANNED sentinel."""

    OPEN = "open"
    CLOSED = "closed"
    FILTERED = "filtered"
    UNFILTERED = "unfiltered"
    OPEN_FILTERED = "open|filtered"
    CLOSED_FILTERED = "closed|filtered"
    # Added by me, does not get returned. Used for book-keeping
    NOT_SCANNED = "not_scanned"
+
+
class PortStateReason(StrEnum):
    """nmap's `reason` attribute for a port state (response packet type)."""

    SYN_ACK = "syn-ack"
    RESET = "reset"
    CONN_REFUSED = "conn-refused"
    NO_RESPONSE = "no-response"
    SYN = "syn"
    FIN = "fin"

    # ICMP unreachable variants
    ICMP_NET_UNREACH = "net-unreach"
    ICMP_HOST_UNREACH = "host-unreach"
    ICMP_PROTO_UNREACH = "proto-unreach"
    ICMP_PORT_UNREACH = "port-unreach"

    # ICMP administratively-prohibited variants
    ADMIN_PROHIBITED = "admin-prohibited"
    HOST_PROHIBITED = "host-prohibited"
    NET_PROHIBITED = "net-prohibited"

    ECHO_REPLY = "echo-reply"
    TIME_EXCEEDED = "time-exceeded"
+
+
+class NmapProtocol(StrEnum):
+ TCP = "tcp"
+ UDP = "udp"
+ SCTP = "sctp"
+ IP = "ip"
+
+ def to_number(self) -> int:
+ return {
+ self.TCP: 6,
+ self.UDP: 17,
+ self.SCTP: 132,
+ self.IP: 4,
+ }[self]
+
+
class NmapScanType(StrEnum):
    """Scan technique reported in <scaninfo type=...> (e.g. -sS => syn)."""

    SYN = "syn"
    CONNECT = "connect"
    ACK = "ack"
    WINDOW = "window"
    MAIMON = "maimon"
    FIN = "fin"
    NULL = "null"
    XMAS = "xmas"
    UDP = "udp"
    SCTP_INIT = "sctpinit"
    SCTP_COOKIE_ECHO = "sctpcookieecho"
+
+
class NmapHostState(StrEnum):
    """Host liveness as reported in <status state=...>."""

    UP = "up"
    DOWN = "down"
    UNKNOWN = "unknown"
+
+
class NmapHostStatusReason(StrEnum):
    """Why nmap decided the host state (<status reason=...>)."""

    USER_SET = "user-set"
    SYN_ACK = "syn-ack"
    RESET = "reset"
    ECHO_REPLY = "echo-reply"
    ARP_RESPONSE = "arp-response"
    NO_RESPONSE = "no-response"
    NET_UNREACH = "net-unreach"
    HOST_UNREACH = "host-unreach"
    PROTO_UNREACH = "proto-unreach"
    PORT_UNREACH = "port-unreach"
    ADMIN_PROHIBITED = "admin-prohibited"
    LOCALHOST_RESPONSE = "localhost-response"
+
+
class NmapOSClass(BaseModel):
    """One <osclass> entry from nmap OS detection."""

    # BUG FIX: these fields defaulted to None while annotated as plain
    # str/int, contradicting the annotation (and rejecting explicit None from
    # the XML parser). All are now honestly Optional with the same defaults.
    vendor: Optional[str] = None
    osfamily: Optional[str] = None
    osgen: Optional[str] = None
    accuracy: Optional[int] = None
    cpe: Optional[List[str]] = None
+
+
class NmapOSMatch(BaseModel):
    """One <osmatch> entry: a candidate OS with an accuracy percentage."""

    name: str
    accuracy: int
    classes: List[NmapOSClass] = Field(default_factory=list)

    @property
    def best_class(self) -> Optional[NmapOSClass]:
        """Highest-accuracy osclass, or None when no classes were reported."""
        if not self.classes:
            return None
        return max(self.classes, key=lambda m: m.accuracy)
+
+
class NmapScript(BaseModel):
    """
    Output of one NSE script run against a port: the script id, its raw
    output text, and any structured <elem> key/value elements.
    """

    id: str
    output: Optional[str] = None
    elements: Dict[str, Any] = Field(default_factory=dict)
+
+
class NmapService(BaseModel):
    """Service detection info for a port (the <service> element)."""

    name: Optional[str] = None
    product: Optional[str] = None
    version: Optional[str] = None
    extrainfo: Optional[str] = None
    method: Optional[str] = None
    # nmap's confidence in the detection (0-10) -- TODO confirm scale
    conf: Optional[int] = None
    cpe: List[str] = Field(default_factory=list)

    def model_dump_postgres(self) -> dict:
        """JSON-mode dump with `name` duplicated under the DB column name."""
        d = self.model_dump(mode="json")
        d["service_name"] = self.name
        return d
+
+
class NmapPort(BaseModel):
    """One scanned port from the <ports> section."""

    port: int = Field()
    protocol: NmapProtocol = Field()
    # Closed ports will not have a NmapPort record
    state: PortState = Field()
    reason: Optional[PortStateReason] = Field(default=None)
    reason_ttl: Optional[int] = Field(default=None)

    service: Optional[NmapService] = None
    scripts: List[NmapScript] = Field(default_factory=list)

    def model_dump_postgres(self, run_id: int) -> dict:
        """Build the row dict for network_portscanport.

        run_id: id of the owning port scan (stored as port_scan_id).
        The protocol is stored as its IANA number, and the service fields
        (when present) are merged into the same row.
        """
        # Writes for the network_portscanport table
        d = {"port_scan_id": run_id}
        data = self.model_dump(
            mode="json",
            include={
                "port",
                "state",
                "reason",
                "reason_ttl",
            },
        )
        d.update(data)
        d["protocol"] = self.protocol.to_number()
        if self.service:
            d.update(self.service.model_dump_postgres())
        return d
+
+
class NmapHostScript(BaseModel):
    """Output of one NSE script run at host (not port) level."""

    id: str = Field()
    output: Optional[str] = Field(default=None)
+
+
class NmapTraceHop(BaseModel):
    """
    One hop observed during Nmap's traceroute.

    Example XML:
    (the inline <hop .../> example appears to have been stripped from this
    docstring -- TODO restore it)
    """

    ttl: int = Field()

    ipaddr: Optional[str] = Field(
        default=None,
        description="IP address of the responding router or host",
    )

    rtt_ms: Optional[float] = Field(
        default=None,
        description="Round-trip time in milliseconds for the probe reaching this hop.",
    )

    host: Optional[str] = Field(
        default=None,
        description="Reverse DNS hostname for the hop if Nmap resolved one.",
    )
+ )
+
+
class NmapTrace(BaseModel):
    """
    Traceroute information collected by Nmap.

    Nmap performs a single traceroute per host using probes matching the scan
    type (typically TCP) directed at a chosen destination port.

    Example XML:
    (the inline <trace> example appears to have been stripped from this
    docstring -- TODO restore it)
    """

    port: Optional[int] = Field(
        default=None,
        description="Destination port used for traceroute probes (may be absent depending on scan type).",
    )
    protocol: Optional[NmapTraceProtocol] = Field(
        default=None,
        description="Transport protocol used for the traceroute probes (tcp, udp, etc.).",
    )

    hops: List[NmapTraceHop] = Field(
        default_factory=list,
        description="Ordered list of hops observed during the traceroute.",
    )

    @property
    def destination(self) -> Optional[NmapTraceHop]:
        """Final hop (the target itself when the trace completed), or None."""
        return self.hops[-1] if self.hops else None
+
+
class NmapHostname(BaseModel):
    """One <hostname> entry: PTR = from reverse DNS, user = supplied on the CLI."""

    name: str
    type: Optional[Literal["PTR", "user"]] = None
+
+
class NmapPortStats(BaseModel):
    """
    This is counts across all protocols scanned (tcp/udp)
    """

    open: int = 0
    closed: int = 0
    filtered: int = 0
    unfiltered: int = 0
    open_filtered: int = 0
    closed_filtered: int = 0
+
+
class NmapScanInfo(BaseModel):
    """
    One <scaninfo> element. We could have multiple protocols in one run,
    so a run holds a list of these (one per protocol scanned).
    """

    type: NmapScanType = Field()
    protocol: NmapProtocol = Field()
    # Number of ports covered by `services` -- presumably nmap's own count;
    # not cross-checked against port_set here.
    num_services: int = Field()
    # Raw port list/range string as nmap emits it, e.g. "22-25,80,443".
    services: str = Field()

    @cached_property
    def port_set(self) -> Set[int]:
        """
        Expand the Nmap services string into a set of port numbers.
        Example:
            "22-25,80,443" -> {22,23,24,25,80,443}
        """
        ports: Set[int] = set()
        for part in self.services.split(","):
            if "-" in part:
                start, end = part.split("-", 1)
                ports.update(range(int(start), int(end) + 1))
            else:
                ports.add(int(part))
        return ports
+
+
class NmapRun(BaseModel):
    """
    A Nmap Run. Expects that we've only scanned ONE host.
    """

    command_line: str = Field()
    started_at: AwareDatetimeISO = Field()
    # nmap version string
    version: str = Field()
    # Only XML schema 1.04 is supported by the parser.
    xmloutputversion: Literal["1.04"] = Field()

    scan_infos: List[NmapScanInfo] = Field(min_length=1)

    # comes from the <runstats><finished> element
    finished_at: Optional[AwareDatetimeISO] = Field(default=None)
    exit_status: Optional[Literal["success", "error"]] = Field(default=None)

    #####
    # Everything below here is from within the *single* host we've scanned
    #####

    # <status> element
    host_state: NmapHostState = Field()
    host_state_reason: NmapHostStatusReason = Field()
    host_state_reason_ttl: Optional[int] = None

    # <address> element
    target_ip: IPvAnyAddressStr = Field()

    hostnames: List[NmapHostname] = Field()

    ports: List[NmapPort] = []
    port_stats: NmapPortStats = Field()

    # <uptime> element
    uptime_seconds: Optional[int] = Field(default=None)
    # <distance> element
    distance: Optional[int] = Field(description="approx number of hops", default=None)

    # <tcpsequence> element
    tcp_sequence_index: Optional[int] = None
    tcp_sequence_difficulty: Optional[str] = None

    # <ipidsequence> element
    ipid_sequence_class: Optional[str] = None

    # <tcptssequence> element
    tcp_timestamp_class: Optional[str] = None

    # <times> element
    srtt_us: Optional[int] = Field(
        default=None, description="smoothed RTT estimate (microseconds µs)"
    )
    rttvar_us: Optional[int] = Field(
        default=None, description="RTT variance (microseconds µs)"
    )
    timeout_us: Optional[int] = Field(
        default=None, description="probe timeout (microseconds µs)"
    )

    os_matches: Optional[List[NmapOSMatch]] = Field(default=None)

    host_scripts: List[NmapHostScript] = Field(default_factory=list)

    trace: Optional[NmapTrace] = Field(default=None)

    raw_xml: Optional[str] = None

    @computed_field
    @property
    def last_boot(self) -> Optional[AwareDatetimeISO]:
        """Boot time derived from started_at - uptime; None when uptime unknown."""
        if self.uptime_seconds:
            return self.started_at - timedelta(seconds=self.uptime_seconds)

    @property
    def scan_info_tcp(self):
        """The TCP <scaninfo>, or None when TCP was not scanned."""
        return next(
            filter(lambda x: x.protocol == NmapProtocol.TCP, self.scan_infos), None
        )

    @property
    def scan_info_udp(self):
        """The UDP <scaninfo>, or None when UDP was not scanned."""
        return next(
            filter(lambda x: x.protocol == NmapProtocol.UDP, self.scan_infos), None
        )

    @property
    def latency_ms(self) -> Optional[float]:
        """srtt converted from microseconds to milliseconds (None when absent)."""
        return self.srtt_us / 1000 if self.srtt_us is not None else None

    @property
    def best_os_match(self) -> Optional[NmapOSMatch]:
        """Highest-accuracy OS match, or None when OS detection produced none."""
        if not self.os_matches:
            return None
        return max(self.os_matches, key=lambda m: m.accuracy)

    def filter_ports(self, protocol: NmapProtocol, state: PortState) -> List[NmapPort]:
        """All recorded ports matching both the given protocol and state."""
        return [p for p in self.ports if p.protocol == protocol and p.state == state]

    @property
    def tcp_open_ports(self) -> List[int]:
        """
        Returns a list of open TCP port numbers.
        """
        return [
            p.port
            for p in self.filter_ports(protocol=NmapProtocol.TCP, state=PortState.OPEN)
        ]

    @property
    def udp_open_ports(self) -> List[int]:
        """
        Returns a list of open UDP port numbers.
        """
        return [
            p.port
            for p in self.filter_ports(protocol=NmapProtocol.UDP, state=PortState.OPEN)
        ]

    @cached_property
    def _port_index(self) -> Dict[Tuple[NmapProtocol, int], NmapPort]:
        # Built once; assumes `ports` is not mutated after first access.
        return {(p.protocol, p.port): p for p in self.ports}

    def get_port_state(
        self, port: int, protocol: NmapProtocol = NmapProtocol.TCP
    ) -> PortState:
        """State for one (protocol, port): recorded state, else CLOSED if it
        was in the scanned range, else NOT_SCANNED."""
        # Explicit (only if scanned and not closed)
        if (protocol, port) in self._port_index:
            return self._port_index[(protocol, port)].state

        # Check if we even scanned it
        scaninfo = next((s for s in self.scan_infos if s.protocol == protocol), None)
        if scaninfo and port in scaninfo.port_set:
            return PortState.CLOSED

        # We didn't scan it
        return PortState.NOT_SCANNED

    def model_dump_postgres(self) -> dict:
        """Build the row dict for network_portscan (parsed model stored as JSON)."""
        # Writes for the network_portscan table
        d = dict()
        data = self.model_dump(
            mode="json",
            include={
                "started_at",
                "host_state",
                "host_state_reason",
                "distance",
                "uptime_seconds",
                "raw_xml",
            },
        )
        d.update(data)
        d["ip"] = self.target_ip
        d["xml_version"] = self.xmloutputversion
        d["latency_ms"] = self.latency_ms
        d["last_boot"] = self.last_boot
        d["parsed"] = self.model_dump_json(indent=0)
        d["open_tcp_ports"] = json.dumps(self.tcp_open_ports)
        return d
diff --git a/generalresearch/models/network/rdns.py b/generalresearch/models/network/rdns.py
new file mode 100644
index 0000000..44697c7
--- /dev/null
+++ b/generalresearch/models/network/rdns.py
@@ -0,0 +1,101 @@
+import ipaddress
+import json
+from functools import cached_property
+
+from pydantic import BaseModel, Field, model_validator, computed_field
+from typing import Optional, List
+
+from typing_extensions import Self
+
+from generalresearch.models.custom_types import IPvAnyAddressStr
+import subprocess
+import re
+from typing import List
+import ipaddress
+import tldextract
+
+
class RDNSResult(BaseModel):
    """Reverse-DNS lookup result for one IP: the PTR hostnames plus derived fields."""

    ip: IPvAnyAddressStr = Field()

    hostnames: List[str] = Field(default_factory=list)

    @model_validator(mode="after")
    def validate_hostname_prop(self):
        """Sanity-check the derived fields against the hostname list.

        Uses explicit raises instead of ``assert`` so validation is not
        stripped when Python runs with -O.
        """
        if len(self.hostnames) != self.hostname_count:
            raise ValueError("hostname_count does not match hostnames")
        if self.hostnames:
            if self.hostnames[0] != self.primary_hostname:
                raise ValueError("primary_hostname must be the first hostname")
            if self.primary_org not in self.primary_hostname:
                raise ValueError("primary_org must come from primary_hostname")
        return self

    @computed_field(examples=["fixed-187-191-8-145.totalplay.net"])
    @cached_property
    def primary_hostname(self) -> Optional[str]:
        """First PTR hostname, or None when the lookup returned nothing."""
        if self.hostnames:
            return self.hostnames[0]

    @computed_field(examples=[1])
    @cached_property
    def hostname_count(self) -> int:
        """Number of PTR records returned."""
        return len(self.hostnames)

    @computed_field(examples=["totalplay"])
    @cached_property
    def primary_org(self) -> Optional[str]:
        """Registrable-domain label of the primary hostname (via tldextract)."""
        if self.primary_hostname:
            return tldextract.extract(self.primary_hostname).domain

    def model_dump_postgres(self) -> dict:
        """Build the row dict for network_rdnsresult (hostnames as JSON text)."""
        d = self.model_dump(
            mode="json",
            include={"primary_hostname", "primary_org", "hostname_count"},
        )
        d["hostnames"] = json.dumps(self.hostnames)
        return d

    @classmethod
    def from_dig(cls, ip: str, raw_output: str) -> Self:
        """Parse `dig +noall +answer -x <ip>` output into an RDNSResult."""
        hostnames: List[str] = []

        for line in raw_output.splitlines():
            m = PTR_RE.search(line)
            if m:
                hostnames.append(m.group(1))

        return cls(
            ip=ipaddress.ip_address(ip),
            hostnames=hostnames,
        )
+
+
+PTR_RE = re.compile(r"\sPTR\s+([^\s]+)\.")
+
+
def dig_rdns(ip: str) -> RDNSResult:
    """Run a reverse-DNS lookup for ``ip`` via dig and parse the answers.

    check=False is deliberate: a failed/empty lookup yields an RDNSResult
    with no hostnames rather than raising. Requires dig on PATH.
    """
    args = get_dig_rdns_command(ip).split(" ")
    proc = subprocess.run(
        args,
        capture_output=True,
        text=True,
        check=False,
    )
    raw = proc.stdout.strip()
    return RDNSResult.from_dig(ip=ip, raw_output=raw)
+
+
def get_dig_rdns_command(ip: str) -> str:
    """Build the dig command line used for a reverse (PTR) lookup of ``ip``."""
    parts = ("dig", "+noall", "+answer", "-x", ip)
    return " ".join(parts)
+
+
def get_dig_version() -> str:
    """Return dig's version number, e.g. "9.18.39".

    dig prints its version banner to stderr; check=False tolerates the
    non-zero exit code `dig -v` may return.
    NOTE(review): the split-based parse raises IndexError if the banner is
    empty or unexpectedly formatted (e.g. dig not installed) -- confirm
    callers can tolerate that, or add a guard.
    """
    proc = subprocess.run(
        ["dig", "-v"],
        capture_output=True,
        text=True,
        check=False,
    )
    # e.g. DiG 9.18.39-0ubuntu0.22.04.2-Ubuntu
    ver_str = proc.stderr.strip()
    return ver_str.split("-", 1)[0].split(" ", 1)[1]
diff --git a/generalresearch/models/network/tool_run.py b/generalresearch/models/network/tool_run.py
new file mode 100644
index 0000000..fba5dcb
--- /dev/null
+++ b/generalresearch/models/network/tool_run.py
@@ -0,0 +1,131 @@
+from datetime import datetime, timezone
+from enum import StrEnum
+from typing import Optional, Tuple
+from uuid import uuid4
+
+from pydantic import BaseModel, Field, PositiveInt
+
+from generalresearch.models.custom_types import (
+ AwareDatetimeISO,
+ IPvAnyAddressStr,
+ UUIDStr,
+)
+from generalresearch.models.network.nmap import NmapRun
+from generalresearch.models.network.rdns import (
+ RDNSResult,
+ get_dig_version,
+ dig_rdns,
+ get_dig_rdns_command,
+)
+from generalresearch.models.network.tool_utils import ToolRunCommand
+
+
class ToolClass(StrEnum):
    """Broad category of network measurement a tool performs."""

    PORT_SCAN = "port_scan"
    RDNS = "rdns"  # reverse-DNS (PTR) lookup
    PING = "ping"
    TRACEROUTE = "traceroute"
+
+
class ToolName(StrEnum):
    """Concrete binary used for a tool run (several may share a ToolClass)."""

    NMAP = "nmap"
    RUSTMAP = "rustmap"
    DIG = "dig"
    PING = "ping"
    TRACEROUTE = "traceroute"
    MTR = "mtr"
+
+
class Status(StrEnum):
    """Terminal outcome of a tool run."""

    SUCCESS = "success"
    FAILED = "failed"
    TIMEOUT = "timeout"
    ERROR = "error"
+
+
class ToolRun(BaseModel):
    """
    A run of a networking tool against one host/ip.

    Subclasses (``PortScanRun``, ``RDnsRun``) attach the tool-specific parsed
    payload; ``model_dump_postgres`` flattens a run for the tool-run table.
    """

    # Surrogate primary key; None until the row has been inserted.
    id: Optional[PositiveInt] = Field(default=None)

    # Target of the run.
    ip: IPvAnyAddressStr = Field()
    # Groups runs launched together; a fresh random id per run by default.
    scan_group_id: UUIDStr = Field(default_factory=lambda: uuid4().hex)
    tool_class: ToolClass = Field()
    tool_name: ToolName = Field()
    tool_version: str = Field()

    started_at: AwareDatetimeISO = Field()
    finished_at: Optional[AwareDatetimeISO] = Field(default=None)
    status: Optional[Status] = Field(default=None)

    # Exact command line that was executed (also parsed into `config`).
    raw_command: str = Field()

    config: ToolRunCommand = Field()

    def model_dump_postgres(self):
        # JSON-mode dump with `config` re-serialized as a single JSON string
        # column rather than a nested object.
        d = self.model_dump(mode="json", exclude={"config"})
        d["config"] = self.config.model_dump_json()
        return d
+
+
class PortScanRun(ToolRun):
    """A ToolRun whose parsed payload is an nmap scan result."""

    # Structured nmap output; flattened into scan columns below.
    parsed: NmapRun = Field()

    def model_dump_postgres(self):
        # Base tool-run row plus the flattened nmap columns, keyed back to
        # this run via run_id. Mirrors RDnsRun.model_dump_postgres.
        d = super().model_dump_postgres()
        d["run_id"] = self.id
        d.update(self.parsed.model_dump_postgres())
        return d
+
+
class RDnsRun(ToolRun):
    """A ToolRun whose parsed payload is a reverse-DNS (dig) result."""

    # Structured rdns output; flattened into columns below.
    parsed: RDNSResult = Field()

    def model_dump_postgres(self):
        # Base tool-run row plus the flattened rdns columns, keyed back to
        # this run via run_id. Mirrors PortScanRun.model_dump_postgres.
        d = super().model_dump_postgres()
        d["run_id"] = self.id
        d.update(self.parsed.model_dump_postgres())
        return d
+
+
def new_tool_run_from_nmap(
    nmap_run: NmapRun, scan_group_id: Optional[UUIDStr] = None
) -> PortScanRun:
    """Wrap a parsed, successful NmapRun as a PortScanRun record.

    Args:
        nmap_run: parsed nmap output; must have ``exit_status == "success"``.
        scan_group_id: optional group id; a fresh uuid hex when omitted.

    Raises:
        ValueError: if the nmap run did not finish successfully. (Previously
            this was a bare ``assert``, which is stripped under ``python -O``.)
    """
    if nmap_run.exit_status != "success":
        raise ValueError(
            f"nmap run did not succeed: exit_status={nmap_run.exit_status!r}"
        )
    return PortScanRun(
        tool_name=ToolName.NMAP,
        tool_class=ToolClass.PORT_SCAN,
        tool_version=nmap_run.version,
        status=Status.SUCCESS,
        ip=nmap_run.target_ip,
        started_at=nmap_run.started_at,
        finished_at=nmap_run.finished_at,
        raw_command=nmap_run.command_line,
        scan_group_id=scan_group_id or uuid4().hex,
        config=ToolRunCommand.from_raw_command(nmap_run.command_line),
        parsed=nmap_run,
    )
+
+
def run_dig(ip: str, scan_group_id: Optional[UUIDStr] = None) -> RDnsRun:
    """Execute a reverse-DNS lookup via ``dig`` and wrap it as an RDnsRun."""
    started_at = datetime.now(tz=timezone.utc)
    tool_version = get_dig_version()
    result = dig_rdns(ip)
    finished_at = datetime.now(tz=timezone.utc)
    raw_command = get_dig_rdns_command(ip)

    fields = dict(
        tool_name=ToolName.DIG,
        tool_class=ToolClass.RDNS,
        tool_version=tool_version,
        status=Status.SUCCESS,
        ip=ip,
        started_at=started_at,
        finished_at=finished_at,
        raw_command=raw_command,
        scan_group_id=scan_group_id or uuid4().hex,
        config=ToolRunCommand.from_raw_command(raw_command),
        parsed=result,
    )
    return RDnsRun(**fields)
diff --git a/generalresearch/models/network/tool_utils.py b/generalresearch/models/network/tool_utils.py
new file mode 100644
index 0000000..83d988d
--- /dev/null
+++ b/generalresearch/models/network/tool_utils.py
@@ -0,0 +1,69 @@
+import shlex
+from typing import Dict, List
+
+from pydantic import BaseModel
+from typing_extensions import Self
+
+"""
+e.g.: "nmap -Pn -sV -p 80,443 --reason --max-retries=3 1.2.3.4"
+{'command': 'nmap',
+ 'options': {'p': '80,443', 'max-retries': '3'},
 'flags': ['Pn', 'reason', 'sV'],
+ 'positionals': ['1.2.3.4']}
+"""
+
+
class ToolRunCommand(BaseModel):
    """Structured decomposition of a tool's raw shell command line."""

    command: str  # executable name, e.g. "nmap"
    options: Dict[str, str]  # valued options, e.g. {"p": "80,443"}
    flags: List[str]  # boolean switches, sorted (see parse_command)
    positionals: List[str]  # remaining arguments, e.g. target IPs

    @classmethod
    def from_raw_command(cls, s: str) -> Self:
        # Delegate tokenizing/classification to parse_command, then validate.
        return cls.model_validate(parse_command(s))
+
+
def parse_command(cmd: str):
    """Tokenize a shell command into command/options/flags/positionals.

    Heuristics:
      * ``--key=value``          -> options[key] = value
      * ``--key value`` / ``-k value`` (next token not dash-prefixed)
                                 -> options[key] = value
      * ``--flag`` / ``-f``      -> flags (returned sorted)
      * anything else            -> positionals

    Note: tokens starting with anything other than ``-`` (e.g. dig's
    ``+noall``) land in positionals.
    """
    words = shlex.split(cmd)

    options = {}
    flags = []
    positionals = []

    idx = 1
    total = len(words)
    while idx < total:
        word = words[idx]
        # Does a value token follow? (dash-prefixed tokens never count)
        has_value = idx + 1 < total and not words[idx + 1].startswith("-")

        if word.startswith("--"):
            name = word[2:]
            if "=" in name:
                key, _, value = name.partition("=")
                options[key] = value
            elif has_value:
                options[name] = words[idx + 1]
                idx += 1
            else:
                flags.append(name)
        elif word.startswith("-"):
            name = word[1:]
            if has_value:
                options[name] = words[idx + 1]
                idx += 1
            else:
                flags.append(name)
        else:
            positionals.append(word)

        idx += 1

    return {
        "command": words[0],
        "options": options,
        "flags": sorted(flags),
        "positionals": positionals,
    }
diff --git a/generalresearch/models/network/xml_parser.py b/generalresearch/models/network/xml_parser.py
new file mode 100644
index 0000000..02265a8
--- /dev/null
+++ b/generalresearch/models/network/xml_parser.py
@@ -0,0 +1,405 @@
# NOTE: xml.etree.cElementTree was removed in Python 3.9 (the repo targets
# 3.11+ given StrEnum usage); xml.etree.ElementTree is C-accelerated anyway.
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from generalresearch.models.network.nmap import (
    NmapHostname,
    NmapRun,
    NmapPort,
    PortState,
    PortStateReason,
    NmapService,
    NmapScript,
    NmapPortStats,
    NmapScanType,
    NmapProtocol,
    NmapHostState,
    NmapHostStatusReason,
    NmapHostScript,
    NmapOSMatch,
    NmapOSClass,
    NmapTrace,
    NmapTraceHop,
    NmapTraceProtocol,
    NmapScanInfo,
)
+
+
class NmapParserException(Exception):
    """Raised when nmap XML output cannot be parsed.

    Calls ``super().__init__`` so ``str()``, ``args`` and pickling behave
    like any normal exception (the old implementation left ``args`` empty),
    while keeping the legacy ``msg`` attribute for existing callers.
    """

    def __init__(self, msg):
        super().__init__(msg)
        self.msg = msg
+
+
class NmapXmlParser:
    """
    Parses nmap XML output (``-oX``) into an NmapRun model.

    Only single-host reports are supported (_parse_xml_nmaprun asserts
    exactly one <host> element).

    Example: https://nmap.org/book/output-formats-xml-output.html
    Full DTD: https://nmap.org/book/nmap-dtd.html
    """

    @classmethod
    def parse_xml(cls, nmap_data: str) -> NmapRun:
        """
        Parse a full nmap XML scan report string into an NmapRun.

        :raises NmapParserException: if the text is not well-formed XML or
            the root element is not <nmaprun>.
        """

        try:
            root = ET.fromstring(nmap_data)
        except Exception as e:
            emsg = "Wrong XML structure: cannot parse data: {0}".format(e)
            raise NmapParserException(emsg)

        if root.tag != "nmaprun":
            # NOTE(review): "Unpexpected" typo is preserved in this message.
            raise NmapParserException("Unpexpected data structure for XML " "root node")
        return cls._parse_xml_nmaprun(root)

    @classmethod
    def _parse_xml_nmaprun(cls, root: ET.Element) -> NmapRun:
        """
        Parse a full nmap scan report from its XML root node (<nmaprun>).
        We expect there is only 1 host in this report!

        :param root: Element from xml.ElementTree (top of XML the document)
        """
        cls._validate_nmap_root(root)
        host_count = len(root.findall(".//host"))
        assert host_count == 1, f"Expected 1 host, got {host_count}"

        # Keep the newline-stripped raw XML alongside the parsed fields.
        xml_str = ET.tostring(root, encoding="unicode").replace("\n", "")
        nmap_data = {"raw_xml": xml_str}
        nmap_data.update(cls._parse_nmaprun(root))

        # One <scaninfo> per scan type/protocol combination.
        nmap_data["scan_infos"] = [
            cls._parse_scaninfo(scaninfo_el)
            for scaninfo_el in root.findall(".//scaninfo")
        ]

        nmap_data.update(cls._parse_runstats(root))

        nmap_data.update(cls._parse_xml_host(root.find(".//host")))

        return NmapRun.model_validate(nmap_data)

    @classmethod
    def _validate_nmap_root(cls, root: ET.Element) -> None:
        """Reject reports containing top-level elements we do not parse."""
        # Tags we know how to handle (or deliberately ignore: verbose/debugging).
        allowed = {
            "scaninfo",
            "host",
            "runstats",
            "verbose",
            "debugging",
        }

        found = {child.tag for child in root}
        unexpected = found - allowed
        if unexpected:
            raise ValueError(
                f"Unexpected top-level tags in nmap XML: {sorted(unexpected)}"
            )

    @classmethod
    def _parse_scaninfo(cls, scaninfo_el: ET.Element) -> NmapScanInfo:
        """Parse one <scaninfo type=... protocol=... numservices=... services=...>."""
        data = dict()
        data["type"] = NmapScanType(scaninfo_el.attrib["type"])
        data["protocol"] = NmapProtocol(scaninfo_el.attrib["protocol"])
        data["num_services"] = scaninfo_el.attrib["numservices"]
        data["services"] = scaninfo_el.attrib["services"]
        return NmapScanInfo.model_validate(data)

    @classmethod
    def _parse_runstats(cls, root: ET.Element) -> Dict:
        """Extract finished_at / exit_status from <runstats><finished .../>.

        Returns an empty dict when the report was truncated before runstats.
        """
        runstats = root.find("runstats")
        if runstats is None:
            return {}

        finished = runstats.find("finished")
        if finished is None:
            return {}

        finished_at = None
        ts = finished.attrib.get("time")
        if ts:
            # Epoch seconds -> aware UTC datetime.
            finished_at = datetime.fromtimestamp(int(ts), tz=timezone.utc)

        return {
            "finished_at": finished_at,
            "exit_status": finished.attrib.get("exit"),
        }

    @classmethod
    def _parse_nmaprun(cls, nmaprun_el: ET.Element) -> Dict:
        """Extract the run-level attributes of the <nmaprun> element itself."""
        nmap_data = dict()
        nmaprun = dict(nmaprun_el.attrib)
        nmap_data["command_line"] = nmaprun["args"]
        # "start" is epoch seconds -> aware UTC datetime.
        nmap_data["started_at"] = datetime.fromtimestamp(
            float(nmaprun["start"]), tz=timezone.utc
        )
        nmap_data["version"] = nmaprun["version"]
        nmap_data["xmloutputversion"] = nmaprun["xmloutputversion"]
        return nmap_data

    @classmethod
    def _parse_xml_host(cls, host_el: ET.Element) -> Dict:
        """
        Parse a <host> element (status, address, hostnames, ports, timing,
        OS detection, scripts, traceroute) into a flat dict of NmapRun fields.
        """
        data = dict()

        # e.g. <status state="up" reason="echo-reply" reason_ttl="53"/>
        status_el = host_el.find("status")
        data["host_state"] = NmapHostState(status_el.attrib["state"])
        data["host_state_reason"] = NmapHostStatusReason(status_el.attrib["reason"])
        host_state_reason_ttl = status_el.attrib.get("reason_ttl")
        if host_state_reason_ttl:
            data["host_state_reason_ttl"] = int(host_state_reason_ttl)

        # e.g. <address addr="45.33.32.156" addrtype="ipv4"/>
        # NOTE(review): only the first <address> is used; a MAC address
        # element (if present) would be ignored — confirm intended.
        address_el = host_el.find("address")
        data["target_ip"] = address_el.attrib["addr"]

        # NOTE(review): assumes <hostnames> is always present (nmap emits it
        # even when empty); a missing element would raise AttributeError.
        data["hostnames"] = cls._parse_hostnames(host_el.find("hostnames"))

        data["ports"], data["port_stats"] = cls._parse_xml_ports(host_el.find("ports"))

        uptime = host_el.find("uptime")
        if uptime is not None:
            data["uptime_seconds"] = int(uptime.attrib["seconds"])

        distance = host_el.find("distance")
        if distance is not None:
            data["distance"] = int(distance.attrib["value"])

        # TCP/IP fingerprinting sequence elements (present with -O).
        tcpsequence = host_el.find("tcpsequence")
        if tcpsequence is not None:
            data["tcp_sequence_index"] = int(tcpsequence.attrib["index"])
            data["tcp_sequence_difficulty"] = tcpsequence.attrib["difficulty"]
        ipidsequence = host_el.find("ipidsequence")
        if ipidsequence is not None:
            data["ipid_sequence_class"] = ipidsequence.attrib["class"]
        tcptssequence = host_el.find("tcptssequence")
        if tcptssequence is not None:
            data["tcp_timestamp_class"] = tcptssequence.attrib["class"]

        # Round-trip timing in microseconds; a literal 0 is coerced to None
        # by the `or None` idiom (0 and "missing" are treated the same).
        times_elem = host_el.find("times")
        if times_elem is not None:
            data.update(
                {
                    "srtt_us": int(times_elem.attrib.get("srtt", 0)) or None,
                    "rttvar_us": int(times_elem.attrib.get("rttvar", 0)) or None,
                    "timeout_us": int(times_elem.attrib.get("to", 0)) or None,
                }
            )

        # Host-level NSE script results (as opposed to per-port scripts).
        hostscripts_el = host_el.find("hostscript")
        if hostscripts_el is not None:
            data["host_scripts"] = [
                NmapHostScript(id=el.attrib["id"], output=el.attrib.get("output"))
                for el in hostscripts_el.findall("script")
            ]

        data["os_matches"] = cls._parse_os_matches(host_el)

        data["trace"] = cls._parse_trace(host_el)

        return data

    @classmethod
    def _parse_os_matches(cls, host_el: ET.Element) -> List[NmapOSMatch] | None:
        """Parse <os><osmatch><osclass><cpe>... into NmapOSMatch models.

        Returns None when OS detection was not run or produced no matches.
        """
        os_elem = host_el.find("os")
        if os_elem is None:
            return None

        matches: List[NmapOSMatch] = []

        for m in os_elem.findall("osmatch"):
            classes: List[NmapOSClass] = []

            for c in m.findall("osclass"):
                cpes = [e.text.strip() for e in c.findall("cpe") if e.text]

                classes.append(
                    NmapOSClass(
                        vendor=c.attrib.get("vendor"),
                        osfamily=c.attrib.get("osfamily"),
                        osgen=c.attrib.get("osgen"),
                        accuracy=(
                            int(c.attrib["accuracy"]) if "accuracy" in c.attrib else None
                        ),
                        cpe=cpes or None,
                    )
                )

            matches.append(
                NmapOSMatch(
                    name=m.attrib["name"],
                    accuracy=int(m.attrib["accuracy"]),
                    classes=classes,
                )
            )

        return matches or None

    @classmethod
    def _parse_hostnames(cls, hostnames_el: ET.Element) -> List[NmapHostname]:
        """
        Parses the hostnames element.
        e.g.
          <hostnames>
            <hostname name="scanme.nmap.org" type="user"/>
            <hostname name="scanme.nmap.org" type="PTR"/>
          </hostnames>
        """
        return [cls._parse_hostname(hname) for hname in hostnames_el.findall("hostname")]

    @classmethod
    def _parse_hostname(cls, hostname_el: ET.Element) -> NmapHostname:
        """
        Parses a single hostname element,
        e.g. <hostname name="scanme.nmap.org" type="PTR"/>

        :param hostname_el: <hostname> XML tag from a nmap scan
        """
        return NmapHostname.model_validate(dict(hostname_el.attrib))

    @classmethod
    def _parse_xml_ports(
        cls, ports_elem: ET.Element
    ) -> Tuple[List[NmapPort], NmapPortStats]:
        """
        Parse the <ports> element into the per-port list plus aggregate
        per-state counts (NmapPortStats) covering both explicitly listed
        ports and the summarized <extraports> buckets.
        """
        ports: List[NmapPort] = []
        stats = NmapPortStats()

        # handle extraports first: summarized counts of unlisted ports,
        # e.g. <extraports state="filtered" count="995">
        for e in ports_elem.findall("extraports"):
            state = PortState(e.attrib["state"])
            count = int(e.attrib["count"])

            # State names like "open|filtered" map to stats attributes with
            # "_" instead of "|" (e.g. stats.open_filtered).
            key = state.value.replace("|", "_")
            setattr(stats, key, getattr(stats, key) + count)

        for port_elem in ports_elem.findall("port"):
            port = cls._parse_xml_port(port_elem)
            ports.append(port)
            key = port.state.value.replace("|", "_")
            setattr(stats, key, getattr(stats, key) + 1)
        return ports, stats

    @classmethod
    def _parse_xml_service(cls, service_elem: ET.Element) -> NmapService:
        """Parse a <service> element (version-detection output) for one port."""
        svc = {
            "name": service_elem.attrib.get("name"),
            "product": service_elem.attrib.get("product"),
            "version": service_elem.attrib.get("version"),
            "extrainfo": service_elem.attrib.get("extrainfo"),
            "method": service_elem.attrib.get("method"),
            "conf": (
                int(service_elem.attrib["conf"])
                if "conf" in service_elem.attrib
                else None
            ),
            # NOTE(review): assumes <cpe> children always have text;
            # an empty element would raise on .strip() — confirm.
            "cpe": [e.text.strip() for e in service_elem.findall("cpe")],
        }

        return NmapService.model_validate(svc)

    @classmethod
    def _parse_xml_script(cls, script_elem: ET.Element) -> NmapScript:
        """Parse a per-port NSE <script> element (id, output, key/value elems)."""
        output = script_elem.attrib.get("output")
        if output:
            output = output.strip()
        script = {
            "id": script_elem.attrib["id"],
            "output": output,
        }

        elements: Dict[str, Any] = {}

        # Structured script output: nested <elem key="...">value</elem> pairs.
        # NOTE(review): assumes keyed <elem> nodes have text — confirm.
        for elem in script_elem.findall(".//elem"):
            key = elem.attrib.get("key")
            if key:
                elements[key.strip()] = elem.text.strip()

        script["elements"] = elements
        return NmapScript.model_validate(script)

    @classmethod
    def _parse_xml_port(cls, port_elem: ET.Element) -> NmapPort:
        """
        Parse one <port> element, e.g.
          <port protocol="tcp" portid="22">
            <state state="open" reason="syn-ack" reason_ttl="53"/>
            <service name="ssh" .../>
            <script id="..." output="..."/>
          </port>
        """
        state_elem = port_elem.find("state")

        port = {
            "port": int(port_elem.attrib["portid"]),
            "protocol": port_elem.attrib["protocol"],
            "state": PortState(state_elem.attrib["state"]),
            "reason": (
                PortStateReason(state_elem.attrib["reason"])
                if "reason" in state_elem.attrib
                else None
            ),
            "reason_ttl": (
                int(state_elem.attrib["reason_ttl"])
                if "reason_ttl" in state_elem.attrib
                else None
            ),
        }

        service_elem = port_elem.find("service")
        if service_elem is not None:
            port["service"] = cls._parse_xml_service(service_elem)

        port["scripts"] = []
        for script_elem in port_elem.findall("script"):
            port["scripts"].append(cls._parse_xml_script(script_elem))

        return NmapPort.model_validate(port)

    @classmethod
    def _parse_trace(cls, host_elem: ET.Element) -> Optional[NmapTrace]:
        """Parse the <trace> (traceroute) element, or None when absent."""
        trace_elem = host_elem.find("trace")
        if trace_elem is None:
            return None

        port_attr = trace_elem.attrib.get("port")
        proto_attr = trace_elem.attrib.get("proto")

        hops: List[NmapTraceHop] = []

        for hop_elem in trace_elem.findall("hop"):
            ttl = hop_elem.attrib.get("ttl")
            if ttl is None:
                continue  # ttl is required by the DTD but guard anyway

            rtt = hop_elem.attrib.get("rtt")
            ipaddr = hop_elem.attrib.get("ipaddr")
            host = hop_elem.attrib.get("host")

            hops.append(
                NmapTraceHop(
                    ttl=int(ttl),
                    ipaddr=ipaddr,
                    rtt_ms=float(rtt) if rtt is not None else None,
                    host=host,
                )
            )

        return NmapTrace(
            port=int(port_attr) if port_attr is not None else None,
            protocol=NmapTraceProtocol(proto_attr) if proto_attr is not None else None,
            hops=hops,
        )
+
diff --git a/pyproject.toml b/pyproject.toml
index 93bdca2..55da235 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,7 @@ dependencies = [
"scipy",
"sentry-sdk",
"slackclient",
+ "tldextract",
"ua-parser",
"user-agents",
"wrapt",
diff --git a/requirements.txt b/requirements.txt
index 7a80011..1f55009 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,6 +23,7 @@ dnspython==2.7.0
ecdsa==0.19.1
email-validator==2.3.0
Faker==37.6.0
+filelock==3.25.1
frozenlist==1.7.0
fsspec==2025.7.0
geoip2==4.7.0
@@ -78,6 +79,7 @@ pytz==2025.2
PyYAML==6.0.2
redis==6.4.0
requests==2.32.5
+requests-file==3.0.1
rsa==4.9.1
s3transfer==0.13.1
scipy==1.16.1
@@ -88,6 +90,7 @@ slackclient==2.9.4
sniffio==1.3.1
sortedcontainers==2.4.0
tblib==3.1.0
+tldextract==5.3.1
toolz==1.0.0
tornado==6.5.2
trio==0.30.0
diff --git a/test_utils/managers/network/__init__.py b/test_utils/managers/network/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/test_utils/managers/network/conftest.py b/test_utils/managers/network/conftest.py
new file mode 100644
index 0000000..70fda4e
--- /dev/null
+++ b/test_utils/managers/network/conftest.py
@@ -0,0 +1,86 @@
+import os
+from datetime import datetime, timezone
+from typing import Callable, TYPE_CHECKING
+from uuid import uuid4
+
+import pytest
+
+from generalresearch.managers.network.label import IPLabelManager
+from generalresearch.managers.network.nmap import NmapManager
+from generalresearch.managers.network.tool_run import ToolRunManager
+from generalresearch.models.network.rdns import (
+ RDNSResult,
+ get_dig_version,
+ get_dig_rdns_command,
+)
+from generalresearch.models.network.tool_run import (
+ RDnsRun,
+ ToolName,
+ ToolClass,
+ Status,
+)
+from generalresearch.models.network.tool_utils import ToolRunCommand
+from generalresearch.models.network.xml_parser import NmapXmlParser
+
+
@pytest.fixture(scope="session")
def iplabel_manager(thl_web_rw) -> IPLabelManager:
    """Session-scoped IPLabelManager bound to the unittest database."""
    # Guard: never point a test manager at a non-unittest database.
    assert "/unittest-" in thl_web_rw.dsn.path

    return IPLabelManager(pg_config=thl_web_rw)
+
+
@pytest.fixture(scope="session")
def nmap_manager(thl_web_rw) -> NmapManager:
    """Session-scoped NmapManager bound to the unittest database."""
    # Guard: never point a test manager at a non-unittest database.
    assert "/unittest-" in thl_web_rw.dsn.path

    return NmapManager(pg_config=thl_web_rw)
+
+
@pytest.fixture(scope="session")
def toolrun_manager(thl_web_rw) -> ToolRunManager:
    """Session-scoped ToolRunManager bound to the unittest database."""
    # Guard: never point a test manager at a non-unittest database.
    assert "/unittest-" in thl_web_rw.dsn.path

    return ToolRunManager(pg_config=thl_web_rw)
+
+
@pytest.fixture(scope="session")
def nmap_xml_str(request) -> str:
    """Raw XML of the single-host nmap fixture scan (tests/data/nmaprun1.xml)."""
    path = os.path.join(request.config.rootpath, "data/nmaprun1.xml")
    with open(path, "r") as fh:
        return fh.read()
+
+
@pytest.fixture(scope="session")
def nmap_run(nmap_xml_str):
    """Parsed NmapRun model for the nmaprun1.xml fixture scan."""
    return NmapXmlParser.parse_xml(nmap_xml_str)
+
+
@pytest.fixture(scope="session")
def raw_dig_output():
    # Canned `dig +noall +answer -x 45.33.32.156` answer section.
    return "156.32.33.45.in-addr.arpa. 300 IN PTR scanme.nmap.org."
+
+
@pytest.fixture(scope="session")
def reverse_dns_run(raw_dig_output):
    """A fully-populated RDnsRun built from canned dig output.

    NOTE: still shells out to ``dig -v`` for the version string, so the test
    environment must have dig installed.
    """
    ip = "45.33.32.156"
    rdns_result = RDNSResult.from_dig(ip=ip, raw_output=raw_dig_output)
    scan_group_id = uuid4().hex
    started_at = datetime.now(tz=timezone.utc)
    tool_version = get_dig_version()
    finished_at = datetime.now(tz=timezone.utc)
    raw_command = get_dig_rdns_command(ip)
    return RDnsRun(
        tool_name=ToolName.DIG,
        tool_class=ToolClass.RDNS,
        tool_version=tool_version,
        status=Status.SUCCESS,
        ip=ip,
        started_at=started_at,
        finished_at=finished_at,
        raw_command=raw_command,
        # scan_group_id is always assigned above; the previous
        # `scan_group_id or uuid4().hex` fallback was dead code.
        scan_group_id=scan_group_id,
        config=ToolRunCommand.from_raw_command(raw_command),
        parsed=rdns_result,
    )
diff --git a/test_utils/models/conftest.py b/test_utils/models/conftest.py
index 468bea2..64bdec6 100644
--- a/test_utils/models/conftest.py
+++ b/test_utils/models/conftest.py
@@ -590,7 +590,7 @@ def ip_record_factory(
@pytest.fixture(scope="session")
-def buyer(buyer_manager: "BuyerManager") -> Buyer:
+def buyer(buyer_manager: "BuyerManager") -> "Buyer":
buyer_code = uuid4().hex
buyer_manager.bulk_get_or_create(source=Source.TESTING, codes=[buyer_code])
b = Buyer(
@@ -601,7 +601,7 @@ def buyer(buyer_manager: "BuyerManager") -> Buyer:
@pytest.fixture(scope="session")
-def buyer_factory(buyer_manager: "BuyerManager") -> Callable[..., Buyer]:
+def buyer_factory(buyer_manager: "BuyerManager") -> Callable[..., "Buyer"]:
def _inner() -> Buyer:
return buyer_manager.bulk_get_or_create(
@@ -612,7 +612,7 @@ def buyer_factory(buyer_manager: "BuyerManager") -> Callable[..., Buyer]:
@pytest.fixture(scope="session")
-def survey(survey_manager: "SurveyManager", buyer: "Buyer") -> Survey:
+def survey(survey_manager: "SurveyManager", buyer: "Buyer") -> "Survey":
s = Survey(source=Source.TESTING, survey_id=uuid4().hex, buyer_code=buyer.code)
survey_manager.create_bulk([s])
return s
diff --git a/tests/conftest.py b/tests/conftest.py
index 30ed1c7..2482269 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -13,6 +13,7 @@ pytest_plugins = [
"test_utils.managers.conftest",
"test_utils.managers.contest.conftest",
"test_utils.managers.ledger.conftest",
+ "test_utils.managers.network.conftest",
"test_utils.managers.upk.conftest",
# -- Models
"test_utils.models.conftest",
diff --git a/tests/data/nmaprun1.xml b/tests/data/nmaprun1.xml
new file mode 100644
index 0000000..c5fed6c
--- /dev/null
+++ b/tests/data/nmaprun1.xml
@@ -0,0 +1,68 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/data/nmaprun2.xml b/tests/data/nmaprun2.xml
new file mode 100644
index 0000000..932896c
--- /dev/null
+++ b/tests/data/nmaprun2.xml
@@ -0,0 +1,118 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ cpe:/a:openbsd:openssh:6.6.1p1
+ cpe:/o:linux:linux_kernel
+
+
+
+
+
+
+
+
+
+
+ cpe:/a:apache:http_server:2.4.7
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ cpe:/o:linux:linux_kernel:2.6
+
+
+ cpe:/o:linux:linux_kernel:3
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/managers/network/__init__.py b/tests/managers/network/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/managers/network/label.py b/tests/managers/network/label.py
new file mode 100644
index 0000000..5b9a790
--- /dev/null
+++ b/tests/managers/network/label.py
@@ -0,0 +1,202 @@
+import ipaddress
+
+import faker
+import pytest
+from psycopg.errors import UniqueViolation
+from pydantic import ValidationError
+
+from generalresearch.managers.network.label import IPLabelManager
+from generalresearch.models.network.label import (
+ IPLabel,
+ IPLabelKind,
+ IPLabelSource,
+ IPLabelMetadata,
+)
+from generalresearch.models.thl.ipinfo import normalize_ip
+
+fake = faker.Faker()
+
+
@pytest.fixture
def ip_label(utc_now) -> IPLabel:
    """A VPN label on a random /64 IPv6 network (function-scoped: fresh per test)."""
    ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False)
    return IPLabel(
        label_kind=IPLabelKind.VPN,
        labeled_at=utc_now,
        source=IPLabelSource.INTERNAL_USE,
        provider="GeoNodE",
        created_at=utc_now,
        ip=ip,
        metadata=IPLabelMetadata(services=["RDP"])
    )
+
+
def test_model(utc_now):
    """IPLabel accepts bare IPv4 and IPv4/IPv6 networks; rejects bare IPv6.

    Replaces four copy-pasted constructions + debug prints with a local
    helper and real assertions on the stored network prefix.
    """

    def make_label(ip) -> IPLabel:
        # Kind/source/provider are arbitrary; only `ip` handling is under test.
        return IPLabel(
            label_kind=IPLabelKind.VPN,
            labeled_at=utc_now,
            source=IPLabelSource.INTERNAL_USE,
            provider="GeoNodE",
            created_at=utc_now,
            ip=ip,
        )

    # A bare IPv4 address is coerced to a /32 network.
    ip = fake.ipv4_public()
    assert make_label(ip).ip.prefixlen == 32

    # An explicit IPv4 network is accepted as-is.
    net24 = ipaddress.IPv4Network((ip, 24), strict=False)
    assert make_label(net24).ip.prefixlen == 24

    # A bare IPv6 address (implicitly /128) is rejected: labels must
    # cover /64 or larger.
    with pytest.raises(ValidationError, match="IPv6 network must be /64 or larger"):
        make_label(fake.ipv6())

    # /64 and larger (/48) IPv6 networks are accepted.
    net64 = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False)
    assert make_label(net64).ip.prefixlen == 64

    net48 = ipaddress.IPv6Network((net64.network_address, 48), strict=False)
    assert make_label(net48).ip.prefixlen == 48
+
+
def test_create(iplabel_manager: IPLabelManager, ip_label: IPLabel):
    """Inserting the same label twice violates the table's unique constraint."""
    iplabel_manager.create(ip_label)

    with pytest.raises(
        UniqueViolation, match="duplicate key value violates unique constraint"
    ):
        iplabel_manager.create(ip_label)
+
+
def test_filter(iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago):
    """filter() round-trips labels by exact ip and honors labeled_after."""
    # Nothing stored yet for this random network.
    res = iplabel_manager.filter(ips=[ip_label.ip])
    assert len(res) == 0

    iplabel_manager.create(ip_label)
    res = iplabel_manager.filter(ips=[ip_label.ip])
    assert len(res) == 1

    # Round-trips intact.
    out = res[0]
    assert out == ip_label

    # labeled_at is `utc_now`, so an hour-ago cutoff still matches.
    res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago)
    assert len(res) == 1

    # A second label under a different ip is matched alongside the first.
    ip_label2 = ip_label.model_copy()
    ip_label2.ip = fake.ipv4_public()
    iplabel_manager.create(ip_label2)
    res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip])
    assert len(res) == 2
+
+
def test_filter_network(
    iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago
):
    """Same as test_filter, but exercising an IPv6 /64 network key.

    NOTE(review): largely duplicates test_filter; consider parametrizing.
    """
    print(ip_label)
    # Re-key the fixture label onto a fresh /64 so runs don't collide.
    ip_label = ip_label.model_copy()
    ip_label.ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False)

    iplabel_manager.create(ip_label)
    res = iplabel_manager.filter(ips=[ip_label.ip])
    assert len(res) == 1

    out = res[0]
    assert out == ip_label

    res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago)
    assert len(res) == 1

    # Mixed v4/v6 lookups in a single filter call.
    ip_label2 = ip_label.model_copy()
    ip_label2.ip = fake.ipv4_public()
    iplabel_manager.create(ip_label2)
    res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip])
    assert len(res) == 2
+
+
def test_network(iplabel_manager: IPLabelManager, utc_now):
    """Exact-ip filtering vs containment (ip_in_network) semantics."""
    # This is a fully-specific /128 ipv6 address.
    # e.g. '51b7:b38d:8717:6c5b:cd3e:f5c3:3aba:17d'
    ip = fake.ipv6()
    # Generally, we'd want to annotate the /64 network
    # e.g. '51b7:b38d:8717:6c5b::/64'
    ip_64 = ipaddress.IPv6Network((ip, 64), strict=False)

    label = IPLabel(
        label_kind=IPLabelKind.VPN,
        labeled_at=utc_now,
        source=IPLabelSource.INTERNAL_USE,
        provider="GeoNodE",
        created_at=utc_now,
        ip=ip_64,
    )
    iplabel_manager.create(label)

    # If I query for the /128 directly, I won't find it
    res = iplabel_manager.filter(ips=[ip])
    assert len(res) == 0

    # If I query for the /64 network I will
    res = iplabel_manager.filter(ips=[ip_64])
    assert len(res) == 1

    # Or, I can query for the /128 ip IN a network
    res = iplabel_manager.filter(ip_in_network=ip)
    assert len(res) == 1
+
+
def test_label_cidr_and_ipinfo(
    iplabel_manager: IPLabelManager, ip_information_factory, ip_geoname, utc_now
):
    """Join the cidr-typed label column against the inet-typed ipinfo column.

    NOTE(review): this test only prints the join result and asserts nothing;
    consider asserting the joined row count once test_join's return shape
    is settled.
    """
    # We have network_iplabel.ip as a cidr col and
    # thl_ipinformation.ip as a inet col. Make sure we can join appropriately
    ip = fake.ipv6()
    ip_information_factory(ip=ip, geoname=ip_geoname)
    # We normalize for storage into ipinfo table
    ip_norm, prefix = normalize_ip(ip)

    # Test with a larger network
    ip_48 = ipaddress.IPv6Network((ip, 48), strict=False)
    print(f"{ip=}")
    print(f"{ip_norm=}")
    print(f"{ip_48=}")
    label = IPLabel(
        label_kind=IPLabelKind.VPN,
        labeled_at=utc_now,
        source=IPLabelSource.INTERNAL_USE,
        provider="GeoNodE",
        created_at=utc_now,
        ip=ip_48,
    )
    iplabel_manager.create(label)

    res = iplabel_manager.test_join(ip_norm)
    print(res)
diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py
new file mode 100644
index 0000000..a598a71
--- /dev/null
+++ b/tests/managers/network/tool_run.py
@@ -0,0 +1,48 @@
+from uuid import uuid4
+
+import faker
+
+from generalresearch.models.network.tool_run import (
+ new_tool_run_from_nmap,
+ run_dig,
+)
+fake = faker.Faker()
+
+
def test_create_tool_run_from_nmap(nmap_run, toolrun_manager):
    """A PortScanRun built from the nmap fixture round-trips via Postgres."""
    scan_group_id = uuid4().hex
    run = new_tool_run_from_nmap(nmap_run, scan_group_id=scan_group_id)

    toolrun_manager.create_portscan_run(run)

    # Read back by the id assigned on insert.
    run_out = toolrun_manager.get_portscan_run(run.id)

    assert run == run_out
+
+
def test_create_tool_run_from_dig_fixture(reverse_dns_run, toolrun_manager):
    """An RDnsRun built from canned dig output round-trips via Postgres."""
    toolrun_manager.create_rdns_run(reverse_dns_run)

    run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)

    assert reverse_dns_run == run_out
+
+
def test_run_dig(toolrun_manager):
    """A live dig run against a known IP round-trips via Postgres.

    NOTE(review): performs a real DNS lookup; flaky without network access.
    """
    reverse_dns_run = run_dig(ip="65.19.129.53")

    toolrun_manager.create_rdns_run(reverse_dns_run)

    run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)

    assert reverse_dns_run == run_out
+
def test_run_dig_empty(toolrun_manager):
    """A random IPv6 with no PTR record still yields a storable (empty) run.

    NOTE(review): performs a real DNS lookup; flaky without network access.
    """
    reverse_dns_run = run_dig(ip=fake.ipv6())

    toolrun_manager.create_rdns_run(reverse_dns_run)

    run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)

    assert reverse_dns_run == run_out
\ No newline at end of file
diff --git a/tests/models/network/__init__.py b/tests/models/network/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/models/network/nmap.py b/tests/models/network/nmap.py
new file mode 100644
index 0000000..4fc7014
--- /dev/null
+++ b/tests/models/network/nmap.py
@@ -0,0 +1,32 @@
+import os
+
+import pytest
+
+from generalresearch.models.network.xml_parser import NmapXmlParser
+
+
@pytest.fixture
def nmap_xml_str(request) -> str:
    """Raw XML of the single-host fixture scan (tests/data/nmaprun1.xml).

    NOTE(review): shadows the session-scoped fixture of the same name from
    test_utils.managers.network.conftest — confirm intended.
    """
    fp = os.path.join(request.config.rootpath, "data/nmaprun1.xml")
    with open(fp, "r") as f:
        data = f.read()
    return data
+
+
@pytest.fixture
def nmap_xml_str2(request) -> str:
    """Raw XML of the richer fixture scan (tests/data/nmaprun2.xml)."""
    fp = os.path.join(request.config.rootpath, "data/nmaprun2.xml")
    with open(fp, "r") as f:
        data = f.read()
    return data
+
+
def test_nmap_xml_parser(nmap_xml_str, nmap_xml_str2):
    """parse_xml extracts open ports and (optional) traceroute from fixtures."""
    # parse_xml is a classmethod — call it on the class; no instance needed.
    run1 = NmapXmlParser.parse_xml(nmap_xml_str)
    assert run1.tcp_open_ports == [61232]
    assert len(run1.trace.hops) == 18

    # nmaprun2.xml has no <trace> element.
    run2 = NmapXmlParser.parse_xml(nmap_xml_str2)
    assert run2.tcp_open_ports == [22, 80, 9929, 31337]
    assert run2.trace is None
diff --git a/tests/models/network/rdns.py b/tests/models/network/rdns.py
new file mode 100644
index 0000000..9167749
--- /dev/null
+++ b/tests/models/network/rdns.py
@@ -0,0 +1,23 @@
+from generalresearch.models.network.rdns import dig_rdns
+import faker
+
+fake = faker.Faker()
+
+
def test_dig_rdns():
    """End-to-end dig -x lookups against well-known IPs.

    NOTE(review): depends on live DNS and on these PTR records never
    changing — flaky by construction (the original author flagged this too).
    """
    # Actually runs dig -x. Idk how stable this is
    ip = "45.33.32.156"
    rdns_result = dig_rdns(ip)
    assert rdns_result.primary_hostname == "scanme.nmap.org"
    assert rdns_result.primary_org == "nmap"

    ip = "65.19.129.53"
    rdns_result = dig_rdns(ip)
    assert rdns_result.primary_hostname == "in1-smtp.grlengine.com"
    assert rdns_result.primary_org == "grlengine"

    # A random IPv6 should have no PTR record.
    ip = fake.ipv6()
    rdns_result = dig_rdns(ip)
    assert rdns_result.primary_hostname is None
    assert rdns_result.primary_org is None
    print(rdns_result.model_dump_postgres())
diff --git a/tests/models/network/tool_run.py b/tests/models/network/tool_run.py
new file mode 100644
index 0000000..c643503
--- /dev/null
+++ b/tests/models/network/tool_run.py
@@ -0,0 +1,8 @@
+from uuid import uuid4
+
+from generalresearch.models.network.tool_run import new_tool_run_from_nmap
+
+
def test_new_tool_run_from_nmap(nmap_run):
    """new_tool_run_from_nmap returns a single PortScanRun.

    BUG FIX: the original unpacked the return value as ``run, scan = ...``,
    but the function returns one PortScanRun — unpacking iterates the
    pydantic model's fields and fails at runtime.
    """
    scan_group_id = uuid4().hex
    run = new_tool_run_from_nmap(nmap_run, scan_group_id=scan_group_id)
    assert run.scan_group_id == scan_group_id
    assert run.parsed is nmap_run
--
cgit v1.2.3