Skip to content

Commit

Permalink
Support for Scope Identifiers in IP addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
gpotter2 committed Jul 13, 2024
1 parent a1afb9a commit 17cad47
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 101 deletions.
17 changes: 16 additions & 1 deletion doc/scapy/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1488,9 +1488,24 @@ NBNS Query Request (find by NetbiosName)

.. code::
>>> conf.checkIPaddr = False # Mandatory because we are using a broadcast destination
>>> conf.checkIPaddr = False # Mandatory because we are using a broadcast destination and receiving unicast
>>> sr1(IP(dst="192.168.0.255")/UDP()/NBNSHeader()/NBNSQueryRequest(QUESTION_NAME="DC1"))
mDNS Query Request
------------------

For instance, find all spotify connect devices.

.. code::
>>> # For interface 'eth0'
>>> ans, _ = sr(IPv6(dst="ff02::fb%eth0")/UDP(sport=5353, dport=5353)/DNS(rd=0, qd=[DNSQR(qname='_spotify-connect._tcp.local', qtype="PTR")]), multi=True, timeout=2)
>>> ans.show()
.. note::

As you can see, we used a scope identifier (``%eth0``) to specify on which interface we want to use the above multicast IP.

Advanced traceroute
-------------------

Expand Down
29 changes: 28 additions & 1 deletion scapy/arch/linux/rtnetlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ def _sr1_rtrequest(pkt: Packet) -> List[Packet]:
if msg.nlmsg_type == 3 and nlmsgerr in msg and msg.status != 0:
# NLMSG_DONE with errors
if msg.data and msg.data[0].rta_type == 1:
log_loading.warning(
log_loading.debug(
"Scapy RTNETLINK error on %s: '%s'. Please report !",
pkt.sprintf("%nlmsg_type%"),
msg.data[0].rta_data.decode(),
Expand Down Expand Up @@ -900,6 +900,20 @@ def read_routes():
elif attr.rta_type == 0x07: # RTA_PREFSRC
addr = attr.rta_data
routes.append((net, mask, gw, iface, addr, metric))
# Add multicast routes, as those are missing by default
for _iface in ifaces.values():
if _iface['flags'].MULTICAST:
try:
addr = next(
x["address"]
for x in _iface["ips"]
if x["af_family"] == socket.AF_INET
)
except StopIteration:
continue
routes.append((
0xe0000000, 0xf0000000, "0.0.0.0", _iface["name"], addr, 250
))
return routes


Expand Down Expand Up @@ -937,4 +951,17 @@ def read_routes6():
cset = scapy.utils6.construct_source_candidate_set(prefix, plen, devaddrs)
if cset:
routes.append((prefix, plen, nh, iface, cset, metric))
# Add multicast routes, as those are missing by default
for _iface in ifaces.values():
if _iface['flags'].MULTICAST:
addrs = [
x["address"]
for x in _iface["ips"]
if x["af_family"] == socket.AF_INET6
]
if not addrs:
continue
routes.append((
"ff00::", 8, "::", _iface["name"], addrs, 250
))
return routes
97 changes: 89 additions & 8 deletions scapy/base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,75 @@ def __repr__(self):
return "<SetGen %r>" % self.values


class _NetIP(str):
"""
An str that also holds extra attributes.
"""
__slots__ = ["scope"]

def __init__(self, _: str) -> None:
self.scope = None

def __repr__(self) -> str:
val = super(_NetIP, self).__repr__()
if self.scope is not None:
return "NetIP(%s, scope=%s)" % (val, repr(self.scope))
return val


def NetIP(net: str, scope: Optional[Any] = None) -> _NetIP:
"""
An str that also holds extra attributes.
Examples::
>>> NetIP("224.0.0.1%eth0") # interface 'eth0'
>>> NetIP("224.0.0.1%1") # interface index 1
>>> NetIP("224.0.0.1", scope=conf.iface)
"""
if "%" in net:
try:
net, scope = net.split("%", 1)
except ValueError:
raise Scapy_Exception("Scope identifier can only be present once !")
if scope is not None:
from scapy.interfaces import resolve_iface, network_name, dev_from_index
try:
iface = dev_from_index(int(scope))
except (ValueError, TypeError):
iface = resolve_iface(scope)
if not iface.is_valid():
raise Scapy_Exception(
"RFC6874 scope identifier '%s' could not be resolved to a "
"valid interface !" % scope
)
scope = network_name(iface)
x = _NetIP(net)
x.scope = scope
return x


class Net(Gen[str]):
"""Network object from an IP address or hostname and mask"""
"""
Network object from an IP address or hostname and mask
Examples:
- With mask::
>>> list(Net("192.168.0.1/24"))
['192.168.0.0', '192.168.0.1', ..., '192.168.0.255']
- With 'end'::
>>> list(Net("192.168.0.100", "192.168.0.200"))
['192.168.0.100', '192.168.0.101', ..., '192.168.0.200']
- With 'scope' (for multicast)::
>>> Net("224.0.0.1%lo")
>>> Net("224.0.0.1", scope=conf.iface)
"""
name = "Net" # type: str
family = socket.AF_INET # type: int
max_mask = 32 # type: int
Expand Down Expand Up @@ -143,11 +210,14 @@ def int2ip(val):
# type: (int) -> str
return socket.inet_ntoa(struct.pack('!I', val))

def __init__(self, net, stop=None):
# type: (str, Union[None, str]) -> None
def __init__(self, net, stop=None, scope=None):
# type: (str, Optional[str], Optional[str]) -> None
if "*" in net:
raise Scapy_Exception("Wildcards are no longer accepted in %s()" %
self.__class__.__name__)
if "%" in net:
net = NetIP(net)
self.scope = net.scope
if stop is None:
try:
net, mask = net.split("/", 1)
Expand All @@ -174,7 +244,10 @@ def __iter__(self):
# type: () -> Iterator[str]
# Python 2 won't handle huge (> sys.maxint) values in range()
for i in range(self.count):
yield self.int2ip(self.start + i)
yield NetIP(
self.int2ip(self.start + i),
scope=self.scope,
)

def __len__(self):
# type: () -> int
Expand All @@ -187,20 +260,28 @@ def __iterlen__(self):

def choice(self):
# type: () -> str
return self.int2ip(random.randint(self.start, self.stop))
return NetIP(
self.int2ip(random.randint(self.start, self.stop)),
scope=self.scope,
)

def __repr__(self):
# type: () -> str
scope_id_repr = ""
if self.scope:
scope_id_repr = ", scope=%s" % repr(self.scope)
if self.mask is not None:
return '%s("%s/%d")' % (
return '%s("%s/%d"%s)' % (
self.__class__.__name__,
self.net,
self.mask,
scope_id_repr,
)
return '%s("%s", "%s")' % (
return '%s("%s", "%s"%s)' % (
self.__class__.__name__,
self.int2ip(self.start),
self.int2ip(self.stop),
scope_id_repr,
)

def __eq__(self, other):
Expand All @@ -220,7 +301,7 @@ def __ne__(self, other):

def __hash__(self):
# type: () -> int
return hash(("scapy.Net", self.family, self.start, self.stop))
return hash(("scapy.Net", self.family, self.start, self.stop, self.scope))

def __contains__(self, other):
# type: (Any) -> bool
Expand Down
77 changes: 33 additions & 44 deletions scapy/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@
from scapy.utils import inet_aton, inet_ntoa, lhex, mac2str, str2mac, EDecimal
from scapy.utils6 import in6_6to4ExtractAddr, in6_isaddr6to4, \
in6_isaddrTeredo, in6_ptop, Net6, teredoAddrExtractInfo
from scapy.base_classes import Gen, Net, BasePacket, Field_metaclass
from scapy.error import warning
from scapy.base_classes import (
_NetIP,
BasePacket,
Field_metaclass,
Net,
NetIP,
)

# Typing imports
from typing import (
Expand Down Expand Up @@ -849,6 +854,7 @@ def h2i(self, pkt, x):
if isinstance(x, bytes):
x = plain_str(x) # type: ignore
if isinstance(x, str):
x = NetIP(x)
try:
inet_aton(x)
except socket.error:
Expand Down Expand Up @@ -893,6 +899,8 @@ def any2i(self, pkt, x):

def i2repr(self, pkt, x):
# type: (Optional[Packet], Union[str, Net]) -> str
if isinstance(x, _NetIP):
return repr(x)
r = self.resolve(self.i2h(pkt, x))
return r if isinstance(r, str) else repr(r)

Expand All @@ -902,29 +910,16 @@ def randval(self):


class SourceIPField(IPField):
__slots__ = ["dstname"]

def __init__(self, name, dstname):
# type: (str, Optional[str]) -> None
def __init__(self, name):
# type: (str) -> None
IPField.__init__(self, name, None)
self.dstname = dstname

def __findaddr(self, pkt):
# type: (Packet) -> str
# type: (Packet) -> Optional[str]
if conf.route is None:
# unused import, only to initialize conf.route
import scapy.route # noqa: F401
dst = ("0.0.0.0" if self.dstname is None
else getattr(pkt, self.dstname) or "0.0.0.0")
if isinstance(dst, (Gen, list)):
r = {
conf.route.route(str(daddr))
for daddr in dst
} # type: Set[Tuple[str, str, str]]
if len(r) > 1:
warning("More than one possible route for %r" % (dst,))
return min(r)[1]
return conf.route.route(dst)[1]
return pkt.route()[1]

def i2m(self, pkt, x):
# type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
Expand All @@ -945,18 +940,19 @@ def __init__(self, name, default):
Field.__init__(self, name, default, "16s")

def h2i(self, pkt, x):
# type: (Optional[Packet], Optional[str]) -> str
# type: (Optional[Packet], Any) -> str
if isinstance(x, bytes):
x = plain_str(x)
if isinstance(x, str):
x = NetIP(x)
try:
x = in6_ptop(x)
in6_ptop(x)
except socket.error:
return Net6(x) # type: ignore
elif isinstance(x, tuple):
if len(x) != 2:
raise ValueError("Invalid IPv6 format")
return Net6(*x)
return Net6(*x) # type: ignore
elif isinstance(x, list):
x = [self.h2i(pkt, n) for n in x]
return x # type: ignore
Expand Down Expand Up @@ -990,6 +986,8 @@ def i2repr(self, pkt, x):
elif in6_isaddr6to4(x): # print encapsulated address
vaddr = in6_6to4ExtractAddr(x)
return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr)
elif isinstance(x, _NetIP):
return repr(x)
r = self.i2h(pkt, x) # No specific information to return
return r if isinstance(r, str) else repr(r)

Expand All @@ -999,36 +997,27 @@ def randval(self):


class SourceIP6Field(IP6Field):
__slots__ = ["dstname"]

def __init__(self, name, dstname):
# type: (str, str) -> None
def __init__(self, name):
# type: (str) -> None
IP6Field.__init__(self, name, None)
self.dstname = dstname

def __findaddr(self, pkt):
# type: (Packet) -> Optional[str]
if conf.route6 is None:
# unused import, only to initialize conf.route
import scapy.route6 # noqa: F401
return pkt.route()[1]

def i2m(self, pkt, x):
# type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
if x is None:
dst = ("::" if self.dstname is None else
getattr(pkt, self.dstname) or "::")
iff, x, nh = conf.route6.route(dst)
if x is None and pkt is not None:
x = self.__findaddr(pkt)
return super(SourceIP6Field, self).i2m(pkt, x)

def i2h(self, pkt, x):
# type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
if x is None:
if conf.route6 is None:
# unused import, only to initialize conf.route6
import scapy.route6 # noqa: F401
dst = ("::" if self.dstname is None else getattr(pkt, self.dstname)) # noqa: E501
if isinstance(dst, (Gen, list)):
r = {conf.route6.route(str(daddr))
for daddr in dst}
if len(r) > 1:
warning("More than one possible route for %r" % (dst,))
x = min(r)[1]
else:
x = conf.route6.route(dst)[1]
if x is None and pkt is not None:
x = self.__findaddr(pkt)
return super(SourceIP6Field, self).i2h(pkt, x)


Expand Down
2 changes: 1 addition & 1 deletion scapy/layers/hsrp.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class HSRPmd5(Packet):
ByteEnumField("algo", 0, {1: "MD5"}),
ByteField("padding", 0x00),
XShortField("flags", 0x00),
SourceIPField("sourceip", None),
SourceIPField("sourceip"),
XIntField("keyid", 0x00),
StrFixedLenField("authdigest", b"\00" * 16, 16)]

Expand Down
Loading

0 comments on commit 17cad47

Please sign in to comment.