diff options
| author | stuppie | 2026-03-13 13:31:22 -0600 |
|---|---|---|
| committer | stuppie | 2026-03-13 13:31:22 -0600 |
| commit | efe1c368b8b49a2c6b3bf2193a5b89eb5426eba3 (patch) | |
| tree | e00bc797640fc18f3484506d6a5b752cf1c8b7f4 | |
| parent | d9ab70d98624e22b87dfe40cc5e18c8ad5eb44c4 (diff) | |
| download | generalresearch-efe1c368b8b49a2c6b3bf2193a5b89eb5426eba3.tar.gz generalresearch-efe1c368b8b49a2c6b3bf2193a5b89eb5426eba3.zip | |
ToolRunCommand + options models to handle cmd line args for all network tools
17 files changed, 187 insertions, 62 deletions
diff --git a/generalresearch/models/network/mtr/command.py b/generalresearch/models/network/mtr/command.py index e3ab903..f8d2d49 100644 --- a/generalresearch/models/network/mtr/command.py +++ b/generalresearch/models/network/mtr/command.py @@ -4,6 +4,7 @@ from typing import List, Optional from generalresearch.models.network.definitions import IPProtocol from generalresearch.models.network.mtr.parser import parse_mtr_output from generalresearch.models.network.mtr.result import MTRResult +from generalresearch.models.network.tool_run_command import MTRRunCommand SUPPORTED_PROTOCOLS = { IPProtocol.TCP, @@ -56,20 +57,14 @@ def get_mtr_version() -> str: return ver_str.split(" ", 1)[1] -def run_mtr( - ip: str, - protocol: Optional[IPProtocol] = None, - port: Optional[int] = None, - report_cycles: int = 10, -) -> MTRResult: - args = build_mtr_command( - ip=ip, protocol=protocol, port=port, report_cycles=report_cycles - ) +def run_mtr(config: MTRRunCommand) -> MTRResult: + cmd = config.to_command_str() + args = cmd.split(" ") proc = subprocess.run( - args.split(" "), + args, capture_output=True, text=True, check=False, ) raw = proc.stdout.strip() - return parse_mtr_output(raw, protocol=protocol, port=port) + return parse_mtr_output(raw, protocol=config.options.protocol, port=config.options.port) diff --git a/generalresearch/models/network/mtr/execute.py b/generalresearch/models/network/mtr/execute.py index 81de24f..a6fb82a 100644 --- a/generalresearch/models/network/mtr/execute.py +++ b/generalresearch/models/network/mtr/execute.py @@ -10,30 +10,33 @@ from generalresearch.models.network.mtr.command import ( build_mtr_command, ) from generalresearch.models.network.tool_run import MTRRun, ToolName, ToolClass, Status -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + MTRRunCommand, + MTRRunCommandOptions, +) from generalresearch.models.network.utils import get_source_ip def execute_mtr( ip: str, scan_group_id: Optional[UUIDStr] = None, - protocol: Optional[IPProtocol] = None, + protocol: Optional[IPProtocol] = IPProtocol.ICMP, port: Optional[int] = None, report_cycles: int = 10, ) -> MTRRun: + config = MTRRunCommand( + options=MTRRunCommandOptions( + ip=ip, + report_cycles=report_cycles, + protocol=protocol, + port=port, + ), + ) + started_at = datetime.now(tz=timezone.utc) tool_version = get_mtr_version() - result = run_mtr(ip, protocol=protocol, port=port, report_cycles=report_cycles) + result = run_mtr(config) finished_at = datetime.now(tz=timezone.utc) - raw_command = build_mtr_command(ip) - config = ToolRunCommand( - command="mtr", - options={ - "protocol": result.protocol, - "port": result.port, - "report_cycles": report_cycles, - }, - ) return MTRRun( tool_name=ToolName.MTR, @@ -43,7 +46,7 @@ def execute_mtr( ip=ip, started_at=started_at, finished_at=finished_at, - raw_command=raw_command, + raw_command=config.to_command_str(), scan_group_id=scan_group_id or uuid4().hex, config=config, parsed=result, diff --git a/generalresearch/models/network/mtr/parser.py b/generalresearch/models/network/mtr/parser.py index fb0ca61..685eeca 100644 --- a/generalresearch/models/network/mtr/parser.py +++ b/generalresearch/models/network/mtr/parser.py @@ -5,10 +5,10 @@ from generalresearch.models.network.definitions import IPProtocol from generalresearch.models.network.mtr.result import MTRResult -def parse_mtr_output(raw: str, port, protocol) -> MTRResult: +def parse_mtr_output(raw: str, port: int, protocol: IPProtocol) -> MTRResult: data = parse_mtr_raw_output(raw) data["port"] = port - data["protocol"] = protocol or IPProtocol.ICMP + data["protocol"] = protocol return MTRResult.model_validate(data) diff --git a/generalresearch/models/network/nmap/command.py b/generalresearch/models/network/nmap/command.py index dfa55de..47e0a87 100644 --- a/generalresearch/models/network/nmap/command.py +++ b/generalresearch/models/network/nmap/command.py @@ -3,18 +3,40 @@ from typing import Optional, List from generalresearch.models.network.nmap.parser import parse_nmap_xml from generalresearch.models.network.nmap.result import NmapResult +from generalresearch.models.network.tool_run_command import NmapRunCommand -def build_nmap_command(ip: str, top_ports: Optional[int] = 1000) -> List[str]: +def build_nmap_command( + ip: str, + no_ping: bool = True, + enable_advanced: bool = True, + timing: int = 4, + ports: Optional[str] = None, + top_ports: Optional[int] = None, +) -> str: # e.g. "nmap -Pn -T4 -A --top-ports 1000 -oX - scanme.nmap.org" # https://linux.die.net/man/1/nmap - args = ["nmap", "-Pn", "-T4", "-A", "--top-ports", str(int(top_ports)), "-oX", "-"] - args.append(ip) - return args + args = ["nmap"] + assert 0 <= timing <= 5 + args.append(f"-T{timing}") + if no_ping: + args.append("-Pn") + if enable_advanced: + args.append("-A") + if ports is not None: + assert top_ports is None + args.extend(["-p", ports]) + if top_ports is not None: + assert ports is None + args.extend(["--top-ports", str(top_ports)]) + args.extend(["-oX", "-", ip]) + return " ".join(args) -def run_nmap(ip: str, top_ports: Optional[int] = 1000) -> NmapResult: - args = build_nmap_command(ip=ip, top_ports=top_ports) + +def run_nmap(config: NmapRunCommand) -> NmapResult: + cmd = config.to_command_str() + args = cmd.split(" ") proc = subprocess.run( args, capture_output=True, diff --git a/generalresearch/models/network/nmap/execute.py b/generalresearch/models/network/nmap/execute.py index 68a9926..0334f50 100644 --- a/generalresearch/models/network/nmap/execute.py +++ b/generalresearch/models/network/nmap/execute.py @@ -4,15 +4,35 @@ from uuid import uuid4 from generalresearch.models.custom_types import UUIDStr from generalresearch.models.network.nmap.command import run_nmap from generalresearch.models.network.tool_run import NmapRun, ToolName, ToolClass, Status -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + NmapRunCommand, + NmapRunCommandOptions, +) def execute_nmap( - ip: str, top_ports: Optional[int] = 1000, scan_group_id: Optional[UUIDStr] = None + ip: str, + top_ports: Optional[int] = 1000, + ports: Optional[str] = None, + no_ping: bool = True, + enable_advanced: bool = True, + timing: int = 4, + scan_group_id: Optional[UUIDStr] = None, ): - result = run_nmap(ip=ip, top_ports=top_ports) + config = NmapRunCommand( + options=NmapRunCommandOptions( + top_ports=top_ports, + ports=ports, + no_ping=no_ping, + enable_advanced=enable_advanced, + timing=timing, + ip=ip, + ) + ) + result = run_nmap(config) assert result.exit_status == "success" assert result.target_ip == ip, f"{result.target_ip=}, {ip=}" + assert result.command_line == config.to_command_str() run = NmapRun( tool_name=ToolName.NMAP, @@ -24,7 +44,7 @@ def execute_nmap( finished_at=result.finished_at, raw_command=result.command_line, scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand(command="nmap", options={'top_ports': top_ports}), + config=config, parsed=result, ) - return run
\ No newline at end of file + return run diff --git a/generalresearch/models/network/rdns/command.py b/generalresearch/models/network/rdns/command.py index e9d5bfd..aa48f2a 100644 --- a/generalresearch/models/network/rdns/command.py +++ b/generalresearch/models/network/rdns/command.py @@ -2,10 +2,12 @@ import subprocess from generalresearch.models.network.rdns.parser import parse_rdns_output from generalresearch.models.network.rdns.result import RDNSResult +from generalresearch.models.network.tool_run_command import RDNSRunCommand -def run_rdns(ip: str) -> RDNSResult: - args = build_rdns_command(ip).split(" ") +def run_rdns(config: RDNSRunCommand) -> RDNSResult: + cmd = config.to_command_str() + args = cmd.split(" ") proc = subprocess.run( args, capture_output=True, @@ -13,10 +15,10 @@ def run_rdns(ip: str) -> RDNSResult: check=False, ) raw = proc.stdout.strip() - return parse_rdns_output(ip, raw) + return parse_rdns_output(ip=config.options.ip, raw=raw) -def build_rdns_command(ip: str): +def build_rdns_command(ip: str) -> str: # e.g. dig +noall +answer -x 1.2.3.4 return " ".join(["dig", "+noall", "+answer", "-x", ip]) diff --git a/generalresearch/models/network/rdns/execute.py b/generalresearch/models/network/rdns/execute.py index 97b8bf8..03a5080 100644 --- a/generalresearch/models/network/rdns/execute.py +++ b/generalresearch/models/network/rdns/execute.py @@ -14,15 +14,18 @@ from generalresearch.models.network.tool_run import ( Status, RDNSRun, ) -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + RDNSRunCommand, + RDNSRunCommandOptions, +) def execute_rdns(ip: str, scan_group_id: Optional[UUIDStr] = None): started_at = datetime.now(tz=timezone.utc) tool_version = get_dig_version() - result = run_rdns(ip=ip) + config = RDNSRunCommand(options=RDNSRunCommandOptions(ip=ip)) + result = run_rdns(config) finished_at = datetime.now(tz=timezone.utc) - raw_command = build_rdns_command(ip) run = RDNSRun( tool_name=ToolName.DIG, @@ -32,9 +35,9 @@ def execute_rdns(ip: str, scan_group_id: Optional[UUIDStr] = None): ip=ip, started_at=started_at, finished_at=finished_at, - raw_command=raw_command, + raw_command=config.to_command_str(), scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand(command="dig", options={}), + config=config, parsed=result, ) diff --git a/generalresearch/models/network/rdns/parser.py b/generalresearch/models/network/rdns/parser.py index f12a6f4..231949e 100644 --- a/generalresearch/models/network/rdns/parser.py +++ b/generalresearch/models/network/rdns/parser.py @@ -2,12 +2,13 @@ import ipaddress import re from typing import List +from generalresearch.models.custom_types import IPvAnyAddressStr from generalresearch.models.network.rdns.result import RDNSResult PTR_RE = re.compile(r"\sPTR\s+([^\s]+)\.") -def parse_rdns_output(ip, raw): +def parse_rdns_output(ip: IPvAnyAddressStr, raw:str): hostnames: List[str] = [] for line in raw.splitlines(): diff --git a/generalresearch/models/network/tool_run.py b/generalresearch/models/network/tool_run.py index 36e6950..114d4b6 100644 --- a/generalresearch/models/network/tool_run.py +++ b/generalresearch/models/network/tool_run.py @@ -12,7 +12,12 @@ from generalresearch.models.custom_types import ( from generalresearch.models.network.mtr.result import MTRResult from generalresearch.models.network.nmap.result import NmapResult from generalresearch.models.network.rdns.result import RDNSResult -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + ToolRunCommand, + NmapRunCommand, + RDNSRunCommand, + MTRRunCommand, +) class ToolClass(StrEnum): @@ -68,6 +73,7 @@ class ToolRun(BaseModel): class NmapRun(ToolRun): tool_class: Literal[ToolClass.PORT_SCAN] = Field(default=ToolClass.PORT_SCAN) tool_name: Literal[ToolName.NMAP] = Field(default=ToolName.NMAP) + config: NmapRunCommand = Field() parsed: NmapResult = Field() @@ -81,6 +87,7 @@ class NmapRun(ToolRun): class RDNSRun(ToolRun): tool_class: Literal[ToolClass.RDNS] = Field(default=ToolClass.RDNS) tool_name: Literal[ToolName.DIG] = Field(default=ToolName.DIG) + config: RDNSRunCommand = Field() parsed: RDNSResult = Field() @@ -94,6 +101,7 @@ class RDNSRun(ToolRun): class MTRRun(ToolRun): tool_class: Literal[ToolClass.TRACEROUTE] = Field(default=ToolClass.TRACEROUTE) tool_name: Literal[ToolName.MTR] = Field(default=ToolName.MTR) + config: MTRRunCommand = Field() facility_id: int = Field(default=1) source_ip: IPvAnyAddressStr = Field() diff --git a/generalresearch/models/network/tool_run_command.py b/generalresearch/models/network/tool_run_command.py index 5abe670..68d2070 100644 --- a/generalresearch/models/network/tool_run_command.py +++ b/generalresearch/models/network/tool_run_command.py @@ -1,9 +1,64 @@ -from typing import Dict, Optional +from typing import Dict, Optional, Literal from pydantic import BaseModel, Field +from generalresearch.models.custom_types import IPvAnyAddressStr +from generalresearch.models.network.definitions import IPProtocol + class ToolRunCommand(BaseModel): - # todo: expand with arguments specific for each tool command: str = Field() options: Dict[str, Optional[str | int]] = Field(default_factory=dict) + + +class NmapRunCommandOptions(BaseModel): + ip: IPvAnyAddressStr + top_ports: Optional[int] = Field(default=1000) + ports: Optional[str] = Field(default=None) + no_ping: bool = Field(default=True) + enable_advanced: bool = Field(default=True) + timing: int = Field(default=4) + + +class NmapRunCommand(ToolRunCommand): + command: Literal["nmap"] = Field(default="nmap") + options: NmapRunCommandOptions = Field() + + def to_command_str(self): + from generalresearch.models.network.nmap.command import build_nmap_command + + options = self.options + return build_nmap_command(**options.model_dump()) + + +class RDNSRunCommandOptions(BaseModel): + ip: IPvAnyAddressStr + + +class RDNSRunCommand(ToolRunCommand): + command: Literal["dig"] = Field(default="dig") + options: RDNSRunCommandOptions = Field() + + def to_command_str(self): + from generalresearch.models.network.rdns.command import build_rdns_command + + options = self.options + return build_rdns_command(**options.model_dump()) + + +class MTRRunCommandOptions(BaseModel): + ip: IPvAnyAddressStr = Field() + protocol: IPProtocol = Field(default=IPProtocol.ICMP) + port: Optional[int] = Field(default=None) + report_cycles: int = Field(default=10) + + +class MTRRunCommand(ToolRunCommand): + command: Literal["mtr"] = Field(default="mtr") + options: MTRRunCommandOptions = Field() + + def to_command_str(self): + from generalresearch.models.network.mtr.command import build_mtr_command + + options = self.options + return build_mtr_command(**options.model_dump()) diff --git a/test_utils/managers/network/conftest.py b/test_utils/managers/network/conftest.py index f6a4078..979dd63 100644 --- a/test_utils/managers/network/conftest.py +++ b/test_utils/managers/network/conftest.py @@ -7,13 +7,18 @@ import pytest from generalresearch.managers.network.label import IPLabelManager from generalresearch.managers.network.tool_run import ToolRunManager from generalresearch.models.network.definitions import IPProtocol -from generalresearch.models.network.mtr.command import build_mtr_command from generalresearch.models.network.mtr.parser import parse_mtr_output from generalresearch.models.network.nmap.parser import parse_nmap_xml -from generalresearch.models.network.rdns.command import build_rdns_command from generalresearch.models.network.rdns.parser import parse_rdns_output from generalresearch.models.network.tool_run import NmapRun, Status, RDNSRun, MTRRun -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + MTRRunCommand, + MTRRunCommandOptions, + RDNSRunCommand, + NmapRunCommand, + NmapRunCommandOptions, + RDNSRunCommandOptions, +) @pytest.fixture(scope="session") @@ -51,15 +56,21 @@ def nmap_result(nmap_raw_output): @pytest.fixture(scope="session") def nmap_run(nmap_result, scan_group_id): r = nmap_result + config = NmapRunCommand( + command="nmap", + options=NmapRunCommandOptions( + ip=r.target_ip, ports="22-1000,11000,1100,3389,61232", top_ports=None + ), + ) return NmapRun( tool_version=r.version, status=Status.SUCCESS, ip=r.target_ip, started_at=r.started_at, finished_at=r.finished_at, - raw_command=r.command_line, + raw_command=config.to_command_str(), scan_group_id=scan_group_id, - config=ToolRunCommand(command="nmap"), + config=config, parsed=r, ) @@ -79,15 +90,16 @@ def rdns_run(rdns_result, scan_group_id): r = rdns_result ip = "45.33.32.156" utc_now = datetime.now(tz=timezone.utc) + config = RDNSRunCommand(command="dig", options=RDNSRunCommandOptions(ip=ip)) return RDNSRun( tool_version="1.2.3", status=Status.SUCCESS, ip=ip, started_at=utc_now, finished_at=utc_now + timedelta(seconds=1), - raw_command=build_rdns_command(ip=ip), + raw_command=config.to_command_str(), scan_group_id=scan_group_id, - config=ToolRunCommand(command="dig"), + config=config, parsed=r, ) @@ -109,6 +121,12 @@ def mtr_result(mtr_raw_output): def mtr_run(mtr_result, scan_group_id): r = mtr_result utc_now = datetime.now(tz=timezone.utc) + config = MTRRunCommand( + command="mtr", + options=MTRRunCommandOptions( + ip=r.destination, protocol=IPProtocol.TCP, port=443 + ), + ) return MTRRun( tool_version="1.2.3", @@ -116,11 +134,9 @@ def mtr_run(mtr_result, scan_group_id): ip=r.destination, started_at=utc_now, finished_at=utc_now + timedelta(seconds=1), - raw_command=build_mtr_command( - ip=r.destination, protocol=IPProtocol.TCP, port=443 - ), + raw_command=config.to_command_str(), scan_group_id=scan_group_id, - config=ToolRunCommand(command="mtr"), + config=config, parsed=r, facility_id=1, source_ip="1.2.3.4", diff --git a/tests/managers/network/label.py b/tests/managers/network/test_label.py index 5b9a790..5b9a790 100644 --- a/tests/managers/network/label.py +++ b/tests/managers/network/test_label.py diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/test_tool_run.py index a815809..a815809 100644 --- a/tests/managers/network/tool_run.py +++ b/tests/managers/network/test_tool_run.py diff --git a/tests/models/network/mtr.py b/tests/models/network/test_mtr.py index 2965300..2965300 100644 --- a/tests/models/network/mtr.py +++ b/tests/models/network/test_mtr.py diff --git a/tests/models/network/nmap.py b/tests/models/network/test_nmap.py index f034bf0..0be98d4 100644 --- a/tests/models/network/nmap.py +++ b/tests/models/network/test_nmap.py @@ -17,7 +17,7 @@ def resolve(host): def test_execute_nmap_scanme(toolrun_manager): ip = resolve("scanme.nmap.org") - run = execute_nmap(ip=ip, top_ports=20) + run = execute_nmap(ip=ip, top_ports=None, ports="20-30", enable_advanced=False) assert run.tool_name == ToolName.NMAP assert run.tool_class == ToolClass.PORT_SCAN assert run.ip == ip diff --git a/tests/models/network/nmap_parser.py b/tests/models/network/test_nmap_parser.py index 96d7b37..96d7b37 100644 --- a/tests/models/network/nmap_parser.py +++ b/tests/models/network/test_nmap_parser.py diff --git a/tests/models/network/rdns.py b/tests/models/network/test_rdns.py index e56c494..e56c494 100644 --- a/tests/models/network/rdns.py +++ b/tests/models/network/test_rdns.py |
