From 05dfc470a8f049c4b94f77e6d10c696a200d6c4e Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Sun, 14 Jul 2024 18:26:44 +0200 Subject: [PATCH] fixup --- doc/scapy/usage.rst | 25 ++++++++++++++++++++ scapy/base_classes.py | 23 ++++++++++--------- scapy/fields.py | 13 ++++++----- scapy/layers/inet.py | 4 ++-- scapy/layers/inet6.py | 4 ++-- scapy/layers/l2.py | 4 ++-- scapy/sendrecv.py | 30 ++++++++++++++++++++---- test/linux.uts | 53 ++++++++++++++++++++++++++++++++++++++++--- 8 files changed, 125 insertions(+), 31 deletions(-) diff --git a/doc/scapy/usage.rst b/doc/scapy/usage.rst index 3bb133b3e8a..dcb7bc93c5e 100644 --- a/doc/scapy/usage.rst +++ b/doc/scapy/usage.rst @@ -252,6 +252,31 @@ Now that we know how to manipulate packets. Let's see how to send them. The send Sent 1 packets. +.. _multicast: + +Multicast on layer 3: Scope Indentifiers +---------------------------------------- + +.. index:: + single: Multicast + +.. note:: This feature is only available since Scapy 2.6.0. + +If you try to use multicast addresses (IPv4) or link-local addresses (IPv6), you'll notice that Scapy follows the routing table and takes the first entry. In order to specify which interface to use when looking through the routing table, Scapy supports scope identifiers (similar to RFC6874 but for both IPv6 and IPv4). + +.. code:: python + + >>> conf.checkIPaddr = False # answer IP will be != from the one we requested + # send on interface 'eth0' + >>> sr(IP(dst="224.0.0.1%eth0")/ICMP(), multi=True) + >>> sr(IPv6(dst="ff02::2%eth0")/ICMPv6EchoRequest(), multi=True) + +You can use both ``%eth0`` format or ``%15`` (the interface id) format. You can query those using ``conf.ifaces``. + +.. note:: + + Behind the scene, calling ``IP(dst="224.0.0.1%eth0")`` creates a ``ScopedIP`` object that contains ``224.0.0.1`` on the scope of the interface ``eth0``. If you are using an interface object (for instance ``conf.iface``), you can also craft that object. For instance:: + >>> pkt = IP(dst=ScopedIP("224.0.0.1", scope=conf.iface))/ICMP() Fuzzing ------- diff --git a/scapy/base_classes.py b/scapy/base_classes.py index 0a22af0ebf7..46ccef23638 100644 --- a/scapy/base_classes.py +++ b/scapy/base_classes.py @@ -109,7 +109,7 @@ def __repr__(self): return "" % self.values -class _NetIP(str): +class _ScopedIP(str): """ An str that also holds extra attributes. """ @@ -119,21 +119,21 @@ def __init__(self, _: str) -> None: self.scope = None def __repr__(self) -> str: - val = super(_NetIP, self).__repr__() + val = super(_ScopedIP, self).__repr__() if self.scope is not None: - return "NetIP(%s, scope=%s)" % (val, repr(self.scope)) + return "ScopedIP(%s, scope=%s)" % (val, repr(self.scope)) return val -def NetIP(net: str, scope: Optional[Any] = None) -> _NetIP: +def ScopedIP(net: str, scope: Optional[Any] = None) -> _ScopedIP: """ An str that also holds extra attributes. Examples:: - >>> NetIP("224.0.0.1%eth0") # interface 'eth0' - >>> NetIP("224.0.0.1%1") # interface index 1 - >>> NetIP("224.0.0.1", scope=conf.iface) + >>> ScopedIP("224.0.0.1%eth0") # interface 'eth0' + >>> ScopedIP("224.0.0.1%1") # interface index 1 + >>> ScopedIP("224.0.0.1", scope=conf.iface) """ if "%" in net: try: @@ -152,7 +152,7 @@ def NetIP(net: str, scope: Optional[Any] = None) -> _NetIP: "valid interface !" % scope ) scope = network_name(iface) - x = _NetIP(net) + x = _ScopedIP(net) x.scope = scope return x @@ -217,7 +217,8 @@ def __init__(self, net, stop=None, scope=None): self.__class__.__name__) self.scope = None if "%" in net: - net = NetIP(net) + net = ScopedIP(net) + if isinstance(net, _ScopedIP): self.scope = net.scope if stop is None: try: @@ -245,7 +246,7 @@ def __iter__(self): # type: () -> Iterator[str] # Python 2 won't handle huge (> sys.maxint) values in range() for i in range(self.count): - yield NetIP( + yield ScopedIP( self.int2ip(self.start + i), scope=self.scope, ) @@ -261,7 +262,7 @@ def __iterlen__(self): def choice(self): # type: () -> str - return NetIP( + return ScopedIP( self.int2ip(random.randint(self.start, self.stop)), scope=self.scope, ) diff --git a/scapy/fields.py b/scapy/fields.py index 58abf47a28c..d236fb38cfe 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -38,11 +38,11 @@ from scapy.utils6 import in6_6to4ExtractAddr, in6_isaddr6to4, \ in6_isaddrTeredo, in6_ptop, Net6, teredoAddrExtractInfo from scapy.base_classes import ( - _NetIP, + _ScopedIP, BasePacket, Field_metaclass, Net, - NetIP, + ScopedIP, ) # Typing imports @@ -854,7 +854,7 @@ def h2i(self, pkt, x): if isinstance(x, bytes): x = plain_str(x) # type: ignore if isinstance(x, str): - x = NetIP(x) + x = ScopedIP(x) try: inet_aton(x) except socket.error: @@ -899,7 +899,7 @@ def any2i(self, pkt, x): def i2repr(self, pkt, x): # type: (Optional[Packet], Union[str, Net]) -> str - if isinstance(x, _NetIP) and x.scope: + if isinstance(x, _ScopedIP) and x.scope: return repr(x) r = self.resolve(self.i2h(pkt, x)) return r if isinstance(r, str) else repr(r) @@ -944,8 +944,9 @@ def h2i(self, pkt, x): if isinstance(x, bytes): x = plain_str(x) if isinstance(x, str): + x = ScopedIP(x) try: - x = NetIP(in6_ptop(x)) + x = ScopedIP(in6_ptop(x), scope=x.scope) except socket.error: return Net6(x) # type: ignore elif isinstance(x, tuple): @@ -985,7 +986,7 @@ def i2repr(self, pkt, x): elif in6_isaddr6to4(x): # print encapsulated address vaddr = in6_6to4ExtractAddr(x) return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr) - elif isinstance(x, _NetIP) and x.scope: + elif isinstance(x, _ScopedIP) and x.scope: return repr(x) r = self.i2h(pkt, x) # No specific information to return return r if isinstance(r, str) else repr(r) diff --git a/scapy/layers/inet.py b/scapy/layers/inet.py index 94c93f3bf67..a361664a681 100644 --- a/scapy/layers/inet.py +++ b/scapy/layers/inet.py @@ -18,7 +18,7 @@ from scapy.utils import checksum, do_graph, incremental_label, \ linehexdump, strxor, whois, colgen from scapy.ansmachine import AnsweringMachine -from scapy.base_classes import Gen, Net, _NetIP +from scapy.base_classes import Gen, Net, _ScopedIP from scapy.data import ETH_P_IP, ETH_P_ALL, DLT_RAW, DLT_RAW_ALT, DLT_IPV4, \ IP_PROTOS, TCP_SERVICES, UDP_SERVICES from scapy.layers.l2 import ( @@ -570,7 +570,7 @@ def extract_padding(self, s): def route(self): dst = self.dst scope = None - if isinstance(dst, (Net, _NetIP)): + if isinstance(dst, (Net, _ScopedIP)): scope = dst.scope if isinstance(dst, (Gen, list)): dst = next(iter(dst)) diff --git a/scapy/layers/inet6.py b/scapy/layers/inet6.py index 324ce038016..2633ab77c21 100644 --- a/scapy/layers/inet6.py +++ b/scapy/layers/inet6.py @@ -20,7 +20,7 @@ from scapy.arch import get_if_hwaddr from scapy.as_resolvers import AS_resolver_riswhois -from scapy.base_classes import Gen, _NetIP +from scapy.base_classes import Gen, _ScopedIP from scapy.compat import chb, orb, raw, plain_str, bytes_encode from scapy.consts import WINDOWS from scapy.config import conf @@ -319,7 +319,7 @@ def route(self): """Used to select the L2 address""" dst = self.dst scope = None - if isinstance(dst, (Net6, _NetIP)): + if isinstance(dst, (Net6, _ScopedIP)): scope = dst.scope if isinstance(dst, (Gen, list)): dst = next(iter(dst)) diff --git a/scapy/layers/l2.py b/scapy/layers/l2.py index 9fb7ed889c9..a8a6b303265 100644 --- a/scapy/layers/l2.py +++ b/scapy/layers/l2.py @@ -14,7 +14,7 @@ from scapy.ansmachine import AnsweringMachine from scapy.arch import get_if_addr, get_if_hwaddr -from scapy.base_classes import Gen, Net, _NetIP +from scapy.base_classes import Gen, Net, _ScopedIP from scapy.compat import chb from scapy.config import conf from scapy import consts @@ -564,7 +564,7 @@ def route(self): self.getfield_and_val("pdst")) fld_inner, dst = fld._find_fld_pkt_val(self, dst) scope = None - if isinstance(dst, (Net, _NetIP)): + if isinstance(dst, (Net, _ScopedIP)): scope = dst.scope if isinstance(dst, Gen): dst = next(iter(dst)) diff --git a/scapy/sendrecv.py b/scapy/sendrecv.py index ba81ff10ca0..5388cd97dd8 100644 --- a/scapy/sendrecv.py +++ b/scapy/sendrecv.py @@ -475,7 +475,11 @@ def send(x, # type: _PacketIterable """ if "iface" in kargs: # Warn that it isn't used. - warnings.warn("'iface' has no effect on L3 I/O send().", SyntaxWarning) + warnings.warn( + "'iface' has no effect on L3 I/O send(). For multicast/link-local " + "see https://scapy.readthedocs.io/en/latest/usage.html#multicast", + SyntaxWarning, + ) del kargs["iface"] iface, ipv6 = _interface_selection(x) return _send( @@ -691,7 +695,11 @@ def sr(x, # type: _PacketIterable """ if "iface" in kargs: # Warn that it isn't used. - warnings.warn("'iface' has no effect on L3 I/O sr().", SyntaxWarning) + warnings.warn( + "'iface' has no effect on L3 I/O sr(). For multicast/link-local " + "see https://scapy.readthedocs.io/en/latest/usage.html#multicast", + SyntaxWarning, + ) del kargs["iface"] iface, ipv6 = _interface_selection(x) s = iface.l3socket(ipv6)( @@ -714,7 +722,11 @@ def sr1(*args, **kargs): """ if "iface" in kargs: # Warn that it isn't used. - warnings.warn("'iface' has no effect on L3 I/O sr1().", SyntaxWarning) + warnings.warn( + "'iface' has no effect on L3 I/O sr1(). For multicast/link-local " + "see https://scapy.readthedocs.io/en/latest/usage.html#multicast", + SyntaxWarning, + ) del kargs["iface"] ans, _ = sr(*args, **kargs) if ans: @@ -949,7 +961,11 @@ def srflood(x, # type: _PacketIterable """ if "iface" in kargs: # Warn that it isn't used. - warnings.warn("'iface' has no effect on L3 I/O srflood().", SyntaxWarning) + warnings.warn( + "'iface' has no effect on L3 I/O srflood(). For multicast/link-local " + "see https://scapy.readthedocs.io/en/latest/usage.html#multicast", + SyntaxWarning, + ) del kargs["iface"] iface, ipv6 = _interface_selection(x) s = iface.l3socket(ipv6)( @@ -983,7 +999,11 @@ def sr1flood(x, # type: _PacketIterable """ if "iface" in kargs: # Warn that it isn't used. - warnings.warn("'iface' has no effect on L3 I/O sr1flood().", SyntaxWarning) + warnings.warn( + "'iface' has no effect on L3 I/O sr1flood(). For multicast/link-local " + "see https://scapy.readthedocs.io/en/latest/usage.html#multicast", + SyntaxWarning, + ) del kargs["iface"] iface, ipv6 = _interface_selection(x) s = iface.l3socket(ipv6)( diff --git a/test/linux.uts b/test/linux.uts index 2a8b17374e6..c5384f91b65 100644 --- a/test/linux.uts +++ b/test/linux.uts @@ -63,6 +63,53 @@ if exit_status == 0: else: assert True + += Test scoped interface addresses +~ linux needs_root + +import os +exit_status = os.system("ip link add name scapy0 type dummy") +exit_status = os.system("ip link add name scapy1 type dummy") +exit_status |= os.system("ip addr add 192.0.2.1/24 dev scapy0") +exit_status |= os.system("ip addr add 192.0.3.1/24 dev scapy1") +exit_status |= os.system("ip link set scapy0 address 00:01:02:03:04:05 multicast on up") +exit_status |= os.system("ip link set scapy1 address 06:07:08:09:10:11 multicast on up") +assert exit_status == 0 + +conf.ifaces.reload() +conf.route.resync() +conf.route6.resync() + +conf.route6 + +try: + # IPv4 + a = Ether()/IP(dst="224.0.0.1%scapy0") + assert a[Ether].src == "00:01:02:03:04:05" + assert a[IP].src == "192.0.2.1" + b = Ether()/IP(dst="224.0.0.1%scapy1") + assert b[Ether].src == "06:07:08:09:10:11" + assert b[IP].src == "192.0.3.1" + c = Ether()/IP(dst="224.0.0.1/24%scapy1") + assert c[Ether].src == "06:07:08:09:10:11" + assert c[IP].src == "192.0.3.1" + # IPv6 + a = Ether()/IPv6(dst="ff02::fb%scapy0") + assert a[Ether].src == "00:01:02:03:04:05" + assert a[IPv6].src == "fe80::201:2ff:fe03:405" + b = Ether()/IPv6(dst="ff02::fb%scapy1") + assert b[Ether].src == "06:07:08:09:10:11" + assert b[IPv6].src == "fe80::407:8ff:fe09:1011" + c = Ether()/IPv6(dst="ff02::fb/30%scapy1") + assert c[Ether].src == "06:07:08:09:10:11" + assert c[IPv6].src == "fe80::407:8ff:fe09:1011" +finally: + exit_status = os.system("ip link del scapy0") + exit_status = os.system("ip link del scapy1") + conf.ifaces.reload() + conf.route.resync() + conf.route6.resync() + = catch loopback device missing ~ linux needs_root @@ -309,7 +356,7 @@ assert test_L3PacketSocket_sendto_python3() import os from scapy.sendrecv import _interface_selection -assert _interface_selection(None, IP(dst="8.8.8.8")/UDP()) == (conf.iface, False) +assert _interface_selection(IP(dst="8.8.8.8")/UDP()) == (conf.iface, False) exit_status = os.system("ip link add name scapy0 type dummy") exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0") exit_status = os.system("ip addr add fc00::/24 dev scapy0") @@ -317,8 +364,8 @@ exit_status = os.system("ip link set scapy0 up") conf.ifaces.reload() conf.route.resync() conf.route6.resync() -assert _interface_selection(None, IP(dst="192.0.2.42")/UDP()) == ("scapy0", False) -assert _interface_selection(None, IPv6(dst="fc00::ae0d")/UDP()) == ("scapy0", True) +assert _interface_selection(IP(dst="192.0.2.42")/UDP()) == ("scapy0", False) +assert _interface_selection(IPv6(dst="fc00::ae0d")/UDP()) == ("scapy0", True) exit_status = os.system("ip link del name dev scapy0") conf.ifaces.reload() conf.route.resync()