diff --git a/src/run_tribler.py b/src/run_tribler.py
index e43b791a464..b0a2a20411f 100644
--- a/src/run_tribler.py
+++ b/src/run_tribler.py
@@ -11,6 +11,7 @@
 
 from tribler.core.sentry_reporter.sentry_reporter import SentryReporter, SentryStrategy
 from tribler.core.sentry_reporter.sentry_scrubber import SentryScrubber
+from tribler.core.utilities.asyncio_fixes.finish_accept_patch import apply_finish_accept_patch
 from tribler.core.utilities.slow_coro_detection.main_thread_stack_tracking import start_main_thread_stack_tracing
 from tribler.core.utilities.osutils import get_root_state_directory
 from tribler.core.utilities.utilities import is_frozen
@@ -92,6 +93,9 @@ def init_boot_logger():
 
     # Check whether we need to start the core or the user interface
     if parsed_args.core:
+        if sys.platform == 'win32':
+            apply_finish_accept_patch()
+
         from tribler.core.utilities.pony_utils import track_slow_db_sessions
         track_slow_db_sessions()
 
diff --git a/src/tribler/core/utilities/asyncio_fixes/finish_accept_patch.py b/src/tribler/core/utilities/asyncio_fixes/finish_accept_patch.py
new file mode 100644
index 00000000000..501d49d7fbc
--- /dev/null
+++ b/src/tribler/core/utilities/asyncio_fixes/finish_accept_patch.py
@@ -0,0 +1,167 @@
+import socket
+import struct
+from asyncio import exceptions, tasks, trsock
+from asyncio.log import logger
+
+try:
+    import _overlapped
+except ImportError:
+    _overlapped = None
+
+
+NULL = 0
+patch_applied = False
+
+
+# pylint: disable=protected-access
+
+
+def apply_finish_accept_patch():  # pragma: no cover
+    r"""
+    Monkey-patch asyncio so that a server keeps accepting connections after a reset connection on Windows.
+
+    The patch fixes the following issue with the IocpProactor._accept() method on Windows:
+
+    OSError: [WinError 64] The specified network name is no longer available
+      File "asyncio\windows_events.py", line 571, in accept_coro
+        await future
+      File "asyncio\windows_events.py", line 817, in _poll
+        value = callback(transferred, key, ov)
+      File "asyncio\windows_events.py", line 560, in finish_accept
+        ov.getresult()
+    OSError: [WinError 64] The specified network name is no longer available.
+
+    `BaseProactorEventLoop._start_serving` and `IocpProactor.accept` are replaced with the
+    patched versions from this module. Calling the function again is a no-op.
+
+    See:
+    * https://github.com/Tribler/tribler/issues/7759
+    * https://github.com/python/cpython/issues/93821
+    """
+
+    global patch_applied  # pylint: disable=global-statement
+    if patch_applied:
+        return
+
+    # pylint: disable=import-outside-toplevel
+    from asyncio.proactor_events import BaseProactorEventLoop
+    from asyncio.windows_events import IocpProactor
+
+    BaseProactorEventLoop._start_serving = patched_proactor_event_loop_start_serving
+    IocpProactor.accept = patched_iocp_proacor_accept
+
+    patch_applied = True
+    logger.info("Patched asyncio to fix accept() issues on Windows")
+
+
+def patched_iocp_proacor_accept(self, listener, *, _overlapped=_overlapped):
+    """
+    Replacement for `IocpProactor.accept` that reports a reset connection as ConnectionResetError.
+
+    :param self: the proactor instance the function is installed on
+    :param listener: the listening socket
+    :param _overlapped: the `_overlapped` module; a keyword argument so that tests can inject a mock
+    :return: the future returned by `self._register`; `finish_accept` yields `(conn, conn.getpeername())`
+    """
+    self._register_with_iocp(listener)
+    conn = self._get_accept_socket(listener.family)
+    ov = _overlapped.Overlapped(NULL)
+    ov.AcceptEx(listener.fileno(), conn.fileno())
+
+    def finish_accept(trans, key, ov):  # pylint: disable=unused-argument
+        # The unpatched asyncio code calls ov.getresult() here without handling errors
+        # start of the patched code
+        try:
+            ov.getresult()
+        except OSError as exc:
+            if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
+                                _overlapped.ERROR_OPERATION_ABORTED):
+                logger.debug("Connection reset error occurred, continuing to accept connections")
+                conn.close()
+                raise ConnectionResetError(*exc.args) from exc
+            raise
+        # end of the patched code
+
+        # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
+        buf = struct.pack('@P', listener.fileno())
+        conn.setsockopt(socket.SOL_SOCKET,
+                        _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
+        conn.settimeout(listener.gettimeout())
+        return conn, conn.getpeername()
+
+    async def accept_coro(future, conn):
+        # Coroutine closing the accept socket if the future is cancelled
+        try:
+            await future
+        except exceptions.CancelledError:
+            conn.close()
+            raise
+
+    future = self._register(ov, listener, finish_accept)
+    coro = accept_coro(future, conn)
+    tasks.ensure_future(coro, loop=self._loop)
+    return future
+
+
+def patched_proactor_event_loop_start_serving(self, protocol_factory, sock,
+                                              sslcontext=None, server=None, backlog=100,
+                                              ssl_handshake_timeout=None,
+                                              ssl_shutdown_timeout=None):  # pragma: no cover
+    """
+    Replacement for `BaseProactorEventLoop._start_serving` that keeps accepting after a reset connection.
+
+    On ConnectionResetError the accept loop is rescheduled with `call_soon`. Any other OSError is passed
+    to the loop exception handler and the listening socket is closed, provided the socket is still open.
+    """
+    # pylint: disable=unused-argument
+
+    # Python versions before 3.11 do not accept ssl_shutdown_timeout, so it is passed only when set
+    ssl_kwargs = {'ssl_handshake_timeout': ssl_handshake_timeout}
+    if ssl_shutdown_timeout is not None:
+        ssl_kwargs['ssl_shutdown_timeout'] = ssl_shutdown_timeout
+
+    def loop(f=None):
+        try:
+            if f is not None:
+                conn, addr = f.result()
+                if self._debug:
+                    logger.debug("%r got a new connection from %r: %r",
+                                 server, addr, conn)
+                protocol = protocol_factory()
+                if sslcontext is not None:
+                    self._make_ssl_transport(
+                        conn, protocol, sslcontext, server_side=True,
+                        extra={'peername': addr}, server=server,
+                        **ssl_kwargs)
+                else:
+                    self._make_socket_transport(
+                        conn, protocol,
+                        extra={'peername': addr}, server=server)
+            if self.is_closed():
+                return
+            f = self._proactor.accept(sock)
+
+        # start of the patched code
+        except ConnectionResetError:
+            logger.debug("Connection reset error occurred, continuing to accept connections")
+            self.call_soon(loop)
+        # end of the patched code
+
+        except OSError as exc:
+            if sock.fileno() != -1:
+                self.call_exception_handler({
+                    'message': 'Accept failed on a socket',
+                    'exception': exc,
+                    'socket': trsock.TransportSocket(sock),
+                })
+                sock.close()
+            elif self._debug:
+                logger.debug("Accept failed on socket %r",
+                             sock, exc_info=True)
+        except exceptions.CancelledError:
+            sock.close()
+        else:
+            self._accept_futures[sock.fileno()] = f
+            f.add_done_callback(loop)
+
+    self.call_soon(loop)
diff --git a/src/tribler/core/utilities/asyncio_fixes/tests/test_finish_accept_patch.py b/src/tribler/core/utilities/asyncio_fixes/tests/test_finish_accept_patch.py
new file mode 100644
index 00000000000..042bacd254f
--- /dev/null
+++ b/src/tribler/core/utilities/asyncio_fixes/tests/test_finish_accept_patch.py
@@ -0,0 +1,112 @@
+from asyncio import exceptions
+from dataclasses import dataclass
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from tribler.core.utilities.asyncio_fixes.finish_accept_patch import patched_iocp_proacor_accept
+
+
+# pylint: disable=protected-access
+
+
+@dataclass
+class accept_mocks_dataclass:
+    """Mocks shared by the tests of `patched_iocp_proacor_accept`."""
+    proactor: MagicMock
+    future: AsyncMock
+    conn: MagicMock
+    listener: MagicMock
+    overlapped: MagicMock
+
+
+@pytest.fixture(name='accept_mocks')
+def accept_mocks_fixture():
+    """Build a mocked proactor whose registered future raises CancelledError when awaited."""
+    proactor = MagicMock()
+
+    future = AsyncMock(side_effect=exceptions.CancelledError)()
+    proactor._register.return_value = future
+
+    conn = MagicMock()
+    proactor._get_accept_socket.return_value = conn
+
+    listener = MagicMock()
+    overlapped = MagicMock()
+
+    return accept_mocks_dataclass(proactor, future, conn, listener, overlapped)
+
+
+async def test_accept_coro(accept_mocks):
+    """accept_coro closes the accept socket on cancellation; finish_accept queries the peer name."""
+    with patch('asyncio.tasks.ensure_future') as ensure_future_mock:
+        f = patched_iocp_proacor_accept(accept_mocks.proactor, accept_mocks.listener,
+                                        _overlapped=accept_mocks.overlapped)
+        assert f is accept_mocks.future
+
+    ensure_future_mock.assert_called_once()
+    coro = ensure_future_mock.call_args[0][0]
+
+    assert not accept_mocks.conn.close.called
+
+    with pytest.raises(exceptions.CancelledError):
+        await coro
+
+    assert accept_mocks.conn.close.called
+
+    finish_accept = accept_mocks.proactor._register.call_args[0][2]
+    finish_accept(None, None, accept_mocks.overlapped)
+
+    assert accept_mocks.overlapped.getresult.called
+    assert accept_mocks.conn.getpeername.called
+
+
+async def test_finish_accept_error_netname_deleted(accept_mocks):
+    """ERROR_NETNAME_DELETED is re-raised as ConnectionResetError and the accept socket is closed."""
+    with patch('asyncio.tasks.ensure_future') as ensure_future_mock:
+        patched_iocp_proacor_accept(accept_mocks.proactor, accept_mocks.listener,
+                                    _overlapped=accept_mocks.overlapped)
+        finish_accept = accept_mocks.proactor._register.call_args[0][2]
+
+        # to avoid RuntimeWarning "coroutine 'accept_coro' was never awaited"
+        coro = ensure_future_mock.call_args[0][0]
+        with pytest.raises(exceptions.CancelledError):
+            await coro
+
+        exc = OSError()
+        exc.winerror = accept_mocks.overlapped.ERROR_NETNAME_DELETED
+        accept_mocks.overlapped.getresult.side_effect = exc
+
+        accept_mocks.conn.close.reset_mock()
+        assert not accept_mocks.conn.close.called
+
+        # finish_accept is a plain function, not a coroutine, so it is called without await
+        with pytest.raises(ConnectionResetError):
+            finish_accept(None, None, accept_mocks.overlapped)
+
+        assert accept_mocks.conn.close.called
+
+
+async def test_finish_accept_other_os_error(accept_mocks):
+    """An OSError with an unrelated winerror propagates and the accept socket is not closed."""
+    with patch('asyncio.tasks.ensure_future') as ensure_future_mock:
+        patched_iocp_proacor_accept(accept_mocks.proactor, accept_mocks.listener,
+                                    _overlapped=accept_mocks.overlapped)
+        finish_accept = accept_mocks.proactor._register.call_args[0][2]
+
+        # to avoid RuntimeWarning "coroutine 'accept_coro' was never awaited"
+        coro = ensure_future_mock.call_args[0][0]
+        with pytest.raises(exceptions.CancelledError):
+            await coro
+
+        exc = OSError()
+        exc.winerror = MagicMock()
+        accept_mocks.overlapped.getresult.side_effect = exc
+
+        accept_mocks.conn.close.reset_mock()
+        assert not accept_mocks.conn.close.called
+
+        with pytest.raises(OSError):
+            finish_accept(None, None, accept_mocks.overlapped)
+
+        assert not accept_mocks.conn.close.called