1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import os
from datetime import timedelta, datetime, timezone
from uuid import uuid4
import pytest
from generalresearch.managers.network.label import IPLabelManager
from generalresearch.managers.network.tool_run import ToolRunManager
from generalresearch.models.network.definitions import IPProtocol
from generalresearch.models.network.mtr.command import build_mtr_command
from generalresearch.models.network.mtr.parser import parse_mtr_output
from generalresearch.models.network.nmap.parser import parse_nmap_xml
from generalresearch.models.network.rdns.command import build_rdns_command
from generalresearch.models.network.rdns.parser import parse_rdns_output
from generalresearch.models.network.tool_run import NmapRun, Status, RDNSRun, MTRRun
from generalresearch.models.network.tool_run_command import ToolRunCommand
@pytest.fixture(scope="session")
def scan_group_id():
return uuid4().hex
@pytest.fixture(scope="session")
def iplabel_manager(thl_web_rw) -> IPLabelManager:
assert "/unittest-" in thl_web_rw.dsn.path
return IPLabelManager(pg_config=thl_web_rw)
@pytest.fixture(scope="session")
def toolrun_manager(thl_web_rw) -> ToolRunManager:
assert "/unittest-" in thl_web_rw.dsn.path
return ToolRunManager(pg_config=thl_web_rw)
@pytest.fixture(scope="session")
def nmap_raw_output(request) -> str:
fp = os.path.join(request.config.rootpath, "data/nmaprun1.xml")
with open(fp, "r") as f:
data = f.read()
return data
@pytest.fixture(scope="session")
def nmap_result(nmap_raw_output):
return parse_nmap_xml(nmap_raw_output)
@pytest.fixture(scope="session")
def nmap_run(nmap_result, scan_group_id):
r = nmap_result
return NmapRun(
tool_version=r.version,
status=Status.SUCCESS,
ip=r.target_ip,
started_at=r.started_at,
finished_at=r.finished_at,
raw_command=r.command_line,
scan_group_id=scan_group_id,
config=ToolRunCommand(command="nmap"),
parsed=r,
)
@pytest.fixture(scope="session")
def dig_raw_output():
return "156.32.33.45.in-addr.arpa. 300 IN PTR scanme.nmap.org."
@pytest.fixture(scope="session")
def rdns_result(dig_raw_output):
return parse_rdns_output(ip="45.33.32.156", raw=dig_raw_output)
@pytest.fixture(scope="session")
def rdns_run(rdns_result, scan_group_id):
r = rdns_result
ip = "45.33.32.156"
utc_now = datetime.now(tz=timezone.utc)
return RDNSRun(
tool_version="1.2.3",
status=Status.SUCCESS,
ip=ip,
started_at=utc_now,
finished_at=utc_now + timedelta(seconds=1),
raw_command=build_rdns_command(ip=ip),
scan_group_id=scan_group_id,
config=ToolRunCommand(command="dig"),
parsed=r,
)
@pytest.fixture(scope="session")
def mtr_raw_output(request):
fp = os.path.join(request.config.rootpath, "data/mtr_fatbeam.json")
with open(fp, "r") as f:
data = f.read()
return data
@pytest.fixture(scope="session")
def mtr_result(mtr_raw_output):
return parse_mtr_output(mtr_raw_output, port=443, protocol=IPProtocol.TCP)
@pytest.fixture(scope="session")
def mtr_run(mtr_result, scan_group_id):
r = mtr_result
utc_now = datetime.now(tz=timezone.utc)
return MTRRun(
tool_version="1.2.3",
status=Status.SUCCESS,
ip=r.destination,
started_at=utc_now,
finished_at=utc_now + timedelta(seconds=1),
raw_command=build_mtr_command(
ip=r.destination, protocol=IPProtocol.TCP, port=443
),
scan_group_id=scan_group_id,
config=ToolRunCommand(command="mtr"),
parsed=r,
facility_id=1,
source_ip="1.2.3.4",
)
|