1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
import os
from datetime import timedelta, datetime, timezone
from uuid import uuid4
import pytest
from generalresearch.managers.network.label import IPLabelManager
from generalresearch.managers.network.tool_run import ToolRunManager
from generalresearch.models.network.definitions import IPProtocol
from generalresearch.models.network.mtr.parser import parse_mtr_output
from generalresearch.models.network.nmap.parser import parse_nmap_xml
from generalresearch.models.network.rdns.parser import parse_rdns_output
from generalresearch.models.network.tool_run import NmapRun, Status, RDNSRun, MTRRun
from generalresearch.models.network.tool_run_command import (
MTRRunCommand,
MTRRunCommandOptions,
RDNSRunCommand,
NmapRunCommand,
NmapRunCommandOptions,
RDNSRunCommandOptions,
)
@pytest.fixture(scope="session")
def scan_group_id():
return uuid4().hex
@pytest.fixture(scope="session")
def iplabel_manager(thl_web_rw) -> IPLabelManager:
assert "/unittest-" in thl_web_rw.dsn.path
return IPLabelManager(pg_config=thl_web_rw)
@pytest.fixture(scope="session")
def toolrun_manager(thl_web_rw) -> ToolRunManager:
assert "/unittest-" in thl_web_rw.dsn.path
return ToolRunManager(pg_config=thl_web_rw)
@pytest.fixture(scope="session")
def nmap_raw_output(request) -> str:
fp = os.path.join(request.config.rootpath, "data/nmaprun1.xml")
with open(fp, "r") as f:
data = f.read()
return data
@pytest.fixture(scope="session")
def nmap_result(nmap_raw_output):
return parse_nmap_xml(nmap_raw_output)
@pytest.fixture(scope="session")
def nmap_run(nmap_result, scan_group_id):
r = nmap_result
config = NmapRunCommand(
command="nmap",
options=NmapRunCommandOptions(
ip=r.target_ip, ports="22-1000,11000,1100,3389,61232", top_ports=None
),
)
return NmapRun(
tool_version=r.version,
status=Status.SUCCESS,
ip=r.target_ip,
started_at=r.started_at,
finished_at=r.finished_at,
raw_command=config.to_command_str(),
scan_group_id=scan_group_id,
config=config,
parsed=r,
)
@pytest.fixture(scope="session")
def dig_raw_output():
return "156.32.33.45.in-addr.arpa. 300 IN PTR scanme.nmap.org."
@pytest.fixture(scope="session")
def rdns_result(dig_raw_output):
return parse_rdns_output(ip="45.33.32.156", raw=dig_raw_output)
@pytest.fixture(scope="session")
def rdns_run(rdns_result, scan_group_id):
r = rdns_result
ip = "45.33.32.156"
utc_now = datetime.now(tz=timezone.utc)
config = RDNSRunCommand(command="dig", options=RDNSRunCommandOptions(ip=ip))
return RDNSRun(
tool_version="1.2.3",
status=Status.SUCCESS,
ip=ip,
started_at=utc_now,
finished_at=utc_now + timedelta(seconds=1),
raw_command=config.to_command_str(),
scan_group_id=scan_group_id,
config=config,
parsed=r,
)
@pytest.fixture(scope="session")
def mtr_raw_output(request):
fp = os.path.join(request.config.rootpath, "data/mtr_fatbeam.json")
with open(fp, "r") as f:
data = f.read()
return data
@pytest.fixture(scope="session")
def mtr_result(mtr_raw_output):
return parse_mtr_output(mtr_raw_output, port=443, protocol=IPProtocol.TCP)
@pytest.fixture(scope="session")
def mtr_run(mtr_result, scan_group_id):
r = mtr_result
utc_now = datetime.now(tz=timezone.utc)
config = MTRRunCommand(
command="mtr",
options=MTRRunCommandOptions(
ip=r.destination, protocol=IPProtocol.TCP, port=443
),
)
return MTRRun(
tool_version="1.2.3",
status=Status.SUCCESS,
ip=r.destination,
started_at=utc_now,
finished_at=utc_now + timedelta(seconds=1),
raw_command=config.to_command_str(),
scan_group_id=scan_group_id,
config=config,
parsed=r,
facility_id=1,
source_ip="1.2.3.4",
)
|