Skip to content

Commit

Permalink
Agent: Refactor get_routes to resolve C901
Browse files Browse the repository at this point in the history
Issue: #1076
PR: #3334
  • Loading branch information
ilija-lazoroski committed May 15, 2023
1 parent a5ec39e commit cd235d9
Showing 1 changed file with 68 additions and 52 deletions.
120 changes: 68 additions & 52 deletions monkey/infection_monkey/network/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
from multiprocessing.context import BaseContext
from multiprocessing.managers import DictProxy, SyncManager
from random import shuffle # noqa: DUO102
from typing import Iterator, Optional, Set
from typing import Iterator, List, Optional, Set, Tuple

import psutil
from egg_timer import EggTimer

from common.types import IntRange, NetworkPort
from common.utils.environment import is_windows_os

if not is_windows_os():
from fcntl import ioctl

from .ports import COMMON_PORTS

# Timeout for monkey connections
Expand All @@ -35,60 +38,73 @@ class NetworkAddress:
domain: Optional[str]


if is_windows_os():

def get_routes():
def get_routes() -> List[Tuple[int, int, str, bytes, str]]:
if is_windows_os():
raise NotImplementedError()

else:
from fcntl import ioctl

def get_routes(): # based on scapy implementation for route parsing
try:
f = open("/proc/net/route", "r")
except IOError:
return []
routes = []
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", LOOPBACK_NAME))
addrfamily = struct.unpack("h", ifreq[16:18])[0]
if addrfamily == socket.AF_INET:
ifreq2 = ioctl(s, SIOCGIFNETMASK, struct.pack("16s16x", LOOPBACK_NAME))
msk = socket.ntohl(struct.unpack("I", ifreq2[20:24])[0])
dst = socket.ntohl(struct.unpack("I", ifreq[20:24])[0]) & msk
ifaddr = socket.inet_ntoa(ifreq[20:24])
routes.append((dst, msk, "0.0.0.0", LOOPBACK_NAME, ifaddr))

for line in f.readlines()[1:]:
iff, dst, gw, flags, x, x, x, msk, x, x, x = [var.encode() for var in line.split()]
flags = int(flags, 16)
if flags & RTF_UP == 0:
continue
if flags & RTF_REJECT:
continue
try:
ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", iff))
except IOError: # interface is present in routing tables but does not have any
# assigned IP
ifaddr = "0.0.0.0"
else:
addrfamily = struct.unpack("h", ifreq[16:18])[0]
if addrfamily == socket.AF_INET:
ifaddr = socket.inet_ntoa(ifreq[20:24])
else:
continue
routes.append(
(
socket.htonl(int(dst, 16)) & 0xFFFFFFFF,
socket.htonl(int(msk, 16)) & 0xFFFFFFFF,
socket.inet_ntoa(struct.pack("I", int(gw, 16))),
iff,
ifaddr,
)
routes = []
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

for line in _read_route_file():
dst: bytes
msk: bytes
iff, dst, gw, flags, msk = _extract_network_info(line)
if flags & RTF_UP == 0:
continue
if flags & RTF_REJECT:
continue
ifaddr: Optional[str] = _get_interface_address(s, iff)
if ifaddr is None:
continue
routes.append(
(
socket.htonl(int(dst, 16)) & 0xFFFFFFFF,
socket.htonl(int(msk, 16)) & 0xFFFFFFFF,
socket.inet_ntoa(struct.pack("I", int(gw, 16))),
iff,
ifaddr,
)

f.close()
return routes
)

ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", LOOPBACK_NAME))
addrfamily = struct.unpack("h", ifreq[16:18])[0]
if addrfamily == socket.AF_INET:
ifreq2 = ioctl(s, SIOCGIFNETMASK, struct.pack("16s16x", LOOPBACK_NAME))
mask = socket.ntohl(struct.unpack("I", ifreq2[20:24])[0])
destination = socket.ntohl(struct.unpack("I", ifreq[20:24])[0]) & mask
ifaddress = socket.inet_ntoa(ifreq[20:24])
routes.append((destination, mask, "0.0.0.0", LOOPBACK_NAME, ifaddress))

return routes


def _read_route_file() -> List[str]:
try:
with open("/proc/net/route", "r") as f:
return f.readlines()[1:]
except IOError:
return []


def _extract_network_info(line: str) -> Tuple[bytes, bytes, bytes, int, bytes]:
values = [var.encode() for var in line.split()]
iff: bytes = values[0]
dst: bytes = values[1]
gw: bytes = values[2]
flags: int = int(values[3], 16)
msk: bytes = values[7]
return iff, dst, gw, flags, msk


def _get_interface_address(s: socket.socket, iff: bytes) -> Optional[str]:
try:
ifreq = ioctl(s, SIOCGIFADDR, struct.pack("16s16x", iff))
except IOError:
return "0.0.0.0"
addrfamily = struct.unpack("h", ifreq[16:18])[0]
if addrfamily == socket.AF_INET:
return socket.inet_ntoa(ifreq[20:24])
return None


class TCPPortSelector:
Expand Down

0 comments on commit cd235d9

Please sign in to comment.