From efe1c368b8b49a2c6b3bf2193a5b89eb5426eba3 Mon Sep 17 00:00:00 2001 From: stuppie Date: Fri, 13 Mar 2026 13:31:22 -0600 Subject: ToolRunCommand + options models to handle cmd line args for all network tools --- generalresearch/models/network/mtr/command.py | 17 +- generalresearch/models/network/mtr/execute.py | 29 +-- generalresearch/models/network/mtr/parser.py | 4 +- generalresearch/models/network/nmap/command.py | 34 +++- generalresearch/models/network/nmap/execute.py | 30 ++- generalresearch/models/network/rdns/command.py | 10 +- generalresearch/models/network/rdns/execute.py | 13 +- generalresearch/models/network/rdns/parser.py | 3 +- generalresearch/models/network/tool_run.py | 10 +- generalresearch/models/network/tool_run_command.py | 59 +++++- test_utils/managers/network/conftest.py | 38 ++-- tests/managers/network/label.py | 202 --------------------- tests/managers/network/test_label.py | 202 +++++++++++++++++++++ tests/managers/network/test_tool_run.py | 25 +++ tests/managers/network/tool_run.py | 25 --- tests/models/network/mtr.py | 26 --- tests/models/network/nmap.py | 29 --- tests/models/network/nmap_parser.py | 22 --- tests/models/network/rdns.py | 33 ---- tests/models/network/test_mtr.py | 26 +++ tests/models/network/test_nmap.py | 29 +++ tests/models/network/test_nmap_parser.py | 22 +++ tests/models/network/test_rdns.py | 33 ++++ 23 files changed, 523 insertions(+), 398 deletions(-) delete mode 100644 tests/managers/network/label.py create mode 100644 tests/managers/network/test_label.py create mode 100644 tests/managers/network/test_tool_run.py delete mode 100644 tests/managers/network/tool_run.py delete mode 100644 tests/models/network/mtr.py delete mode 100644 tests/models/network/nmap.py delete mode 100644 tests/models/network/nmap_parser.py delete mode 100644 tests/models/network/rdns.py create mode 100644 tests/models/network/test_mtr.py create mode 100644 tests/models/network/test_nmap.py create mode 100644 tests/models/network/test_nmap_parser.py create mode 100644 tests/models/network/test_rdns.py diff --git a/generalresearch/models/network/mtr/command.py b/generalresearch/models/network/mtr/command.py index e3ab903..f8d2d49 100644 --- a/generalresearch/models/network/mtr/command.py +++ b/generalresearch/models/network/mtr/command.py @@ -4,6 +4,7 @@ from typing import List, Optional from generalresearch.models.network.definitions import IPProtocol from generalresearch.models.network.mtr.parser import parse_mtr_output from generalresearch.models.network.mtr.result import MTRResult +from generalresearch.models.network.tool_run_command import MTRRunCommand SUPPORTED_PROTOCOLS = { IPProtocol.TCP, @@ -56,20 +57,14 @@ def get_mtr_version() -> str: return ver_str.split(" ", 1)[1] -def run_mtr( - ip: str, - protocol: Optional[IPProtocol] = None, - port: Optional[int] = None, - report_cycles: int = 10, -) -> MTRResult: - args = build_mtr_command( - ip=ip, protocol=protocol, port=port, report_cycles=report_cycles - ) +def run_mtr(config: MTRRunCommand) -> MTRResult: + cmd = config.to_command_str() + args = cmd.split(" ") proc = subprocess.run( - args.split(" "), + args, capture_output=True, text=True, check=False, ) raw = proc.stdout.strip() - return parse_mtr_output(raw, protocol=protocol, port=port) + return parse_mtr_output(raw, protocol=config.options.protocol, port=config.options.port) diff --git a/generalresearch/models/network/mtr/execute.py b/generalresearch/models/network/mtr/execute.py index 81de24f..a6fb82a 100644 --- a/generalresearch/models/network/mtr/execute.py +++ b/generalresearch/models/network/mtr/execute.py @@ -10,30 +10,33 @@ from generalresearch.models.network.mtr.command import ( build_mtr_command, ) from generalresearch.models.network.tool_run import MTRRun, ToolName, ToolClass, Status -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + MTRRunCommand, + MTRRunCommandOptions, +) from generalresearch.models.network.utils import get_source_ip def execute_mtr( ip: str, scan_group_id: Optional[UUIDStr] = None, - protocol: Optional[IPProtocol] = None, + protocol: Optional[IPProtocol] = IPProtocol.ICMP, port: Optional[int] = None, report_cycles: int = 10, ) -> MTRRun: + config = MTRRunCommand( + options=MTRRunCommandOptions( + ip=ip, + report_cycles=report_cycles, + protocol=protocol, + port=port, + ), + ) + started_at = datetime.now(tz=timezone.utc) tool_version = get_mtr_version() - result = run_mtr(ip, protocol=protocol, port=port, report_cycles=report_cycles) + result = run_mtr(config) finished_at = datetime.now(tz=timezone.utc) - raw_command = build_mtr_command(ip) - config = ToolRunCommand( - command="mtr", - options={ - "protocol": result.protocol, - "port": result.port, - "report_cycles": report_cycles, - }, - ) return MTRRun( tool_name=ToolName.MTR, @@ -43,7 +46,7 @@ def execute_mtr( ip=ip, started_at=started_at, finished_at=finished_at, - raw_command=raw_command, + raw_command=config.to_command_str(), scan_group_id=scan_group_id or uuid4().hex, config=config, parsed=result, diff --git a/generalresearch/models/network/mtr/parser.py b/generalresearch/models/network/mtr/parser.py index fb0ca61..685eeca 100644 --- a/generalresearch/models/network/mtr/parser.py +++ b/generalresearch/models/network/mtr/parser.py @@ -5,10 +5,10 @@ from generalresearch.models.network.definitions import IPProtocol from generalresearch.models.network.mtr.result import MTRResult -def parse_mtr_output(raw: str, port, protocol) -> MTRResult: +def parse_mtr_output(raw: str, port: int, protocol: IPProtocol) -> MTRResult: data = parse_mtr_raw_output(raw) data["port"] = port - data["protocol"] = protocol or IPProtocol.ICMP + data["protocol"] = protocol return MTRResult.model_validate(data) diff --git a/generalresearch/models/network/nmap/command.py b/generalresearch/models/network/nmap/command.py index dfa55de..47e0a87 100644 --- a/generalresearch/models/network/nmap/command.py +++ b/generalresearch/models/network/nmap/command.py @@ -3,18 +3,40 @@ from typing import Optional, List from generalresearch.models.network.nmap.parser import parse_nmap_xml from generalresearch.models.network.nmap.result import NmapResult +from generalresearch.models.network.tool_run_command import NmapRunCommand -def build_nmap_command(ip: str, top_ports: Optional[int] = 1000) -> List[str]: +def build_nmap_command( + ip: str, + no_ping: bool = True, + enable_advanced: bool = True, + timing: int = 4, + ports: Optional[str] = None, + top_ports: Optional[int] = None, +) -> str: # e.g. "nmap -Pn -T4 -A --top-ports 1000 -oX - scanme.nmap.org" # https://linux.die.net/man/1/nmap - args = ["nmap", "-Pn", "-T4", "-A", "--top-ports", str(int(top_ports)), "-oX", "-"] - args.append(ip) - return args + args = ["nmap"] + assert 0 <= timing <= 5 + args.append(f"-T{timing}") + if no_ping: + args.append("-Pn") + if enable_advanced: + args.append("-A") + if ports is not None: + assert top_ports is None + args.extend(["-p", ports]) + if top_ports is not None: + assert ports is None + args.extend(["--top-ports", str(top_ports)]) + args.extend(["-oX", "-", ip]) + return " ".join(args) -def run_nmap(ip: str, top_ports: Optional[int] = 1000) -> NmapResult: - args = build_nmap_command(ip=ip, top_ports=top_ports) + +def run_nmap(config: NmapRunCommand) -> NmapResult: + cmd = config.to_command_str() + args = cmd.split(" ") proc = subprocess.run( args, capture_output=True, diff --git a/generalresearch/models/network/nmap/execute.py b/generalresearch/models/network/nmap/execute.py index 68a9926..0334f50 100644 --- a/generalresearch/models/network/nmap/execute.py +++ b/generalresearch/models/network/nmap/execute.py @@ -4,15 +4,35 @@ from uuid import uuid4 from generalresearch.models.custom_types import UUIDStr from generalresearch.models.network.nmap.command import run_nmap from generalresearch.models.network.tool_run import NmapRun, ToolName, ToolClass, Status -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + NmapRunCommand, + NmapRunCommandOptions, +) def execute_nmap( - ip: str, top_ports: Optional[int] = 1000, scan_group_id: Optional[UUIDStr] = None + ip: str, + top_ports: Optional[int] = 1000, + ports: Optional[str] = None, + no_ping: bool = True, + enable_advanced: bool = True, + timing: int = 4, + scan_group_id: Optional[UUIDStr] = None, ): - result = run_nmap(ip=ip, top_ports=top_ports) + config = NmapRunCommand( + options=NmapRunCommandOptions( + top_ports=top_ports, + ports=ports, + no_ping=no_ping, + enable_advanced=enable_advanced, + timing=timing, + ip=ip, + ) + ) + result = run_nmap(config) assert result.exit_status == "success" assert result.target_ip == ip, f"{result.target_ip=}, {ip=}" + assert result.command_line == config.to_command_str() run = NmapRun( tool_name=ToolName.NMAP, @@ -24,7 +44,7 @@ def execute_nmap( finished_at=result.finished_at, raw_command=result.command_line, scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand(command="nmap", options={'top_ports': top_ports}), + config=config, parsed=result, ) - return run \ No newline at end of file + return run diff --git a/generalresearch/models/network/rdns/command.py b/generalresearch/models/network/rdns/command.py index e9d5bfd..aa48f2a 100644 --- a/generalresearch/models/network/rdns/command.py +++ b/generalresearch/models/network/rdns/command.py @@ -2,10 +2,12 @@ import subprocess from generalresearch.models.network.rdns.parser import parse_rdns_output from generalresearch.models.network.rdns.result import RDNSResult +from generalresearch.models.network.tool_run_command import RDNSRunCommand -def run_rdns(ip: str) -> RDNSResult: - args = build_rdns_command(ip).split(" ") +def run_rdns(config: RDNSRunCommand) -> RDNSResult: + cmd = config.to_command_str() + args = cmd.split(" ") proc = subprocess.run( args, capture_output=True, @@ -13,10 +15,10 @@ def run_rdns(ip: str) -> RDNSResult: check=False, ) raw = proc.stdout.strip() - return parse_rdns_output(ip, raw) + return parse_rdns_output(ip=config.options.ip, raw=raw) -def build_rdns_command(ip: str): +def build_rdns_command(ip: str) -> str: # e.g. dig +noall +answer -x 1.2.3.4 return " ".join(["dig", "+noall", "+answer", "-x", ip]) diff --git a/generalresearch/models/network/rdns/execute.py b/generalresearch/models/network/rdns/execute.py index 97b8bf8..03a5080 100644 --- a/generalresearch/models/network/rdns/execute.py +++ b/generalresearch/models/network/rdns/execute.py @@ -14,15 +14,18 @@ from generalresearch.models.network.tool_run import ( Status, RDNSRun, ) -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + RDNSRunCommand, + RDNSRunCommandOptions, +) def execute_rdns(ip: str, scan_group_id: Optional[UUIDStr] = None): started_at = datetime.now(tz=timezone.utc) tool_version = get_dig_version() - result = run_rdns(ip=ip) + config = RDNSRunCommand(options=RDNSRunCommandOptions(ip=ip)) + result = run_rdns(config) finished_at = datetime.now(tz=timezone.utc) - raw_command = build_rdns_command(ip) run = RDNSRun( tool_name=ToolName.DIG, @@ -32,9 +35,9 @@ def execute_rdns(ip: str, scan_group_id: Optional[UUIDStr] = None): ip=ip, started_at=started_at, finished_at=finished_at, - raw_command=raw_command, + raw_command=config.to_command_str(), scan_group_id=scan_group_id or uuid4().hex, - config=ToolRunCommand(command="dig", options={}), + config=config, parsed=result, ) diff --git a/generalresearch/models/network/rdns/parser.py b/generalresearch/models/network/rdns/parser.py index f12a6f4..231949e 100644 --- a/generalresearch/models/network/rdns/parser.py +++ b/generalresearch/models/network/rdns/parser.py @@ -2,12 +2,13 @@ import ipaddress import re from typing import List +from generalresearch.models.custom_types import IPvAnyAddressStr from generalresearch.models.network.rdns.result import RDNSResult PTR_RE = re.compile(r"\sPTR\s+([^\s]+)\.") -def parse_rdns_output(ip, raw): +def parse_rdns_output(ip: IPvAnyAddressStr, raw:str): hostnames: List[str] = [] for line in raw.splitlines(): diff --git a/generalresearch/models/network/tool_run.py b/generalresearch/models/network/tool_run.py index 36e6950..114d4b6 100644 --- a/generalresearch/models/network/tool_run.py +++ b/generalresearch/models/network/tool_run.py @@ -12,7 +12,12 @@ from generalresearch.models.custom_types import ( from generalresearch.models.network.mtr.result import MTRResult from generalresearch.models.network.nmap.result import NmapResult from generalresearch.models.network.rdns.result import RDNSResult -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + ToolRunCommand, + NmapRunCommand, + RDNSRunCommand, + MTRRunCommand, +) class ToolClass(StrEnum): @@ -68,6 +73,7 @@ class ToolRun(BaseModel): class NmapRun(ToolRun): tool_class: Literal[ToolClass.PORT_SCAN] = Field(default=ToolClass.PORT_SCAN) tool_name: Literal[ToolName.NMAP] = Field(default=ToolName.NMAP) + config: NmapRunCommand = Field() parsed: NmapResult = Field() @@ -81,6 +87,7 @@ class NmapRun(ToolRun): class RDNSRun(ToolRun): tool_class: Literal[ToolClass.RDNS] = Field(default=ToolClass.RDNS) tool_name: Literal[ToolName.DIG] = Field(default=ToolName.DIG) + config: RDNSRunCommand = Field() parsed: RDNSResult = Field() @@ -94,6 +101,7 @@ class RDNSRun(ToolRun): class MTRRun(ToolRun): tool_class: Literal[ToolClass.TRACEROUTE] = Field(default=ToolClass.TRACEROUTE) tool_name: Literal[ToolName.MTR] = Field(default=ToolName.MTR) + config: MTRRunCommand = Field() facility_id: int = Field(default=1) source_ip: IPvAnyAddressStr = Field() diff --git a/generalresearch/models/network/tool_run_command.py b/generalresearch/models/network/tool_run_command.py index 5abe670..68d2070 100644 --- a/generalresearch/models/network/tool_run_command.py +++ b/generalresearch/models/network/tool_run_command.py @@ -1,9 +1,64 @@ -from typing import Dict, Optional +from typing import Dict, Optional, Literal from pydantic import BaseModel, Field +from generalresearch.models.custom_types import IPvAnyAddressStr +from generalresearch.models.network.definitions import IPProtocol + class ToolRunCommand(BaseModel): - # todo: expand with arguments specific for each tool command: str = Field() options: Dict[str, Optional[str | int]] = Field(default_factory=dict) + + +class NmapRunCommandOptions(BaseModel): + ip: IPvAnyAddressStr + top_ports: Optional[int] = Field(default=1000) + ports: Optional[str] = Field(default=None) + no_ping: bool = Field(default=True) + enable_advanced: bool = Field(default=True) + timing: int = Field(default=4) + + +class NmapRunCommand(ToolRunCommand): + command: Literal["nmap"] = Field(default="nmap") + options: NmapRunCommandOptions = Field() + + def to_command_str(self): + from generalresearch.models.network.nmap.command import build_nmap_command + + options = self.options + return build_nmap_command(**options.model_dump()) + + +class RDNSRunCommandOptions(BaseModel): + ip: IPvAnyAddressStr + + +class RDNSRunCommand(ToolRunCommand): + command: Literal["dig"] = Field(default="dig") + options: RDNSRunCommandOptions = Field() + + def to_command_str(self): + from generalresearch.models.network.rdns.command import build_rdns_command + + options = self.options + return build_rdns_command(**options.model_dump()) + + +class MTRRunCommandOptions(BaseModel): + ip: IPvAnyAddressStr = Field() + protocol: IPProtocol = Field(default=IPProtocol.ICMP) + port: Optional[int] = Field(default=None) + report_cycles: int = Field(default=10) + + +class MTRRunCommand(ToolRunCommand): + command: Literal["mtr"] = Field(default="mtr") + options: MTRRunCommandOptions = Field() + + def to_command_str(self): + from generalresearch.models.network.mtr.command import build_mtr_command + + options = self.options + return build_mtr_command(**options.model_dump()) diff --git a/test_utils/managers/network/conftest.py b/test_utils/managers/network/conftest.py index f6a4078..979dd63 100644 --- a/test_utils/managers/network/conftest.py +++ b/test_utils/managers/network/conftest.py @@ -7,13 +7,18 @@ import pytest from generalresearch.managers.network.label import IPLabelManager from generalresearch.managers.network.tool_run import ToolRunManager from generalresearch.models.network.definitions import IPProtocol -from generalresearch.models.network.mtr.command import build_mtr_command from generalresearch.models.network.mtr.parser import parse_mtr_output from generalresearch.models.network.nmap.parser import parse_nmap_xml -from generalresearch.models.network.rdns.command import build_rdns_command from generalresearch.models.network.rdns.parser import parse_rdns_output from generalresearch.models.network.tool_run import NmapRun, Status, RDNSRun, MTRRun -from generalresearch.models.network.tool_run_command import ToolRunCommand +from generalresearch.models.network.tool_run_command import ( + MTRRunCommand, + MTRRunCommandOptions, + RDNSRunCommand, + NmapRunCommand, + NmapRunCommandOptions, + RDNSRunCommandOptions, +) @pytest.fixture(scope="session") @@ -51,15 +56,21 @@ def nmap_result(nmap_raw_output): @pytest.fixture(scope="session") def nmap_run(nmap_result, scan_group_id): r = nmap_result + config = NmapRunCommand( + command="nmap", + options=NmapRunCommandOptions( + ip=r.target_ip, ports="22-1000,11000,1100,3389,61232", top_ports=None + ), + ) return NmapRun( tool_version=r.version, status=Status.SUCCESS, ip=r.target_ip, started_at=r.started_at, finished_at=r.finished_at, - raw_command=r.command_line, + raw_command=config.to_command_str(), scan_group_id=scan_group_id, - config=ToolRunCommand(command="nmap"), + config=config, parsed=r, ) @@ -79,15 +90,16 @@ def rdns_run(rdns_result, scan_group_id): r = rdns_result ip = "45.33.32.156" utc_now = datetime.now(tz=timezone.utc) + config = RDNSRunCommand(command="dig", options=RDNSRunCommandOptions(ip=ip)) return RDNSRun( tool_version="1.2.3", status=Status.SUCCESS, ip=ip, started_at=utc_now, finished_at=utc_now + timedelta(seconds=1), - raw_command=build_rdns_command(ip=ip), + raw_command=config.to_command_str(), scan_group_id=scan_group_id, - config=ToolRunCommand(command="dig"), + config=config, parsed=r, ) @@ -109,6 +121,12 @@ def mtr_result(mtr_raw_output): def mtr_run(mtr_result, scan_group_id): r = mtr_result utc_now = datetime.now(tz=timezone.utc) + config = MTRRunCommand( + command="mtr", + options=MTRRunCommandOptions( + ip=r.destination, protocol=IPProtocol.TCP, port=443 + ), + ) return MTRRun( tool_version="1.2.3", @@ -116,11 +134,9 @@ def mtr_run(mtr_result, scan_group_id): ip=r.destination, started_at=utc_now, finished_at=utc_now + timedelta(seconds=1), - raw_command=build_mtr_command( - ip=r.destination, protocol=IPProtocol.TCP, port=443 - ), + raw_command=config.to_command_str(), scan_group_id=scan_group_id, - config=ToolRunCommand(command="mtr"), + config=config, parsed=r, facility_id=1, source_ip="1.2.3.4", diff --git a/tests/managers/network/label.py b/tests/managers/network/label.py deleted file mode 100644 index 5b9a790..0000000 --- a/tests/managers/network/label.py +++ /dev/null @@ -1,202 +0,0 @@ -import ipaddress - -import faker -import pytest -from psycopg.errors import UniqueViolation -from pydantic import ValidationError - -from generalresearch.managers.network.label import IPLabelManager -from generalresearch.models.network.label import ( - IPLabel, - IPLabelKind, - IPLabelSource, - IPLabelMetadata, -) -from generalresearch.models.thl.ipinfo import normalize_ip - -fake = faker.Faker() - - -@pytest.fixture -def ip_label(utc_now) -> IPLabel: - ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) - return IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - metadata=IPLabelMetadata(services=["RDP"]) - ) - - -def test_model(utc_now): - ip = fake.ipv4_public() - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - assert lbl.ip.prefixlen == 32 - print(f"{lbl.ip=}") - - ip = ipaddress.IPv4Network((ip, 24), strict=False) - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - print(f"{lbl.ip=}") - - with pytest.raises(ValidationError, match="IPv6 network must be /64 or larger"): - IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=fake.ipv6(), - ) - - ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - print(f"{lbl.ip=}") - - ip = ipaddress.IPv6Network((ip.network_address, 48), strict=False) - lbl = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip, - ) - print(f"{lbl.ip=}") - - -def test_create(iplabel_manager: IPLabelManager, ip_label: IPLabel): - iplabel_manager.create(ip_label) - - with pytest.raises( - UniqueViolation, match="duplicate key value violates unique constraint" - ): - iplabel_manager.create(ip_label) - - -def test_filter(iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago): - res = iplabel_manager.filter(ips=[ip_label.ip]) - assert len(res) == 0 - - iplabel_manager.create(ip_label) - res = iplabel_manager.filter(ips=[ip_label.ip]) - assert len(res) == 1 - - out = res[0] - assert out == ip_label - - res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) - assert len(res) == 1 - - ip_label2 = ip_label.model_copy() - ip_label2.ip = fake.ipv4_public() - iplabel_manager.create(ip_label2) - res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) - assert len(res) == 2 - - -def test_filter_network( - iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago -): - print(ip_label) - ip_label = ip_label.model_copy() - ip_label.ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) - - iplabel_manager.create(ip_label) - res = iplabel_manager.filter(ips=[ip_label.ip]) - assert len(res) == 1 - - out = res[0] - assert out == ip_label - - res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) - assert len(res) == 1 - - ip_label2 = ip_label.model_copy() - ip_label2.ip = fake.ipv4_public() - iplabel_manager.create(ip_label2) - res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) - assert len(res) == 2 - - -def test_network(iplabel_manager: IPLabelManager, utc_now): - # This is a fully-specific /128 ipv6 address. - # e.g. '51b7:b38d:8717:6c5b:cd3e:f5c3:3aba:17d' - ip = fake.ipv6() - # Generally, we'd want to annotate the /64 network - # e.g. '51b7:b38d:8717:6c5b::/64' - ip_64 = ipaddress.IPv6Network((ip, 64), strict=False) - - label = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip_64, - ) - iplabel_manager.create(label) - - # If I query for the /128 directly, I won't find it - res = iplabel_manager.filter(ips=[ip]) - assert len(res) == 0 - - # If I query for the /64 network I will - res = iplabel_manager.filter(ips=[ip_64]) - assert len(res) == 1 - - # Or, I can query for the /128 ip IN a network - res = iplabel_manager.filter(ip_in_network=ip) - assert len(res) == 1 - - -def test_label_cidr_and_ipinfo( - iplabel_manager: IPLabelManager, ip_information_factory, ip_geoname, utc_now -): - # We have network_iplabel.ip as a cidr col and - # thl_ipinformation.ip as a inet col. Make sure we can join appropriately - ip = fake.ipv6() - ip_information_factory(ip=ip, geoname=ip_geoname) - # We normalize for storage into ipinfo table - ip_norm, prefix = normalize_ip(ip) - - # Test with a larger network - ip_48 = ipaddress.IPv6Network((ip, 48), strict=False) - print(f"{ip=}") - print(f"{ip_norm=}") - print(f"{ip_48=}") - label = IPLabel( - label_kind=IPLabelKind.VPN, - labeled_at=utc_now, - source=IPLabelSource.INTERNAL_USE, - provider="GeoNodE", - created_at=utc_now, - ip=ip_48, - ) - iplabel_manager.create(label) - - res = iplabel_manager.test_join(ip_norm) - print(res) diff --git a/tests/managers/network/test_label.py b/tests/managers/network/test_label.py new file mode 100644 index 0000000..5b9a790 --- /dev/null +++ b/tests/managers/network/test_label.py @@ -0,0 +1,202 @@ +import ipaddress + +import faker +import pytest +from psycopg.errors import UniqueViolation +from pydantic import ValidationError + +from generalresearch.managers.network.label import IPLabelManager +from generalresearch.models.network.label import ( + IPLabel, + IPLabelKind, + IPLabelSource, + IPLabelMetadata, +) +from generalresearch.models.thl.ipinfo import normalize_ip + +fake = faker.Faker() + + +@pytest.fixture +def ip_label(utc_now) -> IPLabel: + ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) + return IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + metadata=IPLabelMetadata(services=["RDP"]) + ) + + +def test_model(utc_now): + ip = fake.ipv4_public() + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + assert lbl.ip.prefixlen == 32 + print(f"{lbl.ip=}") + + ip = ipaddress.IPv4Network((ip, 24), strict=False) + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + print(f"{lbl.ip=}") + + with pytest.raises(ValidationError, match="IPv6 network must be /64 or larger"): + IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=fake.ipv6(), + ) + + ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + print(f"{lbl.ip=}") + + ip = ipaddress.IPv6Network((ip.network_address, 48), strict=False) + lbl = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip, + ) + print(f"{lbl.ip=}") + + +def test_create(iplabel_manager: IPLabelManager, ip_label: IPLabel): + iplabel_manager.create(ip_label) + + with pytest.raises( + UniqueViolation, match="duplicate key value violates unique constraint" + ): + iplabel_manager.create(ip_label) + + +def test_filter(iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago): + res = iplabel_manager.filter(ips=[ip_label.ip]) + assert len(res) == 0 + + iplabel_manager.create(ip_label) + res = iplabel_manager.filter(ips=[ip_label.ip]) + assert len(res) == 1 + + out = res[0] + assert out == ip_label + + res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) + assert len(res) == 1 + + ip_label2 = ip_label.model_copy() + ip_label2.ip = fake.ipv4_public() + iplabel_manager.create(ip_label2) + res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) + assert len(res) == 2 + + +def test_filter_network( + iplabel_manager: IPLabelManager, ip_label: IPLabel, utc_hour_ago +): + print(ip_label) + ip_label = ip_label.model_copy() + ip_label.ip = ipaddress.IPv6Network((fake.ipv6(), 64), strict=False) + + iplabel_manager.create(ip_label) + res = iplabel_manager.filter(ips=[ip_label.ip]) + assert len(res) == 1 + + out = res[0] + assert out == ip_label + + res = iplabel_manager.filter(ips=[ip_label.ip], labeled_after=utc_hour_ago) + assert len(res) == 1 + + ip_label2 = ip_label.model_copy() + ip_label2.ip = fake.ipv4_public() + iplabel_manager.create(ip_label2) + res = iplabel_manager.filter(ips=[ip_label.ip, ip_label2.ip]) + assert len(res) == 2 + + +def test_network(iplabel_manager: IPLabelManager, utc_now): + # This is a fully-specific /128 ipv6 address. + # e.g. '51b7:b38d:8717:6c5b:cd3e:f5c3:3aba:17d' + ip = fake.ipv6() + # Generally, we'd want to annotate the /64 network + # e.g. '51b7:b38d:8717:6c5b::/64' + ip_64 = ipaddress.IPv6Network((ip, 64), strict=False) + + label = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip_64, + ) + iplabel_manager.create(label) + + # If I query for the /128 directly, I won't find it + res = iplabel_manager.filter(ips=[ip]) + assert len(res) == 0 + + # If I query for the /64 network I will + res = iplabel_manager.filter(ips=[ip_64]) + assert len(res) == 1 + + # Or, I can query for the /128 ip IN a network + res = iplabel_manager.filter(ip_in_network=ip) + assert len(res) == 1 + + +def test_label_cidr_and_ipinfo( + iplabel_manager: IPLabelManager, ip_information_factory, ip_geoname, utc_now +): + # We have network_iplabel.ip as a cidr col and + # thl_ipinformation.ip as a inet col. Make sure we can join appropriately + ip = fake.ipv6() + ip_information_factory(ip=ip, geoname=ip_geoname) + # We normalize for storage into ipinfo table + ip_norm, prefix = normalize_ip(ip) + + # Test with a larger network + ip_48 = ipaddress.IPv6Network((ip, 48), strict=False) + print(f"{ip=}") + print(f"{ip_norm=}") + print(f"{ip_48=}") + label = IPLabel( + label_kind=IPLabelKind.VPN, + labeled_at=utc_now, + source=IPLabelSource.INTERNAL_USE, + provider="GeoNodE", + created_at=utc_now, + ip=ip_48, + ) + iplabel_manager.create(label) + + res = iplabel_manager.test_join(ip_norm) + print(res) diff --git a/tests/managers/network/test_tool_run.py b/tests/managers/network/test_tool_run.py new file mode 100644 index 0000000..a815809 --- /dev/null +++ b/tests/managers/network/test_tool_run.py @@ -0,0 +1,25 @@ +def test_create_tool_run_from_nmap_run(nmap_run, toolrun_manager): + + toolrun_manager.create_nmap_run(nmap_run) + + run_out = toolrun_manager.get_nmap_run(nmap_run.id) + + assert nmap_run == run_out + + +def test_create_tool_run_from_rdns_run(rdns_run, toolrun_manager): + + toolrun_manager.create_rdns_run(rdns_run) + + run_out = toolrun_manager.get_rdns_run(rdns_run.id) + + assert rdns_run == run_out + + +def test_create_tool_run_from_mtr_run(mtr_run, toolrun_manager): + + toolrun_manager.create_mtr_run(mtr_run) + + run_out = toolrun_manager.get_mtr_run(mtr_run.id) + + assert mtr_run == run_out diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py deleted file mode 100644 index a815809..0000000 --- a/tests/managers/network/tool_run.py +++ /dev/null @@ -1,25 +0,0 @@ -def test_create_tool_run_from_nmap_run(nmap_run, toolrun_manager): - - toolrun_manager.create_nmap_run(nmap_run) - - run_out = toolrun_manager.get_nmap_run(nmap_run.id) - - assert nmap_run == run_out - - -def test_create_tool_run_from_rdns_run(rdns_run, toolrun_manager): - - toolrun_manager.create_rdns_run(rdns_run) - - run_out = toolrun_manager.get_rdns_run(rdns_run.id) - - assert rdns_run == run_out - - -def test_create_tool_run_from_mtr_run(mtr_run, toolrun_manager): - - toolrun_manager.create_mtr_run(mtr_run) - - run_out = toolrun_manager.get_mtr_run(mtr_run.id) - - assert mtr_run == run_out diff --git a/tests/models/network/mtr.py b/tests/models/network/mtr.py deleted file mode 100644 index 2965300..0000000 --- a/tests/models/network/mtr.py +++ /dev/null @@ -1,26 +0,0 @@ -from generalresearch.models.network.mtr.execute import execute_mtr -import faker - -from generalresearch.models.network.tool_run import ToolName, ToolClass - -fake = faker.Faker() - - -def test_execute_mtr(toolrun_manager): - ip = "65.19.129.53" - - run = execute_mtr(ip=ip, report_cycles=3) - assert run.tool_name == ToolName.MTR - assert run.tool_class == ToolClass.TRACEROUTE - assert run.ip == ip - result = run.parsed - - last_hop = result.hops[-1] - assert last_hop.asn == 6939 - assert last_hop.domain == "grlengine.com" - - last_hop_1 = result.hops[-2] - assert last_hop_1.asn == 6939 - assert last_hop_1.domain == "he.net" - - toolrun_manager.create_mtr_run(run) diff --git a/tests/models/network/nmap.py b/tests/models/network/nmap.py deleted file mode 100644 index f034bf0..0000000 --- a/tests/models/network/nmap.py +++ /dev/null @@ -1,29 +0,0 @@ -import subprocess - -from generalresearch.models.network.definitions import IPProtocol -from generalresearch.models.network.nmap.execute import execute_nmap -import faker - -from generalresearch.models.network.nmap.result import PortState -from generalresearch.models.network.tool_run import ToolName, ToolClass - -fake = faker.Faker() - - -def resolve(host): - return subprocess.check_output(["dig", host, "+short"]).decode().strip() - - -def test_execute_nmap_scanme(toolrun_manager): - ip = resolve("scanme.nmap.org") - - run = execute_nmap(ip=ip, top_ports=20) - assert run.tool_name == ToolName.NMAP - assert run.tool_class == ToolClass.PORT_SCAN - assert run.ip == ip - result = run.parsed - - port22 = result._port_index[(IPProtocol.TCP, 22)] - assert port22.state == PortState.OPEN - - toolrun_manager.create_nmap_run(run) diff --git a/tests/models/network/nmap_parser.py b/tests/models/network/nmap_parser.py deleted file mode 100644 index 96d7b37..0000000 --- a/tests/models/network/nmap_parser.py +++ /dev/null @@ -1,22 +0,0 @@ -import os - -import pytest - -from generalresearch.models.network.nmap.parser import parse_nmap_xml - -@pytest.fixture -def nmap_raw_output_2(request) -> str: - fp = os.path.join(request.config.rootpath, "data/nmaprun2.xml") - with open(fp, "r") as f: - data = f.read() - return data - - -def test_nmap_xml_parser(nmap_raw_output, nmap_raw_output_2): - n = parse_nmap_xml(nmap_raw_output) - assert n.tcp_open_ports == [61232] - assert len(n.trace.hops) == 18 - - n = parse_nmap_xml(nmap_raw_output_2) - assert n.tcp_open_ports == [22, 80, 9929, 31337] - assert n.trace is None diff --git a/tests/models/network/rdns.py b/tests/models/network/rdns.py deleted file mode 100644 index e56c494..0000000 --- a/tests/models/network/rdns.py +++ /dev/null @@ -1,33 +0,0 @@ -from generalresearch.models.network.rdns.execute import execute_rdns -import faker - -from generalresearch.models.network.tool_run import ToolName, ToolClass - -fake = faker.Faker() - - -def test_execute_rdns_grl(toolrun_manager): - ip = "65.19.129.53" - run = execute_rdns(ip=ip) - assert run.tool_name == ToolName.DIG - assert run.tool_class == ToolClass.RDNS - assert run.ip == ip - result = run.parsed - assert result.primary_hostname == "in1-smtp.grlengine.com" - assert result.primary_domain == "grlengine.com" - assert result.hostname_count == 1 - - toolrun_manager.create_rdns_run(run) - - -def test_execute_rdns_none(toolrun_manager): - ip = fake.ipv6() - run = execute_rdns(ip) - result = run.parsed - - assert result.primary_hostname is None - assert result.primary_domain is None - assert result.hostname_count == 0 - assert result.hostnames == [] - - toolrun_manager.create_rdns_run(run) diff --git a/tests/models/network/test_mtr.py b/tests/models/network/test_mtr.py new file mode 100644 index 0000000..2965300 --- /dev/null +++ b/tests/models/network/test_mtr.py @@ -0,0 +1,26 @@ +from generalresearch.models.network.mtr.execute import execute_mtr +import faker + +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def test_execute_mtr(toolrun_manager): + ip = "65.19.129.53" + + run = execute_mtr(ip=ip, report_cycles=3) + assert run.tool_name == ToolName.MTR + assert run.tool_class == ToolClass.TRACEROUTE + assert run.ip == ip + result = run.parsed + + last_hop = result.hops[-1] + assert last_hop.asn == 6939 + assert last_hop.domain == "grlengine.com" + + last_hop_1 = result.hops[-2] + assert last_hop_1.asn == 6939 + assert last_hop_1.domain == "he.net" + + toolrun_manager.create_mtr_run(run) diff --git a/tests/models/network/test_nmap.py b/tests/models/network/test_nmap.py new file mode 100644 index 0000000..0be98d4 --- /dev/null +++ b/tests/models/network/test_nmap.py @@ -0,0 +1,29 @@ +import subprocess + +from generalresearch.models.network.definitions import IPProtocol +from generalresearch.models.network.nmap.execute import execute_nmap +import faker + +from generalresearch.models.network.nmap.result import PortState +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def resolve(host): + return subprocess.check_output(["dig", host, "+short"]).decode().strip() + + +def test_execute_nmap_scanme(toolrun_manager): + ip = resolve("scanme.nmap.org") + + run = execute_nmap(ip=ip, top_ports=None, ports="20-30", enable_advanced=False) + assert run.tool_name == ToolName.NMAP + assert run.tool_class == ToolClass.PORT_SCAN + assert run.ip == ip + result = run.parsed + + port22 = result._port_index[(IPProtocol.TCP, 22)] + assert port22.state == PortState.OPEN + + toolrun_manager.create_nmap_run(run) diff --git a/tests/models/network/test_nmap_parser.py b/tests/models/network/test_nmap_parser.py new file mode 100644 index 0000000..96d7b37 --- /dev/null +++ b/tests/models/network/test_nmap_parser.py @@ -0,0 +1,22 @@ +import os + +import pytest + +from generalresearch.models.network.nmap.parser import parse_nmap_xml + +@pytest.fixture +def nmap_raw_output_2(request) -> str: + fp = os.path.join(request.config.rootpath, "data/nmaprun2.xml") + with open(fp, "r") as f: + data = f.read() + return data + + +def test_nmap_xml_parser(nmap_raw_output, nmap_raw_output_2): + n = parse_nmap_xml(nmap_raw_output) + assert n.tcp_open_ports == [61232] + assert len(n.trace.hops) == 18 + + n = parse_nmap_xml(nmap_raw_output_2) + assert n.tcp_open_ports == [22, 80, 9929, 31337] + assert n.trace is None diff --git a/tests/models/network/test_rdns.py b/tests/models/network/test_rdns.py new file mode 100644 index 0000000..e56c494 --- /dev/null +++ b/tests/models/network/test_rdns.py @@ -0,0 +1,33 @@ +from generalresearch.models.network.rdns.execute import execute_rdns +import faker + +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def test_execute_rdns_grl(toolrun_manager): + ip = "65.19.129.53" + run = execute_rdns(ip=ip) + assert run.tool_name == ToolName.DIG + assert run.tool_class == ToolClass.RDNS + assert run.ip == ip + result = run.parsed + assert result.primary_hostname == "in1-smtp.grlengine.com" + assert result.primary_domain == "grlengine.com" + assert result.hostname_count == 1 + + toolrun_manager.create_rdns_run(run) + + +def test_execute_rdns_none(toolrun_manager): + ip = fake.ipv6() + run = execute_rdns(ip) + result = run.parsed + + assert result.primary_hostname is None + assert result.primary_domain is None + assert result.hostname_count == 0 + assert result.hostnames == [] + + toolrun_manager.create_rdns_run(run) -- cgit v1.2.3