aboutsummaryrefslogtreecommitdiff
path: root/generalresearch/models/network/mtr/command.py
blob: f8d2d490824c6ff0c3a91e00b0535ce22bf2a2e3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import subprocess
from typing import List, Optional

from generalresearch.models.network.definitions import IPProtocol
from generalresearch.models.network.mtr.parser import parse_mtr_output
from generalresearch.models.network.mtr.result import MTRResult
from generalresearch.models.network.tool_run_command import MTRRunCommand

# Protocols for which a destination port may be supplied to build_mtr_command.
PROTOCOLS_W_PORT = {IPProtocol.TCP, IPProtocol.UDP, IPProtocol.SCTP}

# Every protocol build_mtr_command accepts: the port-capable ones plus ICMP,
# which is traced without a port.
SUPPORTED_PROTOCOLS = PROTOCOLS_W_PORT | {IPProtocol.ICMP}


def build_mtr_command(
    ip: str,
    protocol: Optional[IPProtocol] = None,
    port: Optional[int] = None,
    report_cycles: int = 10,
) -> str:
    """Build the ``mtr`` command line (as a single string) for a trace to ``ip``.

    The command always starts with
    ``mtr --report --show-ips --aslookup --json`` and ends with ``ip``;
    arguments are joined with single spaces.

    Args:
        ip: Target passed to ``mtr`` as the final argument.
        protocol: Probe protocol. ``None`` adds no protocol flag unless a
            ``port`` is given, in which case TCP is assumed. ICMP adds no
            flag either (it is mtr's default).
        port: Destination port, emitted as ``--port <n>``. Only valid with
            TCP, UDP or SCTP.
        report_cycles: Emitted as ``-c <n>``; ``None`` omits the option.

    Returns:
        The space-joined command string.

    Raises:
        AssertionError: If ``port`` is combined with a protocol that takes
            no port, or if ``protocol`` is not in ``SUPPORTED_PROTOCOLS``.
    """
    # https://manpages.ubuntu.com/manpages/focal/man8/mtr.8.html
    # e.g. "mtr -r -c 2 -b -z -j -T -P 443 74.139.70.149"
    args = ["mtr", "--report", "--show-ips", "--aslookup", "--json"]
    if report_cycles is not None:
        args.extend(["-c", str(int(report_cycles))])
    if port is not None:
        if protocol is None:
            # A port without an explicit protocol implies a TCP trace.
            protocol = IPProtocol.TCP
        # Raised explicitly (instead of via `assert`) so the check is not
        # stripped under `python -O`; the exception type is kept for callers.
        if protocol not in PROTOCOLS_W_PORT:
            raise AssertionError("port only allowed for TCP/SCTP/UDP traces")
        args.extend(["--port", str(int(port))])
    if protocol:
        if protocol not in SUPPORTED_PROTOCOLS:
            raise AssertionError(f"unsupported protocol: {protocol}")
        # default is ICMP (no args)
        arg_map = {
            IPProtocol.TCP: "--tcp",
            IPProtocol.UDP: "--udp",
            IPProtocol.SCTP: "--sctp",
        }
        if protocol in arg_map:
            args.append(arg_map[protocol])
    args.append(ip)
    return " ".join(args)


def get_mtr_version() -> str:
    """Return the version string printed by ``mtr -v``.

    Runs ``mtr -v``, strips the captured stdout and returns everything after
    the first space. The exit status is not checked.

    Raises:
        IndexError: If the stripped output contains no space.
    """
    completed = subprocess.run(
        ["mtr", "-v"], capture_output=True, text=True, check=False
    )
    # e.g. mtr 0.95
    name_and_version = completed.stdout.strip().split(" ", 1)
    return name_and_version[1]


def run_mtr(config: MTRRunCommand) -> MTRResult:
    """Execute the mtr command described by ``config`` and parse its output.

    The command string from ``config.to_command_str()`` is split on single
    spaces into an argument list and run without a shell. The exit status is
    not checked; the stripped stdout is passed to ``parse_mtr_output``
    together with the protocol and port from ``config.options``.
    """
    argv = config.to_command_str().split(" ")
    completed = subprocess.run(argv, capture_output=True, text=True, check=False)
    output = completed.stdout.strip()
    return parse_mtr_output(
        output,
        protocol=config.options.protocol,
        port=config.options.port,
    )