aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorstuppie2026-03-10 14:35:50 -0600
committerstuppie2026-03-10 14:35:50 -0600
commite54b19afcdf91a9574064adb3f6e64adc03121c4 (patch)
tree728bbe4b4bde860b441850958574dffc896cd2a7
parent5b2facfababef77fd39ef814b7bb67e5b3ea6ee3 (diff)
downloadgeneralresearch-e54b19afcdf91a9574064adb3f6e64adc03121c4.tar.gz
generalresearch-e54b19afcdf91a9574064adb3f6e64adc03121c4.zip
add mtr / traceroute models, runner, parser
-rw-r--r--generalresearch/models/network/definitions.py69
-rw-r--r--generalresearch/models/network/mtr.py205
-rw-r--r--generalresearch/models/network/nmap.py42
-rw-r--r--generalresearch/models/network/xml_parser.py16
4 files changed, 294 insertions, 38 deletions
diff --git a/generalresearch/models/network/definitions.py b/generalresearch/models/network/definitions.py
new file mode 100644
index 0000000..4fb44f4
--- /dev/null
+++ b/generalresearch/models/network/definitions.py
@@ -0,0 +1,69 @@
+from enum import StrEnum
+from ipaddress import ip_address, ip_network
+from typing import Optional
+
+CGNAT_NET = ip_network("100.64.0.0/10")
+
+
+class IPProtocol(StrEnum):
+ TCP = "tcp"
+ UDP = "udp"
+ SCTP = "sctp"
+ IP = "ip"
+ ICMP = "icmp"
+ ICMPv6 = "icmpv6"
+
+ def to_number(self) -> int:
+ # https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
+ return {
+ self.TCP: 6,
+ self.UDP: 17,
+ self.SCTP: 132,
+ self.IP: 4,
+ self.ICMP: 1,
+ self.ICMPv6: 58,
+ }[self]
+
+
+class IPKind(StrEnum):
+ PUBLIC = "public"
+ PRIVATE = "private"
+ CGNAT = "carrier_nat"
+ LOOPBACK = "loopback"
+ LINK_LOCAL = "link_local"
+ MULTICAST = "multicast"
+ RESERVED = "reserved"
+ UNSPECIFIED = "unspecified"
+
+
+def get_ip_kind(ip: Optional[str]) -> Optional[IPKind]:
+ if not ip:
+ return None
+
+ ip_obj = ip_address(ip)
+
+ if ip_obj in CGNAT_NET:
+ return IPKind.CGNAT
+
+ if ip_obj.is_loopback:
+ return IPKind.LOOPBACK
+
+ if ip_obj.is_link_local:
+ return IPKind.LINK_LOCAL
+
+ if ip_obj.is_multicast:
+ return IPKind.MULTICAST
+
+ if ip_obj.is_unspecified:
+ return IPKind.UNSPECIFIED
+
+ if ip_obj.is_private:
+ return IPKind.PRIVATE
+
+ if ip_obj.is_reserved:
+ return IPKind.RESERVED
+
+ if ip_obj.is_global:
+ return IPKind.PUBLIC
+
+ return None
diff --git a/generalresearch/models/network/mtr.py b/generalresearch/models/network/mtr.py
new file mode 100644
index 0000000..98e7c16
--- /dev/null
+++ b/generalresearch/models/network/mtr.py
@@ -0,0 +1,205 @@
+import json
+import re
+import subprocess
+from ipaddress import ip_address
+from typing import List, Optional, Dict
+
+from pydantic import Field, field_validator, BaseModel, ConfigDict, model_validator
+
+from generalresearch.models.network.definitions import IPProtocol, get_ip_kind, IPKind
+
+
+class MTRHop(BaseModel):
+ model_config = ConfigDict(populate_by_name=True)
+
+ hop: int = Field(alias="count")
+ host: str
+ asn: Optional[str] = Field(default=None, alias="ASN")
+
+ loss_pct: float = Field(alias="Loss%")
+ sent: int = Field(alias="Snt")
+
+ last_ms: float = Field(alias="Last")
+ avg_ms: float = Field(alias="Avg")
+ best_ms: float = Field(alias="Best")
+ worst_ms: float = Field(alias="Wrst")
+ stdev_ms: float = Field(alias="StDev")
+
+ hostname: Optional[str] = None
+ ip: Optional[str] = None
+
+ @field_validator("asn")
+ @classmethod
+ def normalize_asn(cls, v):
+ if v == "AS???":
+ return None
+ return v
+
+ @model_validator(mode="after")
+ def parse_host(self):
+ host = self.host.strip()
+
+ # hostname (ip)
+ m = HOST_RE.match(host)
+ if m:
+ self.hostname = m.group("hostname")
+ self.ip = m.group("ip")
+ return self
+
+ # ip only
+ try:
+ ip_address(host)
+ self.ip = host
+ self.hostname = None
+ return self
+ except ValueError:
+ pass
+
+ # hostname only
+ self.hostname = host
+ self.ip = None
+ return self
+
+ @property
+ def ip_kind(self) -> Optional[IPKind]:
+ return get_ip_kind(self.ip)
+
+ @property
+ def icmp_rate_limited(self):
+ if self.avg_ms == 0:
+ return False
+ return self.stdev_ms > self.avg_ms or self.worst_ms > self.best_ms * 10
+
+
+class MTRReport(BaseModel):
+ model_config = ConfigDict(populate_by_name=True)
+
+ source: str = Field(description="Hostname of the system running mtr.", alias="src")
+ destination: str = Field(
+ description="Destination hostname or IP being traced.", alias="dst"
+ )
+ tos: int = Field(description="IP Type-of-Service (TOS) value used for probes.")
+ tests: int = Field(description="Number of probes sent per hop.")
+ psize: int = Field(description="Probe packet size in bytes.")
+ bitpattern: str = Field(description="Payload byte pattern used in probes (hex).")
+
+ hops: List[MTRHop] = Field()
+
+ def print_report(self) -> None:
+ print(f"MTR Report → {self.destination}\n")
+ host_max_len = max(len(h.host) for h in self.hops)
+
+ header = (
+ f"{'Hop':>3} "
+ f"{'Host':<{host_max_len}} "
+ f"{'Kind':<10} "
+ f"{'ASN':<8} "
+ f"{'Loss%':>6} {'Sent':>5} "
+ f"{'Last':>7} {'Avg':>7} {'Best':>7} {'Worst':>7} {'StDev':>7}"
+ )
+ print(header)
+ print("-" * len(header))
+
+ for hop in self.hops:
+ print(
+ f"{hop.hop:>3} "
+ f"{hop.host:<{host_max_len}} "
+ f"{hop.ip_kind or '???':<10} "
+ f"{hop.asn or '???':<8} "
+ f"{hop.loss_pct:6.1f} "
+ f"{hop.sent:5d} "
+ f"{hop.last_ms:7.1f} "
+ f"{hop.avg_ms:7.1f} "
+ f"{hop.best_ms:7.1f} "
+ f"{hop.worst_ms:7.1f} "
+ f"{hop.stdev_ms:7.1f}"
+ )
+
+
+HOST_RE = re.compile(r"^(?P<hostname>.+?) \((?P<ip>[^)]+)\)$")
+
+SUPPORTED_PROTOCOLS = {
+ IPProtocol.TCP,
+ IPProtocol.UDP,
+ IPProtocol.SCTP,
+ IPProtocol.ICMP,
+}
+PROTOCOLS_W_PORT = {IPProtocol.TCP, IPProtocol.UDP, IPProtocol.SCTP}
+
+
+def get_mtr_command(
+ ip: str,
+ protocol: Optional[IPProtocol] = None,
+ port: Optional[int] = None,
+ report_cycles: int = 10,
+) -> List[str]:
+ # https://manpages.ubuntu.com/manpages/focal/man8/mtr.8.html
+ # e.g. "mtr -r -c 2 -b -z -j -T -P 443 74.139.70.149"
+ args = ["mtr", "--report", "--show-ips", "--aslookup", "--json"]
+ if report_cycles is not None:
+ args.extend(["-c", str(int(report_cycles))])
+ if port is not None:
+ if protocol is None:
+ protocol = IPProtocol.TCP
+ assert protocol in PROTOCOLS_W_PORT, "port only allowed for TCP/SCTP/UDP traces"
+ args.extend(["--port", str(int(port))])
+ if protocol:
+ assert protocol in SUPPORTED_PROTOCOLS, f"unsupported protocol: {protocol}"
+ # default is ICMP (no args)
+ arg_map = {
+ IPProtocol.TCP: "--tcp",
+ IPProtocol.UDP: "--udp",
+ IPProtocol.SCTP: "--sctp",
+ }
+ if protocol in arg_map:
+ args.append(arg_map[protocol])
+ args.append(ip)
+ return args
+
+
+def get_mtr_version() -> str:
+ proc = subprocess.run(
+ ["mtr", "-v"],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ # e.g. mtr 0.95
+ ver_str = proc.stdout.strip()
+ return ver_str.split(" ", 1)[1]
+
+
+def run_mtr(
+ ip: str,
+ protocol: Optional[IPProtocol] = None,
+ port: Optional[int] = None,
+ report_cycles: int = 10,
+) -> MTRReport:
+ args = get_mtr_command(
+ ip=ip, protocol=protocol, port=port, report_cycles=report_cycles
+ )
+ proc = subprocess.run(
+ args,
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ raw = proc.stdout.strip()
+ data = parse_raw_output(raw)
+ return MTRReport.model_validate(data)
+
+
+def parse_raw_output(raw: str) -> Dict:
+ data = json.loads(raw)["report"]
+ data.update(data.pop("mtr"))
+ data["hops"] = data.pop("hubs")
+ return data
+
+
+def load_example():
+ s = open(
+ "/home/gstupp/projects/generalresearch/generalresearch/models/network/mtr_fatbeam.json",
+ "r",
+ ).read()
+ data = parse_raw_output(s)
+ return MTRReport.model_validate(data)
diff --git a/generalresearch/models/network/nmap.py b/generalresearch/models/network/nmap.py
index 8f1720b..fcb3d49 100644
--- a/generalresearch/models/network/nmap.py
+++ b/generalresearch/models/network/nmap.py
@@ -7,12 +7,7 @@ from typing import Dict, Any, Literal, List, Optional, Tuple, Set
from pydantic import computed_field, BaseModel, Field
from generalresearch.models.custom_types import AwareDatetimeISO, IPvAnyAddressStr
-
-
-class NmapTraceProtocol(StrEnum):
- TCP = "tcp"
- UDP = "udp"
- SCTP = "sctp"
+from generalresearch.models.network.definitions import IPProtocol
class PortState(StrEnum):
@@ -47,21 +42,6 @@ class PortStateReason(StrEnum):
TIME_EXCEEDED = "time-exceeded"
-class NmapProtocol(StrEnum):
- TCP = "tcp"
- UDP = "udp"
- SCTP = "sctp"
- IP = "ip"
-
- def to_number(self) -> int:
- return {
- self.TCP: 6,
- self.UDP: 17,
- self.SCTP: 132,
- self.IP: 4,
- }[self]
-
-
class NmapScanType(StrEnum):
SYN = "syn"
CONNECT = "connect"
@@ -150,7 +130,7 @@ class NmapService(BaseModel):
class NmapPort(BaseModel):
port: int = Field()
- protocol: NmapProtocol = Field()
+ protocol: IPProtocol = Field()
# Closed ports will not have a NmapPort record
state: PortState = Field()
reason: Optional[PortStateReason] = Field(default=None)
@@ -227,7 +207,7 @@ class NmapTrace(BaseModel):
default=None,
description="Destination port used for traceroute probes (may be absent depending on scan type).",
)
- protocol: Optional[NmapTraceProtocol] = Field(
+ protocol: Optional[IPProtocol] = Field(
default=None,
description="Transport protocol used for the traceroute probes (tcp, udp, etc.).",
)
@@ -269,7 +249,7 @@ class NmapScanInfo(BaseModel):
"""
type: NmapScanType = Field()
- protocol: NmapProtocol = Field()
+ protocol: IPProtocol = Field()
num_services: int = Field()
services: str = Field()
@@ -366,13 +346,13 @@ class NmapRun(BaseModel):
@property
def scan_info_tcp(self):
return next(
- filter(lambda x: x.protocol == NmapProtocol.TCP, self.scan_infos), None
+ filter(lambda x: x.protocol == IPProtocol.TCP, self.scan_infos), None
)
@property
def scan_info_udp(self):
return next(
- filter(lambda x: x.protocol == NmapProtocol.UDP, self.scan_infos), None
+ filter(lambda x: x.protocol == IPProtocol.UDP, self.scan_infos), None
)
@property
@@ -385,7 +365,7 @@ class NmapRun(BaseModel):
return None
return max(self.os_matches, key=lambda m: m.accuracy)
- def filter_ports(self, protocol: NmapProtocol, state: PortState) -> List[NmapPort]:
+ def filter_ports(self, protocol: IPProtocol, state: PortState) -> List[NmapPort]:
return [p for p in self.ports if p.protocol == protocol and p.state == state]
@property
@@ -395,7 +375,7 @@ class NmapRun(BaseModel):
"""
return [
p.port
- for p in self.filter_ports(protocol=NmapProtocol.TCP, state=PortState.OPEN)
+ for p in self.filter_ports(protocol=IPProtocol.TCP, state=PortState.OPEN)
]
@property
@@ -405,15 +385,15 @@ class NmapRun(BaseModel):
"""
return [
p.port
- for p in self.filter_ports(protocol=NmapProtocol.UDP, state=PortState.OPEN)
+ for p in self.filter_ports(protocol=IPProtocol.UDP, state=PortState.OPEN)
]
@cached_property
- def _port_index(self) -> Dict[Tuple[NmapProtocol, int], NmapPort]:
+ def _port_index(self) -> Dict[Tuple[IPProtocol, int], NmapPort]:
return {(p.protocol, p.port): p for p in self.ports}
def get_port_state(
- self, port: int, protocol: NmapProtocol = NmapProtocol.TCP
+ self, port: int, protocol: IPProtocol = IPProtocol.TCP
) -> PortState:
# Explicit (only if scanned and not closed)
if (protocol, port) in self._port_index:
diff --git a/generalresearch/models/network/xml_parser.py b/generalresearch/models/network/xml_parser.py
index 02265a8..419e300 100644
--- a/generalresearch/models/network/xml_parser.py
+++ b/generalresearch/models/network/xml_parser.py
@@ -2,6 +2,7 @@ import xml.etree.cElementTree as ET
from datetime import datetime, timezone
from typing import List, Dict, Any, Tuple, Optional
+from generalresearch.models.network.definitions import IPProtocol
from generalresearch.models.network.nmap import (
NmapHostname,
NmapRun,
@@ -12,7 +13,6 @@ from generalresearch.models.network.nmap import (
NmapScript,
NmapPortStats,
NmapScanType,
- NmapProtocol,
NmapHostState,
NmapHostStatusReason,
NmapHostScript,
@@ -20,7 +20,6 @@ from generalresearch.models.network.nmap import (
NmapOSClass,
NmapTrace,
NmapTraceHop,
- NmapTraceProtocol,
NmapScanInfo,
)
@@ -103,7 +102,7 @@ class NmapXmlParser:
def _parse_scaninfo(cls, scaninfo_el: ET.Element) -> NmapScanInfo:
data = dict()
data["type"] = NmapScanType(scaninfo_el.attrib["type"])
- data["protocol"] = NmapProtocol(scaninfo_el.attrib["protocol"])
+ data["protocol"] = IPProtocol(scaninfo_el.attrib["protocol"])
data["num_services"] = scaninfo_el.attrib["numservices"]
data["services"] = scaninfo_el.attrib["services"]
return NmapScanInfo.model_validate(data)
@@ -226,7 +225,9 @@ class NmapXmlParser:
osfamily=c.attrib.get("osfamily"),
osgen=c.attrib.get("osgen"),
accuracy=(
- int(c.attrib["accuracy"]) if "accuracy" in c.attrib else None
+ int(c.attrib["accuracy"])
+ if "accuracy" in c.attrib
+ else None
),
cpe=cpes or None,
)
@@ -250,7 +251,9 @@ class NmapXmlParser:
<hostname name="108-171-53-1.aceips.com" type="PTR"/>
</hostnames>
"""
- return [cls._parse_hostname(hname) for hname in hostnames_el.findall("hostname")]
+ return [
+ cls._parse_hostname(hname) for hname in hostnames_el.findall("hostname")
+ ]
@classmethod
def _parse_hostname(cls, hostname_el: ET.Element) -> NmapHostname:
@@ -399,7 +402,6 @@ class NmapXmlParser:
return NmapTrace(
port=int(port_attr) if port_attr is not None else None,
- protocol=NmapTraceProtocol(proto_attr) if proto_attr is not None else None,
+ protocol=IPProtocol(proto_attr) if proto_attr is not None else None,
hops=hops,
)
-