aboutsummaryrefslogtreecommitdiff
path: root/generalresearch/managers/network/tool_run.py
diff options
context:
space:
mode:
Diffstat (limited to 'generalresearch/managers/network/tool_run.py')
-rw-r--r--generalresearch/managers/network/tool_run.py139
1 files changed, 139 insertions, 0 deletions
diff --git a/generalresearch/managers/network/tool_run.py b/generalresearch/managers/network/tool_run.py
new file mode 100644
index 0000000..17f4935
--- /dev/null
+++ b/generalresearch/managers/network/tool_run.py
@@ -0,0 +1,139 @@
+from typing import Collection, List, Dict
+
+from psycopg import Cursor, sql
+
+from generalresearch.managers.base import PostgresManager, Permission
+
+from generalresearch.managers.network.nmap import NmapRunManager
+from generalresearch.managers.network.rdns import RDNSRunManager
+from generalresearch.managers.network.mtr import MTRRunManager
+from generalresearch.models.network.rdns.result import RDNSResult
+from generalresearch.models.network.tool_run import (
+ NmapRun,
+ RDNSRun,
+ MTRRun,
+ ToolRun,
+ ToolName,
+)
+from generalresearch.pg_helper import PostgresConfig
+
+
+class ToolRunManager(PostgresManager):
+ def __init__(
+ self,
+ pg_config: PostgresConfig,
+ permissions: Collection[Permission] = None,
+ ):
+ super().__init__(pg_config=pg_config, permissions=permissions)
+ self.nmap_manager = NmapRunManager(self.pg_config)
+ self.rdns_manager = RDNSRunManager(self.pg_config)
+ self.mtr_manager = MTRRunManager(self.pg_config)
+
+ def _create_tool_run(self, run: NmapRun | RDNSRun | MTRRun, c: Cursor):
+ query = sql.SQL(
+ """
+ INSERT INTO network_toolrun (
+ ip, scan_group_id, tool_class,
+ tool_name, tool_version, started_at,
+ finished_at, status, raw_command,
+ config
+ )
+ VALUES (
+ %(ip)s, %(scan_group_id)s, %(tool_class)s,
+ %(tool_name)s, %(tool_version)s, %(started_at)s,
+ %(finished_at)s, %(status)s, %(raw_command)s,
+ %(config)s
+ ) RETURNING id;
+ """
+ )
+ params = run.model_dump_postgres()
+ c.execute(query, params)
+ run_id = c.fetchone()["id"]
+ run.id = run_id
+ return None
+
+ def create_tool_run(self, run: NmapRun | RDNSRun | MTRRun):
+ if type(run) is NmapRun:
+ return self.create_nmap_run(run)
+ elif type(run) is RDNSRun:
+ return self.create_rdns_run(run)
+ elif type(run) is MTRRun:
+ return self.create_mtr_run(run)
+ else:
+ raise ValueError("unrecognized run type")
+
+ def get_latest_runs_by_tool(self, ip: str) -> Dict[ToolName, ToolRun]:
+ query = """
+ SELECT DISTINCT ON (tool_name) *
+ FROM network_toolrun
+ WHERE ip = %(ip)s
+ ORDER BY tool_name, started_at DESC;
+ """
+ params = {"ip": ip}
+ res = self.pg_config.execute_sql_query(query, params=params)
+ runs = [ToolRun.model_validate(x) for x in res]
+ return {r.tool_name: r for r in runs}
+
+ def create_nmap_run(self, run: NmapRun) -> NmapRun:
+ """
+ Insert a PortScan + PortScanPorts from a Pydantic NmapResult.
+ """
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ self._create_tool_run(run, c)
+ self.nmap_manager._create(run, c=c)
+ return run
+
+ def get_nmap_run(self, id: int) -> NmapRun:
+ query = """
+ SELECT tr.*, np.parsed
+ FROM network_toolrun tr
+ JOIN network_portscan np ON tr.id = np.run_id
+ WHERE id = %(id)s
+ """
+ params = {"id": id}
+ res = self.pg_config.execute_sql_query(query, params)[0]
+ return NmapRun.model_validate(res)
+
+ def create_rdns_run(self, run: RDNSRun) -> RDNSRun:
+ """
+ Insert a RDnsRun + RDNSResult
+ """
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ self._create_tool_run(run, c)
+ self.rdns_manager._create(run, c=c)
+ return run
+
+ def get_rdns_run(self, id: int) -> RDNSRun:
+ query = """
+ SELECT tr.*, hostnames
+ FROM network_toolrun tr
+ JOIN network_rdnsresult np ON tr.id = np.run_id
+ WHERE id = %(id)s
+ """
+ params = {"id": id}
+ res = self.pg_config.execute_sql_query(query, params)[0]
+ parsed = RDNSResult.model_validate(
+ {"ip": res["ip"], "hostnames": res["hostnames"]}
+ )
+ res["parsed"] = parsed
+ return RDNSRun.model_validate(res)
+
+ def create_mtr_run(self, run: MTRRun) -> MTRRun:
+ with self.pg_config.make_connection() as conn:
+ with conn.cursor() as c:
+ self._create_tool_run(run, c)
+ self.mtr_manager._create(run, c=c)
+ return run
+
+ def get_mtr_run(self, id: int) -> MTRRun:
+ query = """
+ SELECT tr.*, mtr.parsed, mtr.source_ip, mtr.facility_id
+ FROM network_toolrun tr
+ JOIN network_mtr mtr ON tr.id = mtr.run_id
+ WHERE id = %(id)s
+ """
+ params = {"id": id}
+ res = self.pg_config.execute_sql_query(query, params)[0]
+ return MTRRun.model_validate(res)