Skip to content

Commit

Permalink
[ServiceBus] Fix Memory Leak on Network Drop + Use Asyncio Streams (#…
Browse files Browse the repository at this point in the history
…29904)

* use non blocking socket + raise on errno 110

* pylint fixes

* comment for errno 110
  • Loading branch information
kashifkhan authored May 5, 2023
1 parent 170fe74 commit 0256044
Showing 1 changed file with 35 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
WEBSOCKET_PORT,
AMQP_WS_SUBPROTOCOL,
TIMEOUT_INTERVAL,
READ_TIMEOUT_INTERVAL,
)
from .._transport import (
AMQP_FRAME,
Expand All @@ -63,7 +62,6 @@
DEFAULT_SOCKET_SETTINGS,
SIGNED_INT_MAX,
_UNAVAIL,
set_cloexec,
AMQP_PORT,
)
from ..error import AuthenticationException, ErrorCondition
Expand Down Expand Up @@ -262,93 +260,41 @@ async def connect(self):
# are we already connected?
if self.connected:
return
await self._connect(self.host, self.port, self.connect_timeout)
self._init_socket(self.socket_settings)
try:
# Building ssl opts here instead of constructor, so that invalid cert error is raised
# when client is connecting, rather than during creation. For uamqp exception parity.
self.sslopts = self._build_ssl_opts(self.sslopts)
except FileNotFoundError as exc:
# FileNotFoundError does not have missing filename info, so adding it below.
# Assuming that this must be ca_certs, since this is the only file path that
# users can pass in (`connection_verify` in the EH/SB clients) through sslopts above.
# For uamqp exception parity. Remove later when resolving issue #27128.
exc.filename = self.sslopts
raise exc
self.reader, self.writer = await asyncio.open_connection(
sock=self.sock,
host=self.host,
port=self.port,
ssl=self.sslopts,
family=socket.AF_UNSPEC,
proto=SOL_TCP,
server_hostname=self.host if self.sslopts else None,
)
# we've sent the banner; signal connect
# EINTR, EAGAIN, EWOULDBLOCK would signal that the banner
# has _not_ been sent
self.connected = True
sock = self.writer.transport.get_extra_info("socket")
if sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self._set_socket_options(sock, self.socket_settings)


except (OSError, IOError, SSLError) as e:
_LOGGER.info("Transport connect failed: %r", e, extra=self.network_trace_params)
# if not fully connected, close socket, and reraise error
if self.sock and not self.connected:
self.sock.close()
self.sock = None
if self.writer and not self.connected:
self.writer.close()
await self.writer.wait_closed()
self.connected = False
raise

async def _connect(self, host, port, timeout):
# Resolve `host` and open a TCP connection to it, leaving the connected socket in self.sock.
#
# host, port: broker address, passed to the event loop's getaddrinfo.
# timeout: applied to each candidate socket via settimeout() before connecting.
#
# Address families are tried one at a time (AF_INET first, then AF_INET6); within a family
# every resolved address is attempted in order. Returns as soon as one connect succeeds.
# Raises the last connect error once every address of every family has failed, or
# socket.error("failed to resolve broker hostname") if resolution failed and no connect
# error was recorded.
#
# Below we are trying to avoid additional DNS requests for AAAA if A
# succeeds. This helps a lot in case when a hostname has an IPv4 entry
# in /etc/hosts but not IPv6. Without the (arguably somewhat twisted)
# logic below, getaddrinfo would attempt to resolve the hostname for
# both IP versions, which would make the resolver talk to configured
# DNS servers. If those servers are for some reason not available
# during resolution attempt (either because of system misconfiguration,
# or network connectivity problem), resolution process locks the
# _connect call for extended time.
# `e` remembers the most recent connect failure so it can be re-raised later
e = None
addr_types = (socket.AF_INET, socket.AF_INET6)
addr_types_num = len(addr_types)
for n, family in enumerate(addr_types):
# first, resolve the address for a single address family
try:
entries = await asyncio.get_event_loop().getaddrinfo(
host, port, family=family, type=socket.SOCK_STREAM, proto=SOL_TCP
)
entries_num = len(entries)
except socket.gaierror:
# we may have depleted all our options
if n + 1 >= addr_types_num:
# if getaddrinfo succeeded before for another address
# family, reraise the previous socket.error since it's more
# relevant to users
raise e if e is not None else socket.error("failed to resolve broker hostname")
# resolution failed for this family only; move on to the next one
continue # pragma: no cover
# now that we have address(es) for the hostname, connect to broker
for i, res in enumerate(entries):
af, socktype, proto, _, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
try:
set_cloexec(self.sock, True)
except NotImplementedError:
# set_cloexec is best-effort; a NotImplementedError from it is ignored
pass
self.sock.settimeout(timeout)
await asyncio.get_event_loop().sock_connect(self.sock, sa)
except socket.error as ex:
e = ex
# discard the failed socket before trying the next address
if self.sock is not None:
self.sock.close()
self.sock = None
# we may have depleted all our options
if i + 1 >= entries_num and n + 1 >= addr_types_num:
raise
# NOTE(review): indentation is not visible here; this `else` presumably belongs to the
# connect `try` above (runs when no socket.error was raised) - confirm against the file.
else:
# hurray, we established connection
return

def _init_socket(self, socket_settings):
# Configure self.sock (the socket created by _connect): enable SO_KEEPALIVE, apply the TCP
# options in `socket_settings`, build the SSL options, then set the read timeout.
#
# socket_settings: passed through to _set_socket_options.
# Raises FileNotFoundError (with `filename` filled in) when _build_ssl_opts raises it.
self.sock.settimeout(None) # set socket back to blocking mode
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self._set_socket_options(socket_settings)
try:
# Building ssl opts here instead of constructor, so that invalid cert error is raised
# when client is connecting, rather than during creation. For uamqp exception parity.
self.sslopts = self._build_ssl_opts(self.sslopts)
except FileNotFoundError as exc:
# FileNotFoundError does not have missing filename info, so adding it below.
# Assuming that this must be ca_certs, since this is the only file path that
# users can pass in (`connection_verify` in the EH/SB clients) through sslopts above.
# For uamqp exception parity. Remove later when resolving issue #27128.
# The assignment above did not complete, so self.sslopts still holds the value that was
# passed to _build_ssl_opts.
exc.filename = self.sslopts
raise exc
# Apply the read timeout. NOTE(review): the original note said "set socket back to
# non-blocking mode"; settimeout() only yields non-blocking mode for a value of 0, so this is
# presumably timeout mode - confirm the value of READ_TIMEOUT_INTERVAL.
self.sock.settimeout(READ_TIMEOUT_INTERVAL)

def _get_tcp_socket_defaults(self, sock): # pylint: disable=no-self-use
tcp_opts = {}
for opt in KNOWN_TCP_OPTS:
Expand All @@ -369,12 +315,12 @@ def _get_tcp_socket_defaults(self, sock): # pylint: disable=no-self-use
tcp_opts[enum] = sock.getsockopt(SOL_TCP, getattr(socket, opt))
return tcp_opts

def _set_socket_options(self, socket_settings):
tcp_opts = self._get_tcp_socket_defaults(self.sock)
def _set_socket_options(self, sock, socket_settings):
tcp_opts = self._get_tcp_socket_defaults(sock)
if socket_settings:
tcp_opts.update(socket_settings)
for opt, val in tcp_opts.items():
self.sock.setsockopt(SOL_TCP, opt, val)
sock.setsockopt(SOL_TCP, opt, val)

async def _read(
self,
Expand Down Expand Up @@ -411,6 +357,13 @@ async def _read(
# http://bugs.python.org/issue10272
if isinstance(exc, SSLError) and "timed out" in str(exc):
raise socket.timeout()
# errno 110 is equivalent to ETIMEDOUT on Linux non-blocking sockets when keep-alive is set,
# and is set when the connection to the server doesn't succeed.
# https://man7.org/linux/man-pages/man7/tcp.7.html
# This behavior is Linux-specific and only occurs with async. Sync on Linux, and both async and
# sync on Windows and Mac, raise ConnectionAborted or ConnectionReset errors, which properly
# end up in a retry loop.
if exc.errno in [110]:
raise ConnectionAbortedError('The connection was closed abruptly.')
# ssl.sock.read may cause ENOENT if the
# operation couldn't be performed (Issue celery#1414).
if exc.errno in _errnos:
Expand Down Expand Up @@ -451,7 +404,6 @@ async def close(self):
# Sometimes SSL raises APPLICATION_DATA_AFTER_CLOSE_NOTIFY here on close.
_LOGGER.debug("Error shutting down socket: %r", e, extra=self.network_trace_params)
self.writer, self.reader = None, None
self.sock = None
self.connected = False

async def negotiate(self):
Expand Down

0 comments on commit 0256044

Please sign in to comment.