From 9e5d6e6fb5c4451592d713985598f301bcf3eb1e Mon Sep 17 00:00:00 2001 From: gpotter2 <10530980+gpotter2@users.noreply.github.com> Date: Sun, 22 Sep 2024 21:51:34 +0200 Subject: [PATCH] Use much better timeout for threading --- scapy/sendrecv.py | 27 ++++++++++++--------------- test/contrib/automotive/doip.uts | 2 +- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/scapy/sendrecv.py b/scapy/sendrecv.py index a8d00b7c961..4f06c19d443 100644 --- a/scapy/sendrecv.py +++ b/scapy/sendrecv.py @@ -92,7 +92,7 @@ class debug: Automatically enabled when a generator is passed as the packet :param _flood: :param threaded: if True, packets are sent in a thread and received in another. - defaults to False. + Defaults to True. :param session: a flow decoder used to handle stream of packets :param chainEX: if True, exceptions during send will be forwarded :param stop_filter: Python function applied to each packet to determine if @@ -158,7 +158,7 @@ def __init__(self, self.noans = 0 self._flood = _flood self.threaded = threaded - self.breakout = False + self.breakout = Event() # Instantiate packet holders if prebuild and not self._flood: self.tobesent = list(pkt) # type: _PacketIterable @@ -174,6 +174,7 @@ def __init__(self, self.timeout = None while retry >= 0: + self.breakout.clear() self.hsent = {} # type: Dict[bytes, List[Packet]] if threaded or self._flood: @@ -190,7 +191,7 @@ def __init__(self, except KeyboardInterrupt as ex: interrupted = ex - self.breakout = True + self.breakout.set() # Ended. Let's close gracefully if self._flood: @@ -264,7 +265,7 @@ def _sndrcv_snd(self): p = None try: if self.verbose: - print("Begin emission:") + os.write(1, b"Begin emission\n") for p in self.tobesent: # Populate the dictionary of _sndrcv_rcv # _sndrcv_rcv won't miss the answer of a packet that @@ -273,11 +274,11 @@ def _sndrcv_snd(self): # Send packet self.pks.send(p) time.sleep(self.inter) - if self.breakout: + if self.breakout.is_set(): break i += 1 if self.verbose: - print("Finished sending %i packets." % i) + os.write(1, b"\nFinished sending %i packets\n" % i) except SystemExit: pass except Exception: @@ -296,16 +297,12 @@ def _sndrcv_snd(self): elif not self._send_done: self.notans = i self._send_done = True - # In threaded mode, timeout. - if self.threaded and self.timeout is not None and not self.breakout: - t = time.monotonic() + self.timeout - while time.monotonic() < t: - if self.breakout: - break - time.sleep(0.1) + self._stop_sniffer_if_done() + # In threaded mode, timeout + if self.threaded and self.timeout is not None and not self.breakout.is_set(): + self.breakout.wait(timeout=self.timeout) if self.sniffer and self.sniffer.running: self.sniffer.stop() - self._stop_sniffer_if_done() def _process_packet(self, r): # type: (Packet) -> None @@ -346,7 +343,7 @@ def _sndrcv_rcv(self, callback): self.sniffer = AsyncSniffer() self.sniffer._run( prn=self._process_packet, - timeout=None if self.threaded else self.timeout, + timeout=None if self.threaded and not self._flood else self.timeout, store=False, opened_socket=self.rcv_pks, session=self.session, diff --git a/test/contrib/automotive/doip.uts b/test/contrib/automotive/doip.uts index b246b542191..d23d558b418 100644 --- a/test/contrib/automotive/doip.uts +++ b/test/contrib/automotive/doip.uts @@ -748,7 +748,7 @@ def server_tcp(): server_tcp_up.set() connection, address = sock.accept() connection.send(buffer) - connection.shutdown() + connection.shutdown(socket.SHUT_RDWR) connection.close() finally: sock.close()