[ServiceBus] Fix Memory Leak on Network Drop + Use Asyncio Streams #29904
API change check: APIView has identified API level changes in this PR and created following API reviews.
/azp run python - servicebus - tests
Azure Pipelines successfully started running 1 pipeline(s).
/azp run python - servicebus - ci
Azure Pipelines successfully started running 1 pipeline(s).
self.connected = True
sock = self.writer.transport.get_extra_info("socket")
if sock:
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
I was able to bring in practically all of the keep-alive and other socket properties from before. These largely apply to Linux and Windows; macOS doesn't have most of these OS-level properties, so they are ignored there. The timeout was not brought over, as that makes sockets blocking, which is a no-go for async. The idea is to use wait_for and its timeout param to act as the "timeout", which is what we do :)
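To make the approach described above concrete, here is a minimal sketch of enabling keep-alive on the socket behind an asyncio stream and using `asyncio.wait_for` as the read timeout. The helper names `open_with_keepalive` and `read_with_timeout` are hypothetical and are not taken from the PR; only `SO_KEEPALIVE` is set here, not the full set of socket options the PR carries over.

```python
import asyncio
import socket


async def open_with_keepalive(host: str, port: int, connect_timeout: float = 5.0):
    """Open an asyncio stream and enable TCP keep-alive on its socket.

    Hypothetical helper for illustration only.
    """
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(host, port), timeout=connect_timeout
    )
    # The transport exposes the underlying socket through get_extra_info().
    sock = writer.transport.get_extra_info("socket")
    if sock is not None:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    return reader, writer


async def read_with_timeout(reader: asyncio.StreamReader, n: int, timeout: float) -> bytes:
    """Read exactly n bytes, letting wait_for act as the timeout.

    No socket-level timeout is set, so the socket stays non-blocking.
    """
    return await asyncio.wait_for(reader.readexactly(n), timeout=timeout)
```

When the timeout expires, `asyncio.wait_for` cancels the pending read and raises `asyncio.TimeoutError`, so the caller decides what an idle read means.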
And we're happy that get_extra_info("socket") is supported public API? Looks that way, but just confirming (anyway, given that it's Python stdlib, even if it's changed in Python vNext, we'd at least have some warning).
Is there any potential for these calls to generate additional exceptions that might need to be handled on line 289?
Yeah, that is supported in the public API for all the transports that are supported by streams. I can't believe I missed this detail when I first implemented it. Their example itself shows how to get the underlying socket :)
Looking around, I came across its usage in aiohttp, where they also set up the KEEP_ALIVE in the same way: https://github.com/aio-libs/aiohttp/blob/ffad0878989fc1ea95304406abbf421d627fe366/aiohttp/tcp_helpers.py#L13
With that, I was fairly confident in using this API to set socket properties; it's public, and to your point we should have some warning if it's going away.
On line 289, with these options set, any errors raised will be either SSLError or OSError, so these will be sufficient :)
A couple of questions, but looks okay to me :)
Most of the socket config we're not really exposing to SDK customers, so if we spin this out to standalone library, we can take stock of whether we actually want all those options.
@@ -411,6 +357,8 @@ async def _read(
# http://bugs.python.org/issue10272
if isinstance(exc, SSLError) and "timed out" in str(exc):
    raise socket.timeout()
if exc.errno in [110]:
What's errno 110? Should we add a comment for the magic number?
errno 110 is equivalent to ETIMEDOUT on Linux non-blocking sockets where a keep-alive is set, and it is set when the connection goes down. This behavior is Linux-specific and only occurs on async. Sync Linux and async/sync Windows and Mac raise ConnectionAborted or ConnectionReset errors, which properly end up in a retry loop.
I'll put this in a comment in the code :)
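As a rough illustration of the check discussed here, the sketch below turns an `OSError` carrying `ETIMEDOUT` into a connection error while leaving a plain timeout (one with no errno) untouched. The function name `reraise_read_error` is hypothetical, and using the symbolic `errno.ETIMEDOUT` instead of the literal 110 is a substitution made for this example; the constant equals 110 on Linux, and its numeric value is platform-dependent.

```python
import errno


def reraise_read_error(exc: OSError) -> None:
    """Re-raise a read error, mapping ETIMEDOUT to a connection error.

    Hypothetical helper: a dropped connection surfaces as a
    ConnectionAbortedError, while every other error is re-raised as-is.
    """
    if exc.errno == errno.ETIMEDOUT:
        # The keep-alive probes failed, so the peer is unreachable.
        raise ConnectionAbortedError("connection lost (ETIMEDOUT)") from exc
    # Anything else, including a timeout with no errno, is left unchanged.
    raise exc
```

A timeout produced by `asyncio.wait_for` has no errno, so it is not mistaken for a dropped connection by this check.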
/azp run python - servicebus - tests
Azure Pipelines successfully started running 1 pipeline(s).
…zure#29904) * use non blocking socket + raise on errno 110 * pylint fixes * comment for errno 110
* upstream + sb pyamqp * pyamqp from eventhub * asyncio markers on async tests * remove recordings * updating sender and session from main * missing req * [Service Bus] Performance Tests (#28399) * delete t1 tests * perf tests * fix size * few small fixes for topic * perf tests redone * stream tests * update logic to handle recv & del * restructure classes * formatting * preload only delta messages * fixes * rename test files * swathis comments * perf bicep * fix transport & remove shared * if peeklock, then complete * make batch receive batchperftests * restructure classes * fix * fixes for max_count * fix wording * minor clean up * move add args to mixin * [ServiceBus] Iterator Support (#28558) * iterator * add todo * remove logger * tests * skip serialization and pyamp transport errors * add in keep-alive for releasing messages * try this for client.py * trying to fix iterator vs normal receiving timeout for releasing messages * remove part of test failing for sync release -- doesnt work on pyamqp * make message yeilding a while loop * copying changes to async * fixing async release test * message_received was set incorrectly/ diff than uamqp * fix async * fixing closing order logic * receive_contxt, fix test * another receive_context * asyncio.lock * receiver_context * try to ignore sync for now * async with lock * receive_context * remove print statements * unskip sync * fix pylint and mypy * skip tests except for failing one * mark not mock * socket read timeout set to 1 but was .2 on sync * run all tests with new timeout * pr comments * pr comments * typing add back * remove todo * time to live - ttl * async init * async init * pr comments * Revert "pr comments" This reverts commit f8c96f2. 
* pr comments - lock rename * pr comments - remove timeout setting for uamqp * tests * set link credit in connection listen on keep alive * link async * change pop to get * try flow before connection * test * link credit not bein kept bc of flow in client_run * if link credit is 0, reset it * add wait_time to tests * time override test * Connection to Link Error * sleep fix * link_credit * missing _ in test * remove boolean flag * need actibvity timestamp in yeild message * missing "_" * stamp ->timestamp * dont fix EH here * formatting pylint * sock timeout async * whitespace * remove __aiter__ * need iter * remove todo * todo * whitespace * iter_context * remove self * tests - remove #pytest.skip * add timeouts to constants * merge together if statements * timeout * missing if * pylint * pylint * whitespace * todo * if statement :) * pylint/remove whitespace * [ServiceBus] merge EH pyamqp into SB pyamqp (#29223) * [TEMPORARY] adding eh _pyamqp folder * [TEMP] add _pyamqp/aio * undo removing client lock * lint * [ServiceBus] update pyproto b1 version/changelog/readme (#29251) * [ServiceBus] update changelog 7.9.0b1 (#29267) * Increment version for servicebus releases (#29268) Increment package version after release of azure-servicebus * [ServiceBus] Pyamqp Changes from EH in SB (#29499) * frame fix sync * frame fix async * [ServiceBus] uamqp/pyamqp switch (#28512) - Added _pyamqp_transport.py/_uamqp_transport.py, which contain all corresponding uamqp/pyamqp code. TODO: - tests: - [ ] manually create ServiceBusMessageBatch and set client to `uamqp_transport=True` when sending. 
* [ServiceBus] Files for SB Perf Tests (#29503) * files for perf test CI * fix args, vals based on comments * remove unused params * add batch size back in to perf test * add in add_arguments in to send base * adjust message size * fix * [Service Bus] Fix System.Byte[] Not Supported (#29670) * add in string decode for 161 * app keys are now strings * app keys are now strings * remove decode * revert changes * decode if app props val is bytes * remove change from pyamqp layer * move fix in to outgoing message * remove extra ) * fix * mypy and pylint * fix in back_compat * rephrase * [ServiceBus] Fix sb perf test (#29765) * fixes for perf bicep * fix env vars and params * [ServiceBus] prep release 7.10.0b1 (#29815) * prep for release * update readmes * update docs for switch * remove uamqp dev req temporarily * fix mypy/pylint * add back uamqp to dev reqs * merge main in topyamqp * update release date * Increment package version after release of azure-servicebus (#29881) * [SB] Remove references to internal streaming method (#29750) * remove streaming * update stress * move uamqp transport imports into client constructor (#29921) * [ServiceBus][Perf] Fix perf tests (#30004) Some async tests were trying to use an synchronous receiver/sender. This fixes that so that the tests can run. 
Signed-off-by: Paul Van Eck <[email protected]> * [SB Pyamqp] stress updates (#29783) * stress updates * changes * add memray to stress * undo docker file changes * add memray chaos * timeoutError raise * devred * try log to file * test indiv * test indv * updates * tests * logging_enable * stress * update * delete * remove changes to code * change level * update chart.yaml * update to local running of indv components * updates * remove * update * update test base * remove eh changes * logging * update jpb * update docker * update scenarios * logging --------- Co-authored-by: swathipil <[email protected]> * [ServiceBus] Update tracing (#29995) * [ServiceBus] Update tracing - "Send" span now contains links to message spans. - Receive span is now kind CLIENT instead of CONSUMER. - Added span creation logic for settlement methods. - Attribute names were updated to align with distributed tracing conventions. - Some span named renamed to align with other SDKs. - Receive spans now have more accurate start times. Signed-off-by: Paul Van Eck <[email protected]> * Refactor tracing utils Signed-off-by: Paul Van Eck <[email protected]> * Remove unneeded arg from trace_message Signed-off-by: Paul Van Eck <[email protected]> * update changelog Signed-off-by: Paul Van Eck <[email protected]> * Remove use of `messaging.source.name` This is slated to be removed in favor of `messaging.destination.name` for everything. Here, we maintain use of the legacy attribute name `message_bus.destination`. 
Signed-off-by: Paul Van Eck <[email protected]> * remove test-resources.bicep from stress --------- Signed-off-by: Paul Van Eck <[email protected]> Co-authored-by: swathipil <[email protected]> * [ServiceBus] Fix Memory Leak on Network Drop + Use Asyncio Streams (#29904) * use non blocking socket + raise on errno 110 * pylint fixes * comment for errno 110 * [ServiceBus] pyamqp exception parity (#30020) * unskip tests * test passing uamqp.TransportType * make sbreceived messages picklable * edge case sb message batch creation test * remove accidental additions * add sb client tests * add invalid custom endpoint tests * update pyamqp invalid custom endpoint error * add test_errors back to folder * add more tests * lint * fix unskipped async test * kashif comments * fix asyncio pickling for <3.11 * lint * unpickle clients * remove receiver/uamqp message from received message pickling * annas comments * update version + changelog * update amqp transport kind check in message * changelog + update to stable * pull main again * update readme/typing * test session set_state None --------- Signed-off-by: Paul Van Eck <[email protected]> Co-authored-by: l0lawrence <[email protected]> Co-authored-by: Kashif Khan <[email protected]> Co-authored-by: Azure SDK Bot <[email protected]> Co-authored-by: Paul Van Eck <[email protected]>
This PR fixes the memory leak that would happen when a network drop occurs on Linux when using the async receiver. Unlike Windows, where a ConnectionAborted or ConnectionResetError would show up right away, only a TimeoutError would be raised on Linux. This complicates matters, as a TimeoutError could simply mean the service has no messages/events to hand back. Because of this, the code would enter an infinite loop, with memory usage growing. What should have happened is that the error would propagate up, causing it to enter the retry loop.
Using that I raise a ConnectionAborted that enters the retry loop properly.
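The difference between an idle timeout and a dropped connection can be sketched with a small retry wrapper. This is an illustration under assumptions, not the SDK's actual retry logic: `receive_with_retry`, `receive_once`, and `reconnect` are hypothetical names, and `receive_once` is assumed to raise `ConnectionAbortedError` or `ConnectionResetError` once a drop has been detected.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def receive_with_retry(
    receive_once: Callable[[], Awaitable[bytes]],
    reconnect: Callable[[], Awaitable[None]],
    max_retries: int = 3,
) -> Optional[bytes]:
    """Hypothetical retry wrapper around a single receive call."""
    for attempt in range(max_retries + 1):
        try:
            return await receive_once()
        except asyncio.TimeoutError:
            # Idle: the service had nothing to hand back in time.
            return None
        except (ConnectionAbortedError, ConnectionResetError):
            # Dropped connection: rebuild it and try again,
            # giving up once the retry budget is spent.
            if attempt == max_retries:
                raise
            await reconnect()
    return None
```

If a dropped connection were reported as a plain timeout, the first `except` branch would treat it as idle and `reconnect` would never run, which is the failure mode the PR description refers to.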