[ServiceBus] Fix Memory Leak on Network Drop + Use Asyncio Streams #29904

Merged
Changes from 2 commits
@@ -54,7 +54,6 @@
WEBSOCKET_PORT,
AMQP_WS_SUBPROTOCOL,
TIMEOUT_INTERVAL,
READ_TIMEOUT_INTERVAL,
)
from .._transport import (
AMQP_FRAME,
@@ -63,7 +62,6 @@
DEFAULT_SOCKET_SETTINGS,
SIGNED_INT_MAX,
_UNAVAIL,
set_cloexec,
AMQP_PORT,
)
from ..error import AuthenticationException, ErrorCondition
@@ -262,93 +260,41 @@ async def connect(self):
# are we already connected?
if self.connected:
return
await self._connect(self.host, self.port, self.connect_timeout)
self._init_socket(self.socket_settings)
try:
# Building ssl opts here instead of constructor, so that invalid cert error is raised
# when client is connecting, rather than during creation. For uamqp exception parity.
self.sslopts = self._build_ssl_opts(self.sslopts)
except FileNotFoundError as exc:
# FileNotFoundError does not have missing filename info, so adding it below.
# Assuming that this must be ca_certs, since this is the only file path that
# users can pass in (`connection_verify` in the EH/SB clients) through sslopts above.
# For uamqp exception parity. Remove later when resolving issue #27128.
exc.filename = self.sslopts
raise exc
self.reader, self.writer = await asyncio.open_connection(
sock=self.sock,
host=self.host,
port=self.port,
ssl=self.sslopts,
family=socket.AF_UNSPEC,
proto=SOL_TCP,
server_hostname=self.host if self.sslopts else None,
)
# we've sent the banner; signal connect
# EINTR, EAGAIN, EWOULDBLOCK would signal that the banner
# has _not_ been sent
self.connected = True
sock = self.writer.transport.get_extra_info("socket")
if sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
Member Author

@kashifkhan kashifkhan May 4, 2023

I was able to bring in practically all of the keep-alive and other socket properties from the previous implementation. These largely apply to Linux and Windows; macOS doesn't have most of these OS-level properties, so they are ignored there. The timeout was not brought over, because setting it makes the socket blocking, which is a no-go for async. The idea is to use wait_for and its timeout param to act as the "timeout", which is what we do :)
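The `wait_for` approach described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual `_read` implementation: the helper name `read_with_timeout` and the choice to re-raise as `socket.timeout` are assumptions made for the example.

```python
import asyncio
import socket


async def read_with_timeout(reader: asyncio.StreamReader, n: int, timeout: float) -> bytes:
    """Read exactly n bytes from the stream, giving up after `timeout` seconds.

    Hypothetical helper: the socket itself stays non-blocking, and the
    deadline is enforced by asyncio.wait_for instead of sock.settimeout().
    """
    try:
        return await asyncio.wait_for(reader.readexactly(n), timeout=timeout)
    except asyncio.TimeoutError:
        # Assumption: callers expect the same exception type a blocking socket raises.
        raise socket.timeout("read timed out") from None


async def demo() -> tuple:
    # An in-memory StreamReader stands in for a real connection.
    reader = asyncio.StreamReader()
    reader.feed_data(b"AMQP")
    data = await read_with_timeout(reader, 4, timeout=1.0)
    try:
        # No more data is available, so this read hits the deadline.
        await read_with_timeout(reader, 4, timeout=0.05)
        timed_out = False
    except socket.timeout:
        timed_out = True
    return data, timed_out
```

Running `asyncio.run(demo())` exercises both the successful read and the timeout path without touching the network.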

Member

And we're happy that get_extra_info("socket") is a supported public API? Looks that way, but just confirming (anyway, given that it's Python stdlib, even if it changes in Python vNext, we'd at least have some warning).

Is there any potential for these calls to generate additional exceptions that might need to be handled on line 289?

Member Author

Yeah, that is supported in the public API for all the transports that are supported by streams. I can't believe I missed this detail when I first implemented it. Their example itself shows how to get the underlying socket :)

Looking around, I came across its usage in aiohttp, where they also set up keep-alive in the same way: https://github.com/aio-libs/aiohttp/blob/ffad0878989fc1ea95304406abbf421d627fe366/aiohttp/tcp_helpers.py#L13.

With that, I was fairly confident about using this API to set socket properties. It's public and, to your point, we should have some warning if it's going away.

On line 289, with these options set, any error raised will be either an SSLError or an OSError, so the existing handlers will be sufficient :)
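For readers unfamiliar with the API discussed in this thread, the sketch below shows the general pattern of fetching the underlying socket from an asyncio stream and enabling keep-alive on it. It is an illustration under assumptions, not the PR's code: `enable_keepalive` and `demo` are hypothetical names, and the throwaway loopback server exists only so the example is self-contained.

```python
import asyncio
import socket


def enable_keepalive(writer: asyncio.StreamWriter) -> bool:
    """Turn on SO_KEEPALIVE for the socket behind a stream, if one is exposed."""
    sock = writer.get_extra_info("socket")
    if sock is None:
        # Some transports do not expose a socket; nothing to configure.
        return False
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    return True


async def demo() -> int:
    async def handle(reader, writer):
        # Keep the connection open until the client disconnects.
        await reader.read()
        writer.close()

    # Port 0 asks the OS for any free loopback port.
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    try:
        enable_keepalive(writer)
        sock = writer.get_extra_info("socket")
        # Non-zero means keep-alive is enabled (the exact value is platform-specific).
        return sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
    finally:
        writer.close()
        await writer.wait_closed()
        server.close()
```

Because the options are applied after `open_connection` returns, the socket is never switched into blocking or timeout mode, which is the property the comment above relies on.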

self._set_socket_options(sock, self.socket_settings)


except (OSError, IOError, SSLError) as e:
_LOGGER.info("Transport connect failed: %r", e, extra=self.network_trace_params)
# if not fully connected, close socket, and reraise error
if self.sock and not self.connected:
self.sock.close()
self.sock = None
if self.writer and not self.connected:
self.writer.close()
await self.writer.wait_closed()
self.connected = False
raise

async def _connect(self, host, port, timeout):
# Below we are trying to avoid additional DNS requests for AAAA if A
# succeeds. This helps a lot in case when a hostname has an IPv4 entry
# in /etc/hosts but not IPv6. Without the (arguably somewhat twisted)
# logic below, getaddrinfo would attempt to resolve the hostname for
# both IP versions, which would make the resolver talk to configured
# DNS servers. If those servers are for some reason not available
# during resolution attempt (either because of system misconfiguration,
# or network connectivity problem), resolution process locks the
# _connect call for extended time.
e = None
addr_types = (socket.AF_INET, socket.AF_INET6)
addr_types_num = len(addr_types)
for n, family in enumerate(addr_types):
# first, resolve the address for a single address family
try:
entries = await asyncio.get_event_loop().getaddrinfo(
host, port, family=family, type=socket.SOCK_STREAM, proto=SOL_TCP
)
entries_num = len(entries)
except socket.gaierror:
# we may have depleted all our options
if n + 1 >= addr_types_num:
# if getaddrinfo succeeded before for another address
# family, reraise the previous socket.error since it's more
# relevant to users
raise e if e is not None else socket.error("failed to resolve broker hostname")
continue # pragma: no cover
# now that we have address(es) for the hostname, connect to broker
for i, res in enumerate(entries):
af, socktype, proto, _, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
try:
set_cloexec(self.sock, True)
except NotImplementedError:
pass
self.sock.settimeout(timeout)
await asyncio.get_event_loop().sock_connect(self.sock, sa)
except socket.error as ex:
e = ex
if self.sock is not None:
self.sock.close()
self.sock = None
# we may have depleted all our options
if i + 1 >= entries_num and n + 1 >= addr_types_num:
raise
else:
# hurray, we established connection
return

def _init_socket(self, socket_settings):
self.sock.settimeout(None) # set socket back to blocking mode
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self._set_socket_options(socket_settings)
try:
# Building ssl opts here instead of constructor, so that invalid cert error is raised
# when client is connecting, rather than during creation. For uamqp exception parity.
self.sslopts = self._build_ssl_opts(self.sslopts)
except FileNotFoundError as exc:
# FileNotFoundError does not have missing filename info, so adding it below.
# Assuming that this must be ca_certs, since this is the only file path that
# users can pass in (`connection_verify` in the EH/SB clients) through sslopts above.
# For uamqp exception parity. Remove later when resolving issue #27128.
exc.filename = self.sslopts
raise exc
self.sock.settimeout(READ_TIMEOUT_INTERVAL) # set socket back to non-blocking mode

def _get_tcp_socket_defaults(self, sock): # pylint: disable=no-self-use
tcp_opts = {}
for opt in KNOWN_TCP_OPTS:
@@ -369,12 +315,12 @@ def _get_tcp_socket_defaults(self, sock): # pylint: disable=no-self-use
tcp_opts[enum] = sock.getsockopt(SOL_TCP, getattr(socket, opt))
return tcp_opts

def _set_socket_options(self, socket_settings):
tcp_opts = self._get_tcp_socket_defaults(self.sock)
def _set_socket_options(self, sock, socket_settings):
tcp_opts = self._get_tcp_socket_defaults(sock)
if socket_settings:
tcp_opts.update(socket_settings)
for opt, val in tcp_opts.items():
self.sock.setsockopt(SOL_TCP, opt, val)
sock.setsockopt(SOL_TCP, opt, val)

async def _read(
self,
@@ -411,6 +357,8 @@ async def _read(
# http://bugs.python.org/issue10272
if isinstance(exc, SSLError) and "timed out" in str(exc):
raise socket.timeout()
if exc.errno in [110]:
Member

What's errno 110? Should we add a comment for the magic number?

Member Author

@kashifkhan kashifkhan May 5, 2023

errno 110 is ETIMEDOUT on Linux. On non-blocking sockets with keep-alive set, it is raised when the connection goes down. This behavior is Linux-specific and only occurs on async. Sync Linux and async/sync Windows and Mac raise `ConnectionAborted` or `ConnectionReset` errors, which properly end up in a retry loop.

I'll add this as a comment in the code :)
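One way to avoid the magic number raised in this thread is to compare against the symbolic constant. The sketch below is an assumption-laden illustration, not what the PR does: the PR checks the literal `110`, while this example uses `errno.ETIMEDOUT` (which is 110 on Linux and has a different value on other platforms), and `translate_read_error` is a hypothetical helper name.

```python
import errno


def translate_read_error(exc: OSError) -> Exception:
    """Map a keep-alive timeout to an error that the retry logic already handles.

    Hypothetical helper: returns the exception to raise instead of raising it,
    so the mapping is easy to test in isolation.
    """
    # ETIMEDOUT is 110 on Linux; it surfaces on non-blocking sockets with
    # keep-alive enabled when the connection goes down.
    if exc.errno == errno.ETIMEDOUT:
        return ConnectionAbortedError("The connection was closed abruptly.")
    # Anything else is passed through unchanged.
    return exc
```

A caller inside an `except OSError as exc:` block could then write `raise translate_read_error(exc)`. Note that using the symbolic constant would also match timeouts on platforms where the value is not 110, which is a behavior change relative to the literal check.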

raise ConnectionAbortedError('The connection was closed abruptly.')
# ssl.sock.read may cause ENOENT if the
# operation couldn't be performed (Issue celery#1414).
if exc.errno in _errnos:
@@ -451,7 +399,6 @@ async def close(self):
# Sometimes SSL raises APPLICATION_DATA_AFTER_CLOSE_NOTIFY here on close.
_LOGGER.debug("Error shutting down socket: %r", e, extra=self.network_trace_params)
self.writer, self.reader = None, None
self.sock = None
self.connected = False

async def negotiate(self):