diff options
| author | stuppie | 2026-03-12 11:53:18 -0600 |
|---|---|---|
| committer | stuppie | 2026-03-12 11:53:18 -0600 |
| commit | d9ab70d98624e22b87dfe40cc5e18c8ad5eb44c4 (patch) | |
| tree | 194ec20d29ec614fc2344ab242923cbbded06316 | |
| parent | b0306293ef52816998a463fbfe4c5b97d00b9b65 (diff) | |
| download | generalresearch-d9ab70d98624e22b87dfe40cc5e18c8ad5eb44c4.tar.gz generalresearch-d9ab70d98624e22b87dfe40cc5e18c8ad5eb44c4.zip | |
some tests to execute the full Toolrun pipeline for each tool. fix small bugs
| -rw-r--r-- | generalresearch/models/network/mtr/execute.py | 4 | ||||
| -rw-r--r-- | generalresearch/models/network/mtr/parser.py | 3 | ||||
| -rw-r--r-- | generalresearch/models/network/mtr/result.py | 4 | ||||
| -rw-r--r-- | generalresearch/models/network/nmap/execute.py | 2 | ||||
| -rw-r--r-- | generalresearch/models/network/tool_run_command.py | 4 | ||||
| -rw-r--r-- | tests/managers/network/tool_run.py | 18 | ||||
| -rw-r--r-- | tests/models/network/mtr.py | 26 | ||||
| -rw-r--r-- | tests/models/network/nmap.py | 29 | ||||
| -rw-r--r-- | tests/models/network/rdns.py | 78 |
9 files changed, 97 insertions, 71 deletions
diff --git a/generalresearch/models/network/mtr/execute.py b/generalresearch/models/network/mtr/execute.py index bd556bc..81de24f 100644 --- a/generalresearch/models/network/mtr/execute.py +++ b/generalresearch/models/network/mtr/execute.py @@ -29,8 +29,8 @@ def execute_mtr( config = ToolRunCommand( command="mtr", options={ - "protocol": protocol, - "port": port, + "protocol": result.protocol, + "port": result.port, "report_cycles": report_cycles, }, ) diff --git a/generalresearch/models/network/mtr/parser.py b/generalresearch/models/network/mtr/parser.py index dc108d9..fb0ca61 100644 --- a/generalresearch/models/network/mtr/parser.py +++ b/generalresearch/models/network/mtr/parser.py @@ -1,13 +1,14 @@ import json from typing import Dict +from generalresearch.models.network.definitions import IPProtocol from generalresearch.models.network.mtr.result import MTRResult def parse_mtr_output(raw: str, port, protocol) -> MTRResult: data = parse_mtr_raw_output(raw) data["port"] = port - data["protocol"] = protocol + data["protocol"] = protocol or IPProtocol.ICMP return MTRResult.model_validate(data) diff --git a/generalresearch/models/network/mtr/result.py b/generalresearch/models/network/mtr/result.py index 62f92ab..5c775b4 100644 --- a/generalresearch/models/network/mtr/result.py +++ b/generalresearch/models/network/mtr/result.py @@ -118,9 +118,9 @@ class MTRResult(BaseModel): bitpattern: str = Field(description="Payload byte pattern used in probes (hex).") # Protocol used for the traceroute - protocol: IPProtocol = Field() + protocol: IPProtocol = Field(default=IPProtocol.ICMP) # The target port number for TCP/SCTP/UDP traces - port: Optional[int] = Field() + port: Optional[int] = Field(default=None) hops: List[MTRHop] = Field() diff --git a/generalresearch/models/network/nmap/execute.py b/generalresearch/models/network/nmap/execute.py index fc1e2fa..68a9926 100644 --- a/generalresearch/models/network/nmap/execute.py +++ b/generalresearch/models/network/nmap/execute.py @@ -12,7 +12,7 @@ def execute_nmap( ): result = run_nmap(ip=ip, top_ports=top_ports) assert result.exit_status == "success" - assert result.target_ip == ip + assert result.target_ip == ip, f"{result.target_ip=}, {ip=}" run = NmapRun( tool_name=ToolName.NMAP, diff --git a/generalresearch/models/network/tool_run_command.py b/generalresearch/models/network/tool_run_command.py index e3d94df..5abe670 100644 --- a/generalresearch/models/network/tool_run_command.py +++ b/generalresearch/models/network/tool_run_command.py @@ -1,4 +1,4 @@ -from typing import Dict +from typing import Dict, Optional from pydantic import BaseModel, Field @@ -6,4 +6,4 @@ from pydantic import BaseModel, Field class ToolRunCommand(BaseModel): # todo: expand with arguments specific for each tool command: str = Field() - options: Dict[str, str | int] = Field(default_factory=dict) + options: Dict[str, Optional[str | int]] = Field(default_factory=dict) diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py index c05af92..a815809 100644 --- a/tests/managers/network/tool_run.py +++ b/tests/managers/network/tool_run.py @@ -1,21 +1,3 @@ -import os -from datetime import datetime, timezone -from uuid import uuid4 - -import faker -import pytest - -from generalresearch.models.network.definitions import IPProtocol - -from generalresearch.models.network.tool_run import ( - ToolName, - ToolClass, - Status, -) - -fake = faker.Faker() - - def test_create_tool_run_from_nmap_run(nmap_run, toolrun_manager): toolrun_manager.create_nmap_run(nmap_run) diff --git a/tests/models/network/mtr.py b/tests/models/network/mtr.py new file mode 100644 index 0000000..2965300 --- /dev/null +++ b/tests/models/network/mtr.py @@ -0,0 +1,26 @@ +from generalresearch.models.network.mtr.execute import execute_mtr +import faker + +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def test_execute_mtr(toolrun_manager): + ip = "65.19.129.53" + + run = execute_mtr(ip=ip, report_cycles=3) + assert run.tool_name == ToolName.MTR + assert run.tool_class == ToolClass.TRACEROUTE + assert run.ip == ip + result = run.parsed + + last_hop = result.hops[-1] + assert last_hop.asn == 6939 + assert last_hop.domain == "grlengine.com" + + last_hop_1 = result.hops[-2] + assert last_hop_1.asn == 6939 + assert last_hop_1.domain == "he.net" + + toolrun_manager.create_mtr_run(run) diff --git a/tests/models/network/nmap.py b/tests/models/network/nmap.py new file mode 100644 index 0000000..f034bf0 --- /dev/null +++ b/tests/models/network/nmap.py @@ -0,0 +1,29 @@ +import subprocess + +from generalresearch.models.network.definitions import IPProtocol +from generalresearch.models.network.nmap.execute import execute_nmap +import faker + +from generalresearch.models.network.nmap.result import PortState +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def resolve(host): + return subprocess.check_output(["dig", host, "+short"]).decode().strip() + + +def test_execute_nmap_scanme(toolrun_manager): + ip = resolve("scanme.nmap.org") + + run = execute_nmap(ip=ip, top_ports=20) + assert run.tool_name == ToolName.NMAP + assert run.tool_class == ToolClass.PORT_SCAN + assert run.ip == ip + result = run.parsed + + port22 = result._port_index[(IPProtocol.TCP, 22)] + assert port22.state == PortState.OPEN + + toolrun_manager.create_nmap_run(run) diff --git a/tests/models/network/rdns.py b/tests/models/network/rdns.py index 64e8351..e56c494 100644 --- a/tests/models/network/rdns.py +++ b/tests/models/network/rdns.py @@ -1,45 +1,33 @@ -# from generalresearch.models.network.rdns import run_rdns -# import faker -# -# fake = faker.Faker() -# -# -# def test_dig_rdns(): -# # Actually runs dig -x. Idk how stable this is -# ip = "45.33.32.156" -# rdns_result = run_rdns(ip) -# assert rdns_result.primary_hostname == "scanme.nmap.org" -# assert rdns_result.primary_org == "nmap" -# -# ip = "65.19.129.53" -# rdns_result = run_rdns(ip) -# assert rdns_result.primary_hostname == "in1-smtp.grlengine.com" -# assert rdns_result.primary_org == "grlengine" -# -# ip = fake.ipv6() -# rdns_result = run_rdns(ip) -# assert rdns_result.primary_hostname is None -# assert rdns_result.primary_org is None -# print(rdns_result.model_dump_postgres()) - - -# -# -# def test_run_dig(toolrun_manager): -# reverse_dns_run = run_dig(ip="65.19.129.53") -# -# toolrun_manager.create_rdns_run(reverse_dns_run) -# -# run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) -# -# assert reverse_dns_run == run_out -# -# -# def test_run_dig_empty(toolrun_manager): -# reverse_dns_run = run_dig(ip=fake.ipv6()) -# -# toolrun_manager.create_rdns_run(reverse_dns_run) -# -# run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id) -# -# assert reverse_dns_run == run_out
\ No newline at end of file +from generalresearch.models.network.rdns.execute import execute_rdns +import faker + +from generalresearch.models.network.tool_run import ToolName, ToolClass + +fake = faker.Faker() + + +def test_execute_rdns_grl(toolrun_manager): + ip = "65.19.129.53" + run = execute_rdns(ip=ip) + assert run.tool_name == ToolName.DIG + assert run.tool_class == ToolClass.RDNS + assert run.ip == ip + result = run.parsed + assert result.primary_hostname == "in1-smtp.grlengine.com" + assert result.primary_domain == "grlengine.com" + assert result.hostname_count == 1 + + toolrun_manager.create_rdns_run(run) + + +def test_execute_rdns_none(toolrun_manager): + ip = fake.ipv6() + run = execute_rdns(ip) + result = run.parsed + + assert result.primary_hostname is None + assert result.primary_domain is None + assert result.hostname_count == 0 + assert result.hostnames == [] + + toolrun_manager.create_rdns_run(run) |
