kafka: Abort fetch and list_offsets when client disconnects #12021
Conversation
/cdt tests/rptest/scale_tests/tiered_storage_reader_stress_test.py

Force-pushed from 69f7e08 to 4e04139

/cdt tests/rptest/scale_tests/tiered_storage_reader_stress_test.py

Force-pushed from 4e04139 to f805dcc
Changes in force push:
```diff
@@ -94,7 +94,9 @@ static ss::future<read_result> read_from_partition(
       kafka_read_priority(),
       std::nullopt,
       std::nullopt,
       std::nullopt);
       config.abort_source.has_value()
```
why would config not always have one
> why would config not always have one
A good question. Ultimately when all the places and the tests are wired up, it should not be optional.
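For context, a minimal sketch of the guarded-access pattern under discussion; the struct and function names here are hypothetical, and only the optional-reference shape of the field is taken from the surrounding diffs:

```cpp
#include <functional>
#include <optional>

#include <seastar/core/abort_source.hh>

// Hypothetical, simplified stand-in for a reader config: the abort source is
// carried as an optional reference so call sites (and tests) that do not wire
// one up still work.
struct reader_config_sketch {
    std::optional<std::reference_wrapper<seastar::abort_source>> abort_source;
};

// Guarded access: only consult the abort source when one was provided.
bool abort_requested(const reader_config_sketch& cfg) {
    return cfg.abort_source.has_value()
           && cfg.abort_source.value().get().abort_requested();
}
```

Once everything is wired up, as noted above, the optional could become a plain reference.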
```cpp
namespace ssx {

class sharded_abort_source {
```
surprised this isn't already in seastar.
application and controller both have a manual version of this, that might be worth replacing.
```diff
@@ -60,6 +60,11 @@ std::ostream& operator<<(std::ostream& o, const log_reader_config& cfg) {
     o << ", over_budget:" << cfg.over_budget;
     o << ", strict_max_bytes:" << cfg.strict_max_bytes;
     o << ", skip_batch_cache:" << cfg.skip_batch_cache;
     o << ", abortable:" << cfg.abort_source.has_value();
     o << ", aborted:"
       << (cfg.abort_source.has_value()
```
Right, it's unclear to me why we'd ever have a config without an abort source. It doesn't make sense for it to be optional, no?
There are unit/fixture tests that don't wire up the correct lifecycle, that should be fixed of course, but the optionality already existed.
👍
Force-pushed from f805dcc to 7fc2cab
lgtm
/ci-repeat 1
Force-pushed from 7fc2cab to 988c77c
Rebased due to conflict in: #12304
The failing tests are:

Which must be handled before this is merged. I can't reproduce it locally. The other
I reopened #8457 and noted what looks to be happening. It doesn't look related to this PR (with acks=0 we end up producing less than required to reupload).
`wait_for_input_shutdown` allows detection of disconnection of the peer via TCP keepalive. Signed-off-by: Ben Pope <[email protected]>
The `sharded_abort_source` is subscribed to the `net::server::abort_source`, so that it is aborted when the server is aborted, but can also be manually aborted earlier. Some requests such as fetch require results from several partitions, which live on disparate cores; a sharded abort source enables that. Signed-off-by: Ben Pope <[email protected]>
Abort the `abort_source` when a peer disconnects. Use an exception type that satisfies `net::is_disconnect_exception`. Signed-off-by: Ben Pope <[email protected]>
Expose the `connection_context::abort_source` through an interface in case a new lifetime needs to exist on the request at a future time. Signed-off-by: Ben Pope <[email protected]>
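A rough sketch of the wiring these commits describe, assuming a socket API along the lines of `wait_input_shutdown()`; the names here are illustrative, not the PR's exact code:

```cpp
#include <exception>

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/net/api.hh>

// Hypothetical disconnect exception; the PR uses a type that satisfies
// net::is_disconnect_exception.
struct client_disconnected_exception final : std::exception {
    const char* what() const noexcept override {
        return "client disconnected";
    }
};

// Runs in the background for the lifetime of a connection: when the peer's
// input side shuts down (detectable via TCP keepalive), abort the
// connection-level abort source with a disconnect-style exception.
seastar::future<> watch_for_disconnect(
  seastar::connected_socket& socket, seastar::abort_source& as) {
    co_await socket.wait_input_shutdown();
    if (!as.abort_requested()) {
        as.request_abort_ex(client_disconnected_exception{});
    }
}
```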
Signed-off-by: Ben Pope <[email protected]>
Signed-off-by: Ben Pope <[email protected]>
Signed-off-by: Ben Pope <[email protected]>
Signed-off-by: Ben Pope <[email protected]>
`remote_partition` can in theory receive an abort at any time. The most common case would be a client disconnect. Previously, this was handled by returning the reader to the `materialized_segment_state`. This is dangerous since the abort may catch us at a bad time and return a reader in an undefined state. We've seen this cause issues when the reader gets reused.
An instance of `partition_record_batch_reader_impl` can be aborted by two abort sources:
1. The abort source threaded in from the Kafka layer via `log_reader_config`
2. The abort source of `remote_partition` itself

In both cases, the reader should exit. This patch achieves this by monitoring both abort sources in the read loop and strategically inserting checks between scheduling points. This approach is not ideal, but `partition_record_batch_reader_impl` and `remote_segment_batch_reader` are very closely coupled, which makes it difficult to approach things differently.
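A minimal sketch of what checking both abort sources between scheduling points can look like; the helper name and signature are illustrative, not necessarily the PR's `throw_on_external_abort()`:

```cpp
#include <functional>
#include <optional>

#include <seastar/core/abort_source.hh>

// Illustrative helper, called between scheduling points in the read loop.
// Throws if either the remote_partition's own abort source or the optional
// abort source threaded in from the Kafka layer has been triggered.
void throw_if_externally_aborted(
  seastar::abort_source& partition_as,
  std::optional<std::reference_wrapper<seastar::abort_source>> kafka_as) {
    // abort_source::check() throws the abort exception if abort was requested.
    partition_as.check();
    if (kafka_as.has_value()) {
        kafka_as.value().get().check();
    }
}
```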
Force-pushed from 988c77c to bb46b94
Rebased due to minor conflict in includes in
```diff
@@ -342,6 +355,8 @@ class partition_record_batch_reader_impl final
           try {
               auto result = co_await _reader->read_some(
                 deadline, *_ot_state);
               throw_on_external_abort();
```
Wouldn't we really want to pass the abort source further down to `read_some`? Otherwise, if we get stuck on any of the reads, we will never reach this?

Not saying this isn't an improvement, but I'm more interested in a general discussion, as I am currently looking into a different case where we "leak" background futures that are stuck on reads/writes under load. It feels like we need more abort_sources in general, and we also need to make sure we pass them all the way down.
That makes sense to me. I had assumed the `remote_segment_batch_consumer` took note of its `abort_source`, but it does not appear to.

Maybe something like this:
```diff
diff --git a/src/v/cloud_storage/remote_segment.cc b/src/v/cloud_storage/remote_segment.cc
index 87b3d9939b..c9e3fec90b 100644
--- a/src/v/cloud_storage/remote_segment.cc
+++ b/src/v/cloud_storage/remote_segment.cc
@@ -1270,6 +1270,12 @@ public:
             co_return stop_parser::yes;
         }
+        if (
+          _config.abort_source.has_value()
+          && _config.abort_source.value().get().abort_requested()) {
+            co_return stop_parser::yes;
+        }
+
         co_return stop_parser::no;
     }
```
This PR adds a connection-level abort_source; it's possible that a request-level abort_source would also be useful, which could chain onto the connection-level one.
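A minimal sketch of such chaining, assuming the request-level `abort_source` simply subscribes to the connection-level one (illustrative names, not the PR's code):

```cpp
#include <seastar/core/abort_source.hh>
#include <seastar/util/optimized_optional.hh>

// Hypothetical request-level abort source chained onto a connection-level
// one: aborting the parent aborts the child, and the child can also be
// aborted on its own (e.g. for a per-request deadline).
class chained_abort_source {
public:
    explicit chained_abort_source(seastar::abort_source& parent)
      : _sub(parent.subscribe([this]() noexcept { _child.request_abort(); })) {
        if (!_sub) {
            // The parent was already aborted before we subscribed.
            _child.request_abort();
        }
    }

    seastar::abort_source& source() { return _child; }

private:
    seastar::abort_source _child;
    seastar::optimized_optional<seastar::abort_source::subscription> _sub;
};
```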
Most of the time the abort_source will be triggered when the underlying `remote_segment_batch_reader` is waiting inside the `remote_segment::hydrate` call, and it won't be able to quit early. The latency there is in the ballpark of 1s. So I agree that it should be handled one level below.
I think it's actually desirable to finish the hydration even if the request got aborted. In all likelihood, the client will retry, so if we cancel the hydration we'd just be throwing away work.
I totally agree. The hydration should continue. But inside the `read_some` call there is a call to `hydrate`, which will block the fiber from being aborted. I'm not sure moving the reader to the evicted list will solve this. The reader will move, but the fiber that actually reads data for the fetch request will be stuck until the segment is downloaded. The `hydrate` method itself is actually just waiting on a future. The hydration is done in the background, so I think it should be possible to abort the wait without canceling the hydration. The background hydration loop should be prepared for this and handle the `broken_promise` exception.
Maybe for this PR it's fine. But we should consider propagating abort source further.
Maybe since you're destroying the reader it will get unstuck.
```diff
@@ -349,6 +380,7 @@ class connection_context final
     sequence_id _next_response;
     sequence_id _seq_idx;
     map_t _responses;
     ssx::sharded_abort_source _as;
```
Does having a `ss::sharded` object per connection context make connection context/connection creation quite expensive, as it forces lots of cross-CPU talk? Not sure it matters, but just wondering.
To some extent, yes.

Cross-core calls occur for:
- `start()`
- `stop()`
- `request_abort()`

The trade-off is that every core has a local `abort_source` to query for:
- `abort_requested()`
- `check()`
- `subscribe()`

It might be possible to create the per-core `abort_source` lazily:
- Futurize the call to `local()`
- If the local instance is constructed, return it
- Else, make a cross-core call to request construction on the owning core
  - Maybe lock a mutex
  - Cross-core call (back to local) to actually construct it

It's a fair bit of complexity, and it makes the interface more awkward.
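To make the trade-off concrete, here is a minimal sketch of the shape of such a type built on `ss::sharded`; this is illustrative only, not the actual `ssx::sharded_abort_source`:

```cpp
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>

// Illustrative sketch: an abort source replicated across shards so that the
// data path only ever touches the shard-local instance.
class sharded_abort_source_sketch {
    // Small per-shard service wrapper so ss::sharded can start/stop it.
    struct shard_service {
        seastar::abort_source as;
        seastar::future<> stop() { return seastar::make_ready_future<>(); }
    };

public:
    // Cross-core: construct one abort_source per shard.
    seastar::future<> start() { return _svc.start(); }

    // Cross-core: tear down every shard-local instance.
    seastar::future<> stop() { return _svc.stop(); }

    // Cross-core: fan the abort request out to every shard.
    seastar::future<> request_abort() {
        return _svc.invoke_on_all(
          [](shard_service& s) { s.as.request_abort(); });
    }

    // Shard-local and cheap: no cross-core calls on the hot path.
    bool abort_requested() { return _svc.local().as.abort_requested(); }
    void check() { _svc.local().as.check(); }
    seastar::abort_source& local() { return _svc.local().as; }

private:
    seastar::sharded<shard_service> _svc;
};
```

On the fetch path, each core then queries its own shard-local instance, which is essentially a local bool check.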
LGTM overall. I'm not sure about the sharded abort source, since it will create an instance on every shard, and it will have to interact with all those fibers per fetch request.
Aside from starting and stopping the connection, it only interacts cross-core when aborting. If a fetch fetches from multiple partitions on different cores, they have a local instance to query. On the data path, there's almost nothing to do, other than check a local bool to see if the
/backport v23.2.x

/backport v23.1.x
Failed to run cherry-pick command. I executed the commands below:

Failed to run cherry-pick command. I executed the commands below:
```diff
     std::exception_ptr eptr;
     try {
         co_await ctx->start();
         co_await ctx->process();
     } catch (...) {
-        auto eptr = std::current_exception();
+        eptr = std::current_exception();
     }
     if (!eptr) {
         co_return co_await ctx->stop();
```
@rockwotj did you tell me about an upstream seastar utility for dealing with this pattern?
`ss::coroutine::as_future`

```cpp
auto fut = co_await ss::coroutine::as_future(ctx->start());
if (!fut.failed()) {
    fut = co_await ss::coroutine::as_future(ctx->process());
}
if (!fut.failed()) {
    co_return co_await ctx->stop();
}
```
ahhh right, thanks for the reminder.
Nice(r).
good stuff
kafka: Abort fetch and list_offsets (timequery) when the client disconnects for timely resource cleanup.
- Add an `abort_source` to `connection_context`
- Abort the `abort_source` when the socket is disconnected
- Pass the `abort_source` to the `request_context` and into:
  - `fetch`
  - `list_offsets` (timequery)

The primary goal is to improve the situation for #11799

Testing can be done with something like:

And then kill the client either gracefully or with `kill -9`.

Backports Required

Release Notes

Improvements
- Abort `fetch` and `list_offsets` (timequery) when the client disconnects, for timely resource cleanup.