From 0256044bb2b721d9e1c7003ccf36194e59ad3538 Mon Sep 17 00:00:00 2001 From: Kashif Khan <361477+kashifkhan@users.noreply.github.com> Date: Fri, 5 May 2023 12:21:34 -0500 Subject: [PATCH] [ServiceBus] Fix Memory Leak on Network Drop + Use Asyncio Streams (#29904) * use non blocking socket + raise on errno 110 * pylint fixes * comment for errno 110 --- .../_pyamqp/aio/_transport_async.py | 118 ++++++------------ 1 file changed, 35 insertions(+), 83 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_transport_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_transport_async.py index 4241bd02342c..69bd96527e84 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_transport_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_transport_async.py @@ -54,7 +54,6 @@ WEBSOCKET_PORT, AMQP_WS_SUBPROTOCOL, TIMEOUT_INTERVAL, - READ_TIMEOUT_INTERVAL, ) from .._transport import ( AMQP_FRAME, @@ -63,7 +62,6 @@ DEFAULT_SOCKET_SETTINGS, SIGNED_INT_MAX, _UNAVAIL, - set_cloexec, AMQP_PORT, ) from ..error import AuthenticationException, ErrorCondition @@ -262,93 +260,41 @@ async def connect(self): # are we already connected? if self.connected: return - await self._connect(self.host, self.port, self.connect_timeout) - self._init_socket(self.socket_settings) + try: + # Building ssl opts here instead of constructor, so that invalid cert error is raised + # when client is connecting, rather then during creation. For uamqp exception parity. + self.sslopts = self._build_ssl_opts(self.sslopts) + except FileNotFoundError as exc: + # FileNotFoundError does not have missing filename info, so adding it below. + # Assuming that this must be ca_certs, since this is the only file path that + # users can pass in (`connection_verify` in the EH/SB clients) through sslopts above. + # For uamqp exception parity. Remove later when resolving issue #27128. + exc.filename = self.sslopts + raise exc self.reader, self.writer = await asyncio.open_connection( - sock=self.sock, + host=self.host, + port=self.port, ssl=self.sslopts, + family=socket.AF_UNSPEC, + proto=SOL_TCP, server_hostname=self.host if self.sslopts else None, ) - # we've sent the banner; signal connect - # EINTR, EAGAIN, EWOULDBLOCK would signal that the banner - # has _not_ been sent self.connected = True + sock = self.writer.transport.get_extra_info("socket") + if sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self._set_socket_options(sock, self.socket_settings) + + except (OSError, IOError, SSLError) as e: _LOGGER.info("Transport connect failed: %r", e, extra=self.network_trace_params) # if not fully connected, close socket, and reraise error - if self.sock and not self.connected: - self.sock.close() - self.sock = None + if self.writer and not self.connected: + self.writer.close() + await self.writer.wait_closed() + self.connected = False raise - async def _connect(self, host, port, timeout): - # Below we are trying to avoid additional DNS requests for AAAA if A - # succeeds. This helps a lot in case when a hostname has an IPv4 entry - # in /etc/hosts but not IPv6. Without the (arguably somewhat twisted) - # logic below, getaddrinfo would attempt to resolve the hostname for - # both IP versions, which would make the resolver talk to configured - # DNS servers. If those servers are for some reason not available - # during resolution attempt (either because of system misconfiguration, - # or network connectivity problem), resolution process locks the - # _connect call for extended time. - e = None - addr_types = (socket.AF_INET, socket.AF_INET6) - addr_types_num = len(addr_types) - for n, family in enumerate(addr_types): - # first, resolve the address for a single address family - try: - entries = await asyncio.get_event_loop().getaddrinfo( - host, port, family=family, type=socket.SOCK_STREAM, proto=SOL_TCP - ) - entries_num = len(entries) - except socket.gaierror: - # we may have depleted all our options - if n + 1 >= addr_types_num: - # if getaddrinfo succeeded before for another address - # family, reraise the previous socket.error since it's more - # relevant to users - raise e if e is not None else socket.error("failed to resolve broker hostname") - continue # pragma: no cover - # now that we have address(es) for the hostname, connect to broker - for i, res in enumerate(entries): - af, socktype, proto, _, sa = res - try: - self.sock = socket.socket(af, socktype, proto) - try: - set_cloexec(self.sock, True) - except NotImplementedError: - pass - self.sock.settimeout(timeout) - await asyncio.get_event_loop().sock_connect(self.sock, sa) - except socket.error as ex: - e = ex - if self.sock is not None: - self.sock.close() - self.sock = None - # we may have depleted all our options - if i + 1 >= entries_num and n + 1 >= addr_types_num: - raise - else: - # hurray, we established connection - return - - def _init_socket(self, socket_settings): - self.sock.settimeout(None) # set socket back to blocking mode - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - self._set_socket_options(socket_settings) - try: - # Building ssl opts here instead of constructor, so that invalid cert error is raised - # when client is connecting, rather then during creation. For uamqp exception parity. - self.sslopts = self._build_ssl_opts(self.sslopts) - except FileNotFoundError as exc: - # FileNotFoundError does not have missing filename info, so adding it below. - # Assuming that this must be ca_certs, since this is the only file path that - # users can pass in (`connection_verify` in the EH/SB clients) through sslopts above. - # For uamqp exception parity. Remove later when resolving issue #27128. - exc.filename = self.sslopts - raise exc - self.sock.settimeout(READ_TIMEOUT_INTERVAL) # set socket back to non-blocking mode - def _get_tcp_socket_defaults(self, sock): # pylint: disable=no-self-use tcp_opts = {} for opt in KNOWN_TCP_OPTS: @@ -369,12 +315,12 @@ def _get_tcp_socket_defaults(self, sock): # pylint: disable=no-self-use tcp_opts[enum] = sock.getsockopt(SOL_TCP, getattr(socket, opt)) return tcp_opts - def _set_socket_options(self, socket_settings): - tcp_opts = self._get_tcp_socket_defaults(self.sock) + def _set_socket_options(self, sock, socket_settings): + tcp_opts = self._get_tcp_socket_defaults(sock) if socket_settings: tcp_opts.update(socket_settings) for opt, val in tcp_opts.items(): - self.sock.setsockopt(SOL_TCP, opt, val) + sock.setsockopt(SOL_TCP, opt, val) async def _read( self, @@ -411,6 +357,13 @@ async def _read( # http://bugs.python.org/issue10272 if isinstance(exc, SSLError) and "timed out" in str(exc): raise socket.timeout() + # errno 110 is equivalent to ETIMEDOUT on linux non blocking sockets, when a keep alive is set, + # and is set when the connection to the server doesnt succeed + # https://man7.org/linux/man-pages/man7/tcp.7.html. + # This behavior is linux specific and only on async. sync Linux & async/sync Windows & Mac raised + # ConnectionAborted or ConnectionReset errors which properly end up in a retry loop. + if exc.errno in [110]: + raise ConnectionAbortedError('The connection was closed abruptly.') # ssl.sock.read may cause ENOENT if the # operation couldn't be performed (Issue celery#1414). if exc.errno in _errnos: @@ -451,7 +404,6 @@ async def close(self): # Sometimes SSL raises APPLICATION_DATA_AFTER_CLOSE_NOTIFY here on close. _LOGGER.debug("Error shutting down socket: %r", e, extra=self.network_trace_params) self.writer, self.reader = None, None - self.sock = None self.connected = False async def negotiate(self):