diff options
| author | stuppie | 2026-03-10 14:35:50 -0600 |
|---|---|---|
| committer | stuppie | 2026-03-10 14:35:50 -0600 |
| commit | e54b19afcdf91a9574064adb3f6e64adc03121c4 (patch) | |
| tree | 728bbe4b4bde860b441850958574dffc896cd2a7 | |
| parent | 5b2facfababef77fd39ef814b7bb67e5b3ea6ee3 (diff) | |
| download | generalresearch-e54b19afcdf91a9574064adb3f6e64adc03121c4.tar.gz generalresearch-e54b19afcdf91a9574064adb3f6e64adc03121c4.zip | |
add mtr / traceroute models, runner, parser
| -rw-r--r-- | generalresearch/models/network/definitions.py | 69 | ||||
| -rw-r--r-- | generalresearch/models/network/mtr.py | 205 | ||||
| -rw-r--r-- | generalresearch/models/network/nmap.py | 42 | ||||
| -rw-r--r-- | generalresearch/models/network/xml_parser.py | 16 |
4 files changed, 294 insertions, 38 deletions
diff --git a/generalresearch/models/network/definitions.py b/generalresearch/models/network/definitions.py new file mode 100644 index 0000000..4fb44f4 --- /dev/null +++ b/generalresearch/models/network/definitions.py @@ -0,0 +1,69 @@ +from enum import StrEnum +from ipaddress import ip_address, ip_network +from typing import Optional + +CGNAT_NET = ip_network("100.64.0.0/10") + + +class IPProtocol(StrEnum): + TCP = "tcp" + UDP = "udp" + SCTP = "sctp" + IP = "ip" + ICMP = "icmp" + ICMPv6 = "icmpv6" + + def to_number(self) -> int: + # https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml + return { + self.TCP: 6, + self.UDP: 17, + self.SCTP: 132, + self.IP: 4, + self.ICMP: 1, + self.ICMPv6: 58, + }[self] + + +class IPKind(StrEnum): + PUBLIC = "public" + PRIVATE = "private" + CGNAT = "carrier_nat" + LOOPBACK = "loopback" + LINK_LOCAL = "link_local" + MULTICAST = "multicast" + RESERVED = "reserved" + UNSPECIFIED = "unspecified" + + +def get_ip_kind(ip: Optional[str]) -> Optional[IPKind]: + if not ip: + return None + + ip_obj = ip_address(ip) + + if ip_obj in CGNAT_NET: + return IPKind.CGNAT + + if ip_obj.is_loopback: + return IPKind.LOOPBACK + + if ip_obj.is_link_local: + return IPKind.LINK_LOCAL + + if ip_obj.is_multicast: + return IPKind.MULTICAST + + if ip_obj.is_unspecified: + return IPKind.UNSPECIFIED + + if ip_obj.is_private: + return IPKind.PRIVATE + + if ip_obj.is_reserved: + return IPKind.RESERVED + + if ip_obj.is_global: + return IPKind.PUBLIC + + return None diff --git a/generalresearch/models/network/mtr.py b/generalresearch/models/network/mtr.py new file mode 100644 index 0000000..98e7c16 --- /dev/null +++ b/generalresearch/models/network/mtr.py @@ -0,0 +1,205 @@ +import json +import re +import subprocess +from ipaddress import ip_address +from typing import List, Optional, Dict + +from pydantic import Field, field_validator, BaseModel, ConfigDict, model_validator + +from generalresearch.models.network.definitions import IPProtocol, get_ip_kind, IPKind + + +class MTRHop(BaseModel): + model_config = ConfigDict(populate_by_name=True) + + hop: int = Field(alias="count") + host: str + asn: Optional[str] = Field(default=None, alias="ASN") + + loss_pct: float = Field(alias="Loss%") + sent: int = Field(alias="Snt") + + last_ms: float = Field(alias="Last") + avg_ms: float = Field(alias="Avg") + best_ms: float = Field(alias="Best") + worst_ms: float = Field(alias="Wrst") + stdev_ms: float = Field(alias="StDev") + + hostname: Optional[str] = None + ip: Optional[str] = None + + @field_validator("asn") + @classmethod + def normalize_asn(cls, v): + if v == "AS???": + return None + return v + + @model_validator(mode="after") + def parse_host(self): + host = self.host.strip() + + # hostname (ip) + m = HOST_RE.match(host) + if m: + self.hostname = m.group("hostname") + self.ip = m.group("ip") + return self + + # ip only + try: + ip_address(host) + self.ip = host + self.hostname = None + return self + except ValueError: + pass + + # hostname only + self.hostname = host + self.ip = None + return self + + @property + def ip_kind(self) -> Optional[IPKind]: + return get_ip_kind(self.ip) + + @property + def icmp_rate_limited(self): + if self.avg_ms == 0: + return False + return self.stdev_ms > self.avg_ms or self.worst_ms > self.best_ms * 10 + + +class MTRReport(BaseModel): + model_config = ConfigDict(populate_by_name=True) + + source: str = Field(description="Hostname of the system running mtr.", alias="src") + destination: str = Field( + description="Destination hostname or IP being traced.", alias="dst" + ) + tos: int = Field(description="IP Type-of-Service (TOS) value used for probes.") + tests: int = Field(description="Number of probes sent per hop.") + psize: int = Field(description="Probe packet size in bytes.") + bitpattern: str = Field(description="Payload byte pattern used in probes (hex).") + + hops: List[MTRHop] = Field() + + def print_report(self) -> None: + print(f"MTR Report → {self.destination}\n") + host_max_len = max(len(h.host) for h in self.hops) + + header = ( + f"{'Hop':>3} " + f"{'Host':<{host_max_len}} " + f"{'Kind':<10} " + f"{'ASN':<8} " + f"{'Loss%':>6} {'Sent':>5} " + f"{'Last':>7} {'Avg':>7} {'Best':>7} {'Worst':>7} {'StDev':>7}" + ) + print(header) + print("-" * len(header)) + + for hop in self.hops: + print( + f"{hop.hop:>3} " + f"{hop.host:<{host_max_len}} " + f"{hop.ip_kind or '???':<10} " + f"{hop.asn or '???':<8} " + f"{hop.loss_pct:6.1f} " + f"{hop.sent:5d} " + f"{hop.last_ms:7.1f} " + f"{hop.avg_ms:7.1f} " + f"{hop.best_ms:7.1f} " + f"{hop.worst_ms:7.1f} " + f"{hop.stdev_ms:7.1f}" + ) + + +HOST_RE = re.compile(r"^(?P<hostname>.+?) \((?P<ip>[^)]+)\)$") + +SUPPORTED_PROTOCOLS = { + IPProtocol.TCP, + IPProtocol.UDP, + IPProtocol.SCTP, + IPProtocol.ICMP, +} +PROTOCOLS_W_PORT = {IPProtocol.TCP, IPProtocol.UDP, IPProtocol.SCTP} + + +def get_mtr_command( + ip: str, + protocol: Optional[IPProtocol] = None, + port: Optional[int] = None, + report_cycles: int = 10, +) -> List[str]: + # https://manpages.ubuntu.com/manpages/focal/man8/mtr.8.html + # e.g. "mtr -r -c 2 -b -z -j -T -P 443 74.139.70.149" + args = ["mtr", "--report", "--show-ips", "--aslookup", "--json"] + if report_cycles is not None: + args.extend(["-c", str(int(report_cycles))]) + if port is not None: + if protocol is None: + protocol = IPProtocol.TCP + assert protocol in PROTOCOLS_W_PORT, "port only allowed for TCP/SCTP/UDP traces" + args.extend(["--port", str(int(port))]) + if protocol: + assert protocol in SUPPORTED_PROTOCOLS, f"unsupported protocol: {protocol}" + # default is ICMP (no args) + arg_map = { + IPProtocol.TCP: "--tcp", + IPProtocol.UDP: "--udp", + IPProtocol.SCTP: "--sctp", + } + if protocol in arg_map: + args.append(arg_map[protocol]) + args.append(ip) + return args + + +def get_mtr_version() -> str: + proc = subprocess.run( + ["mtr", "-v"], + capture_output=True, + text=True, + check=False, + ) + # e.g. mtr 0.95 + ver_str = proc.stdout.strip() + return ver_str.split(" ", 1)[1] + + +def run_mtr( + ip: str, + protocol: Optional[IPProtocol] = None, + port: Optional[int] = None, + report_cycles: int = 10, +) -> MTRReport: + args = get_mtr_command( + ip=ip, protocol=protocol, port=port, report_cycles=report_cycles + ) + proc = subprocess.run( + args, + capture_output=True, + text=True, + check=False, + ) + raw = proc.stdout.strip() + data = parse_raw_output(raw) + return MTRReport.model_validate(data) + + +def parse_raw_output(raw: str) -> Dict: + data = json.loads(raw)["report"] + data.update(data.pop("mtr")) + data["hops"] = data.pop("hubs") + return data + + +def load_example(): + s = open( + "/home/gstupp/projects/generalresearch/generalresearch/models/network/mtr_fatbeam.json", + "r", + ).read() + data = parse_raw_output(s) + return MTRReport.model_validate(data) diff --git a/generalresearch/models/network/nmap.py b/generalresearch/models/network/nmap.py index 8f1720b..fcb3d49 100644 --- a/generalresearch/models/network/nmap.py +++ b/generalresearch/models/network/nmap.py @@ -7,12 +7,7 @@ from typing import Dict, Any, Literal, List, Optional, Tuple, Set from pydantic import computed_field, BaseModel, Field from generalresearch.models.custom_types import AwareDatetimeISO, IPvAnyAddressStr - - -class NmapTraceProtocol(StrEnum): - TCP = "tcp" - UDP = "udp" - SCTP = "sctp" +from generalresearch.models.network.definitions import IPProtocol class PortState(StrEnum): @@ -47,21 +42,6 @@ class PortStateReason(StrEnum): TIME_EXCEEDED = "time-exceeded" -class NmapProtocol(StrEnum): - TCP = "tcp" - UDP = "udp" - SCTP = "sctp" - IP = "ip" - - def to_number(self) -> int: - return { - self.TCP: 6, - self.UDP: 17, - self.SCTP: 132, - self.IP: 4, - }[self] - - class NmapScanType(StrEnum): SYN = "syn" CONNECT = "connect" @@ -150,7 +130,7 @@ class NmapService(BaseModel): class NmapPort(BaseModel): port: int = Field() - protocol: NmapProtocol = Field() + protocol: IPProtocol = Field() # Closed ports will not have a NmapPort record state: PortState = Field() reason: Optional[PortStateReason] = Field(default=None) @@ -227,7 +207,7 @@ class NmapTrace(BaseModel): default=None, description="Destination port used for traceroute probes (may be absent depending on scan type).", ) - protocol: Optional[NmapTraceProtocol] = Field( + protocol: Optional[IPProtocol] = Field( default=None, description="Transport protocol used for the traceroute probes (tcp, udp, etc.).", ) @@ -269,7 +249,7 @@ class NmapScanInfo(BaseModel): """ type: NmapScanType = Field() - protocol: NmapProtocol = Field() + protocol: IPProtocol = Field() num_services: int = Field() services: str = Field() @@ -366,13 +346,13 @@ class NmapRun(BaseModel): @property def scan_info_tcp(self): return next( - filter(lambda x: x.protocol == NmapProtocol.TCP, self.scan_infos), None + filter(lambda x: x.protocol == IPProtocol.TCP, self.scan_infos), None ) @property def scan_info_udp(self): return next( - filter(lambda x: x.protocol == NmapProtocol.UDP, self.scan_infos), None + filter(lambda x: x.protocol == IPProtocol.UDP, self.scan_infos), None ) @property @@ -385,7 +365,7 @@ class NmapRun(BaseModel): return None return max(self.os_matches, key=lambda m: m.accuracy) - def filter_ports(self, protocol: NmapProtocol, state: PortState) -> List[NmapPort]: + def filter_ports(self, protocol: IPProtocol, state: PortState) -> List[NmapPort]: return [p for p in self.ports if p.protocol == protocol and p.state == state] @property @@ -395,7 +375,7 @@ class NmapRun(BaseModel): """ return [ p.port - for p in self.filter_ports(protocol=NmapProtocol.TCP, state=PortState.OPEN) + for p in self.filter_ports(protocol=IPProtocol.TCP, state=PortState.OPEN) ] @property @@ -405,15 +385,15 @@ class NmapRun(BaseModel): """ return [ p.port - for p in self.filter_ports(protocol=NmapProtocol.UDP, state=PortState.OPEN) + for p in self.filter_ports(protocol=IPProtocol.UDP, state=PortState.OPEN) ] @cached_property - def _port_index(self) -> Dict[Tuple[NmapProtocol, int], NmapPort]: + def _port_index(self) -> Dict[Tuple[IPProtocol, int], NmapPort]: return {(p.protocol, p.port): p for p in self.ports} def get_port_state( - self, port: int, protocol: NmapProtocol = NmapProtocol.TCP + self, port: int, protocol: IPProtocol = IPProtocol.TCP ) -> PortState: # Explicit (only if scanned and not closed) if (protocol, port) in self._port_index: diff --git a/generalresearch/models/network/xml_parser.py b/generalresearch/models/network/xml_parser.py index 02265a8..419e300 100644 --- a/generalresearch/models/network/xml_parser.py +++ b/generalresearch/models/network/xml_parser.py @@ -2,6 +2,7 @@ import xml.etree.cElementTree as ET from datetime import datetime, timezone from typing import List, Dict, Any, Tuple, Optional +from generalresearch.models.network.definitions import IPProtocol from generalresearch.models.network.nmap import ( NmapHostname, NmapRun, @@ -12,7 +13,6 @@ from generalresearch.models.network.nmap import ( NmapScript, NmapPortStats, NmapScanType, - NmapProtocol, NmapHostState, NmapHostStatusReason, NmapHostScript, @@ -20,7 +20,6 @@ from generalresearch.models.network.nmap import ( NmapOSClass, NmapTrace, NmapTraceHop, - NmapTraceProtocol, NmapScanInfo, ) @@ -103,7 +102,7 @@ class NmapXmlParser: def _parse_scaninfo(cls, scaninfo_el: ET.Element) -> NmapScanInfo: data = dict() data["type"] = NmapScanType(scaninfo_el.attrib["type"]) - data["protocol"] = NmapProtocol(scaninfo_el.attrib["protocol"]) + data["protocol"] = IPProtocol(scaninfo_el.attrib["protocol"]) data["num_services"] = scaninfo_el.attrib["numservices"] data["services"] = scaninfo_el.attrib["services"] return NmapScanInfo.model_validate(data) @@ -226,7 +225,9 @@ class NmapXmlParser: osfamily=c.attrib.get("osfamily"), osgen=c.attrib.get("osgen"), accuracy=( - int(c.attrib["accuracy"]) if "accuracy" in c.attrib else None + int(c.attrib["accuracy"]) + if "accuracy" in c.attrib + else None ), cpe=cpes or None, ) @@ -250,7 +251,9 @@ class NmapXmlParser: <hostname name="108-171-53-1.aceips.com" type="PTR"/> </hostnames> """ - return [cls._parse_hostname(hname) for hname in hostnames_el.findall("hostname")] + return [ + cls._parse_hostname(hname) for hname in hostnames_el.findall("hostname") + ] @classmethod def _parse_hostname(cls, hostname_el: ET.Element) -> NmapHostname: @@ -399,7 +402,6 @@ class NmapXmlParser: return NmapTrace( port=int(port_attr) if port_attr is not None else None, - protocol=NmapTraceProtocol(proto_attr) if proto_attr is not None else None, + protocol=IPProtocol(proto_attr) if proto_attr is not None else None, hops=hops, ) - |
