aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--generalresearch/models/network/mtr/command.py17
-rw-r--r--generalresearch/models/network/mtr/execute.py29
-rw-r--r--generalresearch/models/network/mtr/parser.py4
-rw-r--r--generalresearch/models/network/nmap/command.py34
-rw-r--r--generalresearch/models/network/nmap/execute.py30
-rw-r--r--generalresearch/models/network/rdns/command.py10
-rw-r--r--generalresearch/models/network/rdns/execute.py13
-rw-r--r--generalresearch/models/network/rdns/parser.py3
-rw-r--r--generalresearch/models/network/tool_run.py10
-rw-r--r--generalresearch/models/network/tool_run_command.py59
-rw-r--r--test_utils/managers/network/conftest.py38
-rw-r--r--tests/managers/network/test_label.py (renamed from tests/managers/network/label.py)0
-rw-r--r--tests/managers/network/test_tool_run.py (renamed from tests/managers/network/tool_run.py)0
-rw-r--r--tests/models/network/test_mtr.py (renamed from tests/models/network/mtr.py)0
-rw-r--r--tests/models/network/test_nmap.py (renamed from tests/models/network/nmap.py)2
-rw-r--r--tests/models/network/test_nmap_parser.py (renamed from tests/models/network/nmap_parser.py)0
-rw-r--r--tests/models/network/test_rdns.py (renamed from tests/models/network/rdns.py)0
17 files changed, 187 insertions, 62 deletions
diff --git a/generalresearch/models/network/mtr/command.py b/generalresearch/models/network/mtr/command.py
index e3ab903..f8d2d49 100644
--- a/generalresearch/models/network/mtr/command.py
+++ b/generalresearch/models/network/mtr/command.py
@@ -4,6 +4,7 @@ from typing import List, Optional
from generalresearch.models.network.definitions import IPProtocol
from generalresearch.models.network.mtr.parser import parse_mtr_output
from generalresearch.models.network.mtr.result import MTRResult
+from generalresearch.models.network.tool_run_command import MTRRunCommand
SUPPORTED_PROTOCOLS = {
IPProtocol.TCP,
@@ -56,20 +57,14 @@ def get_mtr_version() -> str:
return ver_str.split(" ", 1)[1]
-def run_mtr(
- ip: str,
- protocol: Optional[IPProtocol] = None,
- port: Optional[int] = None,
- report_cycles: int = 10,
-) -> MTRResult:
- args = build_mtr_command(
- ip=ip, protocol=protocol, port=port, report_cycles=report_cycles
- )
+def run_mtr(config: MTRRunCommand) -> MTRResult:
+ cmd = config.to_command_str()
+ args = cmd.split(" ")
proc = subprocess.run(
- args.split(" "),
+ args,
capture_output=True,
text=True,
check=False,
)
raw = proc.stdout.strip()
- return parse_mtr_output(raw, protocol=protocol, port=port)
+ return parse_mtr_output(raw, protocol=config.options.protocol, port=config.options.port)
diff --git a/generalresearch/models/network/mtr/execute.py b/generalresearch/models/network/mtr/execute.py
index 81de24f..a6fb82a 100644
--- a/generalresearch/models/network/mtr/execute.py
+++ b/generalresearch/models/network/mtr/execute.py
@@ -10,30 +10,33 @@ from generalresearch.models.network.mtr.command import (
build_mtr_command,
)
from generalresearch.models.network.tool_run import MTRRun, ToolName, ToolClass, Status
-from generalresearch.models.network.tool_run_command import ToolRunCommand
+from generalresearch.models.network.tool_run_command import (
+ MTRRunCommand,
+ MTRRunCommandOptions,
+)
from generalresearch.models.network.utils import get_source_ip
def execute_mtr(
ip: str,
scan_group_id: Optional[UUIDStr] = None,
- protocol: Optional[IPProtocol] = None,
+ protocol: Optional[IPProtocol] = IPProtocol.ICMP,
port: Optional[int] = None,
report_cycles: int = 10,
) -> MTRRun:
+ config = MTRRunCommand(
+ options=MTRRunCommandOptions(
+ ip=ip,
+ report_cycles=report_cycles,
+ protocol=protocol,
+ port=port,
+ ),
+ )
+
started_at = datetime.now(tz=timezone.utc)
tool_version = get_mtr_version()
- result = run_mtr(ip, protocol=protocol, port=port, report_cycles=report_cycles)
+ result = run_mtr(config)
finished_at = datetime.now(tz=timezone.utc)
- raw_command = build_mtr_command(ip)
- config = ToolRunCommand(
- command="mtr",
- options={
- "protocol": result.protocol,
- "port": result.port,
- "report_cycles": report_cycles,
- },
- )
return MTRRun(
tool_name=ToolName.MTR,
@@ -43,7 +46,7 @@ def execute_mtr(
ip=ip,
started_at=started_at,
finished_at=finished_at,
- raw_command=raw_command,
+ raw_command=config.to_command_str(),
scan_group_id=scan_group_id or uuid4().hex,
config=config,
parsed=result,
diff --git a/generalresearch/models/network/mtr/parser.py b/generalresearch/models/network/mtr/parser.py
index fb0ca61..685eeca 100644
--- a/generalresearch/models/network/mtr/parser.py
+++ b/generalresearch/models/network/mtr/parser.py
@@ -5,10 +5,10 @@ from generalresearch.models.network.definitions import IPProtocol
from generalresearch.models.network.mtr.result import MTRResult
-def parse_mtr_output(raw: str, port, protocol) -> MTRResult:
+def parse_mtr_output(raw: str, port: int, protocol: IPProtocol) -> MTRResult:
data = parse_mtr_raw_output(raw)
data["port"] = port
- data["protocol"] = protocol or IPProtocol.ICMP
+ data["protocol"] = protocol
return MTRResult.model_validate(data)
diff --git a/generalresearch/models/network/nmap/command.py b/generalresearch/models/network/nmap/command.py
index dfa55de..47e0a87 100644
--- a/generalresearch/models/network/nmap/command.py
+++ b/generalresearch/models/network/nmap/command.py
@@ -3,18 +3,40 @@ from typing import Optional, List
from generalresearch.models.network.nmap.parser import parse_nmap_xml
from generalresearch.models.network.nmap.result import NmapResult
+from generalresearch.models.network.tool_run_command import NmapRunCommand
-def build_nmap_command(ip: str, top_ports: Optional[int] = 1000) -> List[str]:
+def build_nmap_command(
+ ip: str,
+ no_ping: bool = True,
+ enable_advanced: bool = True,
+ timing: int = 4,
+ ports: Optional[str] = None,
+ top_ports: Optional[int] = None,
+) -> str:
# e.g. "nmap -Pn -T4 -A --top-ports 1000 -oX - scanme.nmap.org"
# https://linux.die.net/man/1/nmap
- args = ["nmap", "-Pn", "-T4", "-A", "--top-ports", str(int(top_ports)), "-oX", "-"]
- args.append(ip)
- return args
+ args = ["nmap"]
+ assert 0 <= timing <= 5
+ args.append(f"-T{timing}")
+ if no_ping:
+ args.append("-Pn")
+ if enable_advanced:
+ args.append("-A")
+ if ports is not None:
+ assert top_ports is None
+ args.extend(["-p", ports])
+ if top_ports is not None:
+ assert ports is None
+ args.extend(["--top-ports", str(top_ports)])
+ args.extend(["-oX", "-", ip])
+ return " ".join(args)
-def run_nmap(ip: str, top_ports: Optional[int] = 1000) -> NmapResult:
- args = build_nmap_command(ip=ip, top_ports=top_ports)
+
+def run_nmap(config: NmapRunCommand) -> NmapResult:
+ cmd = config.to_command_str()
+ args = cmd.split(" ")
proc = subprocess.run(
args,
capture_output=True,
diff --git a/generalresearch/models/network/nmap/execute.py b/generalresearch/models/network/nmap/execute.py
index 68a9926..0334f50 100644
--- a/generalresearch/models/network/nmap/execute.py
+++ b/generalresearch/models/network/nmap/execute.py
@@ -4,15 +4,35 @@ from uuid import uuid4
from generalresearch.models.custom_types import UUIDStr
from generalresearch.models.network.nmap.command import run_nmap
from generalresearch.models.network.tool_run import NmapRun, ToolName, ToolClass, Status
-from generalresearch.models.network.tool_run_command import ToolRunCommand
+from generalresearch.models.network.tool_run_command import (
+ NmapRunCommand,
+ NmapRunCommandOptions,
+)
def execute_nmap(
- ip: str, top_ports: Optional[int] = 1000, scan_group_id: Optional[UUIDStr] = None
+ ip: str,
+ top_ports: Optional[int] = 1000,
+ ports: Optional[str] = None,
+ no_ping: bool = True,
+ enable_advanced: bool = True,
+ timing: int = 4,
+ scan_group_id: Optional[UUIDStr] = None,
):
- result = run_nmap(ip=ip, top_ports=top_ports)
+ config = NmapRunCommand(
+ options=NmapRunCommandOptions(
+ top_ports=top_ports,
+ ports=ports,
+ no_ping=no_ping,
+ enable_advanced=enable_advanced,
+ timing=timing,
+ ip=ip,
+ )
+ )
+ result = run_nmap(config)
assert result.exit_status == "success"
assert result.target_ip == ip, f"{result.target_ip=}, {ip=}"
+ assert result.command_line == config.to_command_str()
run = NmapRun(
tool_name=ToolName.NMAP,
@@ -24,7 +44,7 @@ def execute_nmap(
finished_at=result.finished_at,
raw_command=result.command_line,
scan_group_id=scan_group_id or uuid4().hex,
- config=ToolRunCommand(command="nmap", options={'top_ports': top_ports}),
+ config=config,
parsed=result,
)
- return run \ No newline at end of file
+ return run
diff --git a/generalresearch/models/network/rdns/command.py b/generalresearch/models/network/rdns/command.py
index e9d5bfd..aa48f2a 100644
--- a/generalresearch/models/network/rdns/command.py
+++ b/generalresearch/models/network/rdns/command.py
@@ -2,10 +2,12 @@ import subprocess
from generalresearch.models.network.rdns.parser import parse_rdns_output
from generalresearch.models.network.rdns.result import RDNSResult
+from generalresearch.models.network.tool_run_command import RDNSRunCommand
-def run_rdns(ip: str) -> RDNSResult:
- args = build_rdns_command(ip).split(" ")
+def run_rdns(config: RDNSRunCommand) -> RDNSResult:
+ cmd = config.to_command_str()
+ args = cmd.split(" ")
proc = subprocess.run(
args,
capture_output=True,
@@ -13,10 +15,10 @@ def run_rdns(ip: str) -> RDNSResult:
check=False,
)
raw = proc.stdout.strip()
- return parse_rdns_output(ip, raw)
+ return parse_rdns_output(ip=config.options.ip, raw=raw)
-def build_rdns_command(ip: str):
+def build_rdns_command(ip: str) -> str:
# e.g. dig +noall +answer -x 1.2.3.4
return " ".join(["dig", "+noall", "+answer", "-x", ip])
diff --git a/generalresearch/models/network/rdns/execute.py b/generalresearch/models/network/rdns/execute.py
index 97b8bf8..03a5080 100644
--- a/generalresearch/models/network/rdns/execute.py
+++ b/generalresearch/models/network/rdns/execute.py
@@ -14,15 +14,18 @@ from generalresearch.models.network.tool_run import (
Status,
RDNSRun,
)
-from generalresearch.models.network.tool_run_command import ToolRunCommand
+from generalresearch.models.network.tool_run_command import (
+ RDNSRunCommand,
+ RDNSRunCommandOptions,
+)
def execute_rdns(ip: str, scan_group_id: Optional[UUIDStr] = None):
started_at = datetime.now(tz=timezone.utc)
tool_version = get_dig_version()
- result = run_rdns(ip=ip)
+ config = RDNSRunCommand(options=RDNSRunCommandOptions(ip=ip))
+ result = run_rdns(config)
finished_at = datetime.now(tz=timezone.utc)
- raw_command = build_rdns_command(ip)
run = RDNSRun(
tool_name=ToolName.DIG,
@@ -32,9 +35,9 @@ def execute_rdns(ip: str, scan_group_id: Optional[UUIDStr] = None):
ip=ip,
started_at=started_at,
finished_at=finished_at,
- raw_command=raw_command,
+ raw_command=config.to_command_str(),
scan_group_id=scan_group_id or uuid4().hex,
- config=ToolRunCommand(command="dig", options={}),
+ config=config,
parsed=result,
)
diff --git a/generalresearch/models/network/rdns/parser.py b/generalresearch/models/network/rdns/parser.py
index f12a6f4..231949e 100644
--- a/generalresearch/models/network/rdns/parser.py
+++ b/generalresearch/models/network/rdns/parser.py
@@ -2,12 +2,13 @@ import ipaddress
import re
from typing import List
+from generalresearch.models.custom_types import IPvAnyAddressStr
from generalresearch.models.network.rdns.result import RDNSResult
PTR_RE = re.compile(r"\sPTR\s+([^\s]+)\.")
-def parse_rdns_output(ip, raw):
+def parse_rdns_output(ip: IPvAnyAddressStr, raw:str):
hostnames: List[str] = []
for line in raw.splitlines():
diff --git a/generalresearch/models/network/tool_run.py b/generalresearch/models/network/tool_run.py
index 36e6950..114d4b6 100644
--- a/generalresearch/models/network/tool_run.py
+++ b/generalresearch/models/network/tool_run.py
@@ -12,7 +12,12 @@ from generalresearch.models.custom_types import (
from generalresearch.models.network.mtr.result import MTRResult
from generalresearch.models.network.nmap.result import NmapResult
from generalresearch.models.network.rdns.result import RDNSResult
-from generalresearch.models.network.tool_run_command import ToolRunCommand
+from generalresearch.models.network.tool_run_command import (
+ ToolRunCommand,
+ NmapRunCommand,
+ RDNSRunCommand,
+ MTRRunCommand,
+)
class ToolClass(StrEnum):
@@ -68,6 +73,7 @@ class ToolRun(BaseModel):
class NmapRun(ToolRun):
tool_class: Literal[ToolClass.PORT_SCAN] = Field(default=ToolClass.PORT_SCAN)
tool_name: Literal[ToolName.NMAP] = Field(default=ToolName.NMAP)
+ config: NmapRunCommand = Field()
parsed: NmapResult = Field()
@@ -81,6 +87,7 @@ class NmapRun(ToolRun):
class RDNSRun(ToolRun):
tool_class: Literal[ToolClass.RDNS] = Field(default=ToolClass.RDNS)
tool_name: Literal[ToolName.DIG] = Field(default=ToolName.DIG)
+ config: RDNSRunCommand = Field()
parsed: RDNSResult = Field()
@@ -94,6 +101,7 @@ class RDNSRun(ToolRun):
class MTRRun(ToolRun):
tool_class: Literal[ToolClass.TRACEROUTE] = Field(default=ToolClass.TRACEROUTE)
tool_name: Literal[ToolName.MTR] = Field(default=ToolName.MTR)
+ config: MTRRunCommand = Field()
facility_id: int = Field(default=1)
source_ip: IPvAnyAddressStr = Field()
diff --git a/generalresearch/models/network/tool_run_command.py b/generalresearch/models/network/tool_run_command.py
index 5abe670..68d2070 100644
--- a/generalresearch/models/network/tool_run_command.py
+++ b/generalresearch/models/network/tool_run_command.py
@@ -1,9 +1,64 @@
-from typing import Dict, Optional
+from typing import Dict, Optional, Literal
from pydantic import BaseModel, Field
+from generalresearch.models.custom_types import IPvAnyAddressStr
+from generalresearch.models.network.definitions import IPProtocol
+
class ToolRunCommand(BaseModel):
- # todo: expand with arguments specific for each tool
command: str = Field()
options: Dict[str, Optional[str | int]] = Field(default_factory=dict)
+
+
+class NmapRunCommandOptions(BaseModel):
+ ip: IPvAnyAddressStr
+ top_ports: Optional[int] = Field(default=1000)
+ ports: Optional[str] = Field(default=None)
+ no_ping: bool = Field(default=True)
+ enable_advanced: bool = Field(default=True)
+ timing: int = Field(default=4)
+
+
+class NmapRunCommand(ToolRunCommand):
+ command: Literal["nmap"] = Field(default="nmap")
+ options: NmapRunCommandOptions = Field()
+
+ def to_command_str(self):
+ from generalresearch.models.network.nmap.command import build_nmap_command
+
+ options = self.options
+ return build_nmap_command(**options.model_dump())
+
+
+class RDNSRunCommandOptions(BaseModel):
+ ip: IPvAnyAddressStr
+
+
+class RDNSRunCommand(ToolRunCommand):
+ command: Literal["dig"] = Field(default="dig")
+ options: RDNSRunCommandOptions = Field()
+
+ def to_command_str(self):
+ from generalresearch.models.network.rdns.command import build_rdns_command
+
+ options = self.options
+ return build_rdns_command(**options.model_dump())
+
+
+class MTRRunCommandOptions(BaseModel):
+ ip: IPvAnyAddressStr = Field()
+ protocol: IPProtocol = Field(default=IPProtocol.ICMP)
+ port: Optional[int] = Field(default=None)
+ report_cycles: int = Field(default=10)
+
+
+class MTRRunCommand(ToolRunCommand):
+ command: Literal["mtr"] = Field(default="mtr")
+ options: MTRRunCommandOptions = Field()
+
+ def to_command_str(self):
+ from generalresearch.models.network.mtr.command import build_mtr_command
+
+ options = self.options
+ return build_mtr_command(**options.model_dump())
diff --git a/test_utils/managers/network/conftest.py b/test_utils/managers/network/conftest.py
index f6a4078..979dd63 100644
--- a/test_utils/managers/network/conftest.py
+++ b/test_utils/managers/network/conftest.py
@@ -7,13 +7,18 @@ import pytest
from generalresearch.managers.network.label import IPLabelManager
from generalresearch.managers.network.tool_run import ToolRunManager
from generalresearch.models.network.definitions import IPProtocol
-from generalresearch.models.network.mtr.command import build_mtr_command
from generalresearch.models.network.mtr.parser import parse_mtr_output
from generalresearch.models.network.nmap.parser import parse_nmap_xml
-from generalresearch.models.network.rdns.command import build_rdns_command
from generalresearch.models.network.rdns.parser import parse_rdns_output
from generalresearch.models.network.tool_run import NmapRun, Status, RDNSRun, MTRRun
-from generalresearch.models.network.tool_run_command import ToolRunCommand
+from generalresearch.models.network.tool_run_command import (
+ MTRRunCommand,
+ MTRRunCommandOptions,
+ RDNSRunCommand,
+ NmapRunCommand,
+ NmapRunCommandOptions,
+ RDNSRunCommandOptions,
+)
@pytest.fixture(scope="session")
@@ -51,15 +56,21 @@ def nmap_result(nmap_raw_output):
@pytest.fixture(scope="session")
def nmap_run(nmap_result, scan_group_id):
r = nmap_result
+ config = NmapRunCommand(
+ command="nmap",
+ options=NmapRunCommandOptions(
+ ip=r.target_ip, ports="22-1000,11000,1100,3389,61232", top_ports=None
+ ),
+ )
return NmapRun(
tool_version=r.version,
status=Status.SUCCESS,
ip=r.target_ip,
started_at=r.started_at,
finished_at=r.finished_at,
- raw_command=r.command_line,
+ raw_command=config.to_command_str(),
scan_group_id=scan_group_id,
- config=ToolRunCommand(command="nmap"),
+ config=config,
parsed=r,
)
@@ -79,15 +90,16 @@ def rdns_run(rdns_result, scan_group_id):
r = rdns_result
ip = "45.33.32.156"
utc_now = datetime.now(tz=timezone.utc)
+ config = RDNSRunCommand(command="dig", options=RDNSRunCommandOptions(ip=ip))
return RDNSRun(
tool_version="1.2.3",
status=Status.SUCCESS,
ip=ip,
started_at=utc_now,
finished_at=utc_now + timedelta(seconds=1),
- raw_command=build_rdns_command(ip=ip),
+ raw_command=config.to_command_str(),
scan_group_id=scan_group_id,
- config=ToolRunCommand(command="dig"),
+ config=config,
parsed=r,
)
@@ -109,6 +121,12 @@ def mtr_result(mtr_raw_output):
def mtr_run(mtr_result, scan_group_id):
r = mtr_result
utc_now = datetime.now(tz=timezone.utc)
+ config = MTRRunCommand(
+ command="mtr",
+ options=MTRRunCommandOptions(
+ ip=r.destination, protocol=IPProtocol.TCP, port=443
+ ),
+ )
return MTRRun(
tool_version="1.2.3",
@@ -116,11 +134,9 @@ def mtr_run(mtr_result, scan_group_id):
ip=r.destination,
started_at=utc_now,
finished_at=utc_now + timedelta(seconds=1),
- raw_command=build_mtr_command(
- ip=r.destination, protocol=IPProtocol.TCP, port=443
- ),
+ raw_command=config.to_command_str(),
scan_group_id=scan_group_id,
- config=ToolRunCommand(command="mtr"),
+ config=config,
parsed=r,
facility_id=1,
source_ip="1.2.3.4",
diff --git a/tests/managers/network/label.py b/tests/managers/network/test_label.py
index 5b9a790..5b9a790 100644
--- a/tests/managers/network/label.py
+++ b/tests/managers/network/test_label.py
diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/test_tool_run.py
index a815809..a815809 100644
--- a/tests/managers/network/tool_run.py
+++ b/tests/managers/network/test_tool_run.py
diff --git a/tests/models/network/mtr.py b/tests/models/network/test_mtr.py
index 2965300..2965300 100644
--- a/tests/models/network/mtr.py
+++ b/tests/models/network/test_mtr.py
diff --git a/tests/models/network/nmap.py b/tests/models/network/test_nmap.py
index f034bf0..0be98d4 100644
--- a/tests/models/network/nmap.py
+++ b/tests/models/network/test_nmap.py
@@ -17,7 +17,7 @@ def resolve(host):
def test_execute_nmap_scanme(toolrun_manager):
ip = resolve("scanme.nmap.org")
- run = execute_nmap(ip=ip, top_ports=20)
+ run = execute_nmap(ip=ip, top_ports=None, ports="20-30", enable_advanced=False)
assert run.tool_name == ToolName.NMAP
assert run.tool_class == ToolClass.PORT_SCAN
assert run.ip == ip
diff --git a/tests/models/network/nmap_parser.py b/tests/models/network/test_nmap_parser.py
index 96d7b37..96d7b37 100644
--- a/tests/models/network/nmap_parser.py
+++ b/tests/models/network/test_nmap_parser.py
diff --git a/tests/models/network/rdns.py b/tests/models/network/test_rdns.py
index e56c494..e56c494 100644
--- a/tests/models/network/rdns.py
+++ b/tests/models/network/test_rdns.py