aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorstuppie2026-03-12 11:53:18 -0600
committerstuppie2026-03-12 11:53:18 -0600
commitd9ab70d98624e22b87dfe40cc5e18c8ad5eb44c4 (patch)
tree194ec20d29ec614fc2344ab242923cbbded06316
parentb0306293ef52816998a463fbfe4c5b97d00b9b65 (diff)
downloadgeneralresearch-d9ab70d98624e22b87dfe40cc5e18c8ad5eb44c4.tar.gz
generalresearch-d9ab70d98624e22b87dfe40cc5e18c8ad5eb44c4.zip
some tests to execute the full Toolrun pipeline for each tool. fix small bugs
-rw-r--r--generalresearch/models/network/mtr/execute.py4
-rw-r--r--generalresearch/models/network/mtr/parser.py3
-rw-r--r--generalresearch/models/network/mtr/result.py4
-rw-r--r--generalresearch/models/network/nmap/execute.py2
-rw-r--r--generalresearch/models/network/tool_run_command.py4
-rw-r--r--tests/managers/network/tool_run.py18
-rw-r--r--tests/models/network/mtr.py26
-rw-r--r--tests/models/network/nmap.py29
-rw-r--r--tests/models/network/rdns.py78
9 files changed, 97 insertions, 71 deletions
diff --git a/generalresearch/models/network/mtr/execute.py b/generalresearch/models/network/mtr/execute.py
index bd556bc..81de24f 100644
--- a/generalresearch/models/network/mtr/execute.py
+++ b/generalresearch/models/network/mtr/execute.py
@@ -29,8 +29,8 @@ def execute_mtr(
config = ToolRunCommand(
command="mtr",
options={
- "protocol": protocol,
- "port": port,
+ "protocol": result.protocol,
+ "port": result.port,
"report_cycles": report_cycles,
},
)
diff --git a/generalresearch/models/network/mtr/parser.py b/generalresearch/models/network/mtr/parser.py
index dc108d9..fb0ca61 100644
--- a/generalresearch/models/network/mtr/parser.py
+++ b/generalresearch/models/network/mtr/parser.py
@@ -1,13 +1,14 @@
import json
from typing import Dict
+from generalresearch.models.network.definitions import IPProtocol
from generalresearch.models.network.mtr.result import MTRResult
def parse_mtr_output(raw: str, port, protocol) -> MTRResult:
data = parse_mtr_raw_output(raw)
data["port"] = port
- data["protocol"] = protocol
+ data["protocol"] = protocol or IPProtocol.ICMP
return MTRResult.model_validate(data)
diff --git a/generalresearch/models/network/mtr/result.py b/generalresearch/models/network/mtr/result.py
index 62f92ab..5c775b4 100644
--- a/generalresearch/models/network/mtr/result.py
+++ b/generalresearch/models/network/mtr/result.py
@@ -118,9 +118,9 @@ class MTRResult(BaseModel):
bitpattern: str = Field(description="Payload byte pattern used in probes (hex).")
# Protocol used for the traceroute
- protocol: IPProtocol = Field()
+ protocol: IPProtocol = Field(default=IPProtocol.ICMP)
# The target port number for TCP/SCTP/UDP traces
- port: Optional[int] = Field()
+ port: Optional[int] = Field(default=None)
hops: List[MTRHop] = Field()
diff --git a/generalresearch/models/network/nmap/execute.py b/generalresearch/models/network/nmap/execute.py
index fc1e2fa..68a9926 100644
--- a/generalresearch/models/network/nmap/execute.py
+++ b/generalresearch/models/network/nmap/execute.py
@@ -12,7 +12,7 @@ def execute_nmap(
):
result = run_nmap(ip=ip, top_ports=top_ports)
assert result.exit_status == "success"
- assert result.target_ip == ip
+ assert result.target_ip == ip, f"{result.target_ip=}, {ip=}"
run = NmapRun(
tool_name=ToolName.NMAP,
diff --git a/generalresearch/models/network/tool_run_command.py b/generalresearch/models/network/tool_run_command.py
index e3d94df..5abe670 100644
--- a/generalresearch/models/network/tool_run_command.py
+++ b/generalresearch/models/network/tool_run_command.py
@@ -1,4 +1,4 @@
-from typing import Dict
+from typing import Dict, Optional
from pydantic import BaseModel, Field
@@ -6,4 +6,4 @@ from pydantic import BaseModel, Field
class ToolRunCommand(BaseModel):
# todo: expand with arguments specific for each tool
command: str = Field()
- options: Dict[str, str | int] = Field(default_factory=dict)
+ options: Dict[str, Optional[str | int]] = Field(default_factory=dict)
diff --git a/tests/managers/network/tool_run.py b/tests/managers/network/tool_run.py
index c05af92..a815809 100644
--- a/tests/managers/network/tool_run.py
+++ b/tests/managers/network/tool_run.py
@@ -1,21 +1,3 @@
-import os
-from datetime import datetime, timezone
-from uuid import uuid4
-
-import faker
-import pytest
-
-from generalresearch.models.network.definitions import IPProtocol
-
-from generalresearch.models.network.tool_run import (
- ToolName,
- ToolClass,
- Status,
-)
-
-fake = faker.Faker()
-
-
def test_create_tool_run_from_nmap_run(nmap_run, toolrun_manager):
toolrun_manager.create_nmap_run(nmap_run)
diff --git a/tests/models/network/mtr.py b/tests/models/network/mtr.py
new file mode 100644
index 0000000..2965300
--- /dev/null
+++ b/tests/models/network/mtr.py
@@ -0,0 +1,26 @@
+from generalresearch.models.network.mtr.execute import execute_mtr
+import faker
+
+from generalresearch.models.network.tool_run import ToolName, ToolClass
+
+fake = faker.Faker()
+
+
+def test_execute_mtr(toolrun_manager):
+ ip = "65.19.129.53"
+
+ run = execute_mtr(ip=ip, report_cycles=3)
+ assert run.tool_name == ToolName.MTR
+ assert run.tool_class == ToolClass.TRACEROUTE
+ assert run.ip == ip
+ result = run.parsed
+
+ last_hop = result.hops[-1]
+ assert last_hop.asn == 6939
+ assert last_hop.domain == "grlengine.com"
+
+ last_hop_1 = result.hops[-2]
+ assert last_hop_1.asn == 6939
+ assert last_hop_1.domain == "he.net"
+
+ toolrun_manager.create_mtr_run(run)
diff --git a/tests/models/network/nmap.py b/tests/models/network/nmap.py
new file mode 100644
index 0000000..f034bf0
--- /dev/null
+++ b/tests/models/network/nmap.py
@@ -0,0 +1,29 @@
+import subprocess
+
+from generalresearch.models.network.definitions import IPProtocol
+from generalresearch.models.network.nmap.execute import execute_nmap
+import faker
+
+from generalresearch.models.network.nmap.result import PortState
+from generalresearch.models.network.tool_run import ToolName, ToolClass
+
+fake = faker.Faker()
+
+
+def resolve(host):
+ return subprocess.check_output(["dig", host, "+short"]).decode().strip()
+
+
+def test_execute_nmap_scanme(toolrun_manager):
+ ip = resolve("scanme.nmap.org")
+
+ run = execute_nmap(ip=ip, top_ports=20)
+ assert run.tool_name == ToolName.NMAP
+ assert run.tool_class == ToolClass.PORT_SCAN
+ assert run.ip == ip
+ result = run.parsed
+
+ port22 = result._port_index[(IPProtocol.TCP, 22)]
+ assert port22.state == PortState.OPEN
+
+ toolrun_manager.create_nmap_run(run)
diff --git a/tests/models/network/rdns.py b/tests/models/network/rdns.py
index 64e8351..e56c494 100644
--- a/tests/models/network/rdns.py
+++ b/tests/models/network/rdns.py
@@ -1,45 +1,33 @@
-# from generalresearch.models.network.rdns import run_rdns
-# import faker
-#
-# fake = faker.Faker()
-#
-#
-# def test_dig_rdns():
-# # Actually runs dig -x. Idk how stable this is
-# ip = "45.33.32.156"
-# rdns_result = run_rdns(ip)
-# assert rdns_result.primary_hostname == "scanme.nmap.org"
-# assert rdns_result.primary_org == "nmap"
-#
-# ip = "65.19.129.53"
-# rdns_result = run_rdns(ip)
-# assert rdns_result.primary_hostname == "in1-smtp.grlengine.com"
-# assert rdns_result.primary_org == "grlengine"
-#
-# ip = fake.ipv6()
-# rdns_result = run_rdns(ip)
-# assert rdns_result.primary_hostname is None
-# assert rdns_result.primary_org is None
-# print(rdns_result.model_dump_postgres())
-
-
-#
-#
-# def test_run_dig(toolrun_manager):
-# reverse_dns_run = run_dig(ip="65.19.129.53")
-#
-# toolrun_manager.create_rdns_run(reverse_dns_run)
-#
-# run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)
-#
-# assert reverse_dns_run == run_out
-#
-#
-# def test_run_dig_empty(toolrun_manager):
-# reverse_dns_run = run_dig(ip=fake.ipv6())
-#
-# toolrun_manager.create_rdns_run(reverse_dns_run)
-#
-# run_out = toolrun_manager.get_rdns_run(reverse_dns_run.id)
-#
-# assert reverse_dns_run == run_out \ No newline at end of file
+from generalresearch.models.network.rdns.execute import execute_rdns
+import faker
+
+from generalresearch.models.network.tool_run import ToolName, ToolClass
+
+fake = faker.Faker()
+
+
+def test_execute_rdns_grl(toolrun_manager):
+ ip = "65.19.129.53"
+ run = execute_rdns(ip=ip)
+ assert run.tool_name == ToolName.DIG
+ assert run.tool_class == ToolClass.RDNS
+ assert run.ip == ip
+ result = run.parsed
+ assert result.primary_hostname == "in1-smtp.grlengine.com"
+ assert result.primary_domain == "grlengine.com"
+ assert result.hostname_count == 1
+
+ toolrun_manager.create_rdns_run(run)
+
+
+def test_execute_rdns_none(toolrun_manager):
+ ip = fake.ipv6()
+ run = execute_rdns(ip)
+ result = run.parsed
+
+ assert result.primary_hostname is None
+ assert result.primary_domain is None
+ assert result.hostname_count == 0
+ assert result.hostnames == []
+
+ toolrun_manager.create_rdns_run(run)