From f33973cd55f9ff1af86dd1e5f484ebfd958ff9b1 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Tue, 10 Oct 2023 21:18:35 +0100 Subject: [PATCH] k/protocol/generator: Override fetchable_partition_response * Introduce `small_fragment_vector` * Switch `fetchable_partition_response` to it Signed-off-by: Ben Pope --- src/v/kafka/client/fetcher.cc | 2 +- src/v/kafka/protocol/fetch.h | 2 +- src/v/kafka/protocol/schemata/generator.py | 3 ++- src/v/kafka/server/handlers/fetch.cc | 5 +---- src/v/utils/fragmented_vector.h | 6 ++++++ 5 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/v/kafka/client/fetcher.cc b/src/v/kafka/client/fetcher.cc index e50de7a0ccc4..51146da7ca59 100644 --- a/src/v/kafka/client/fetcher.cc +++ b/src/v/kafka/client/fetcher.cc @@ -74,7 +74,7 @@ make_fetch_response(const model::topic_partition& tp, std::exception_ptr ex) { .aborted{}, .records{}}; - std::vector responses; + small_fragment_vector responses; responses.push_back(std::move(pr)); auto response = fetch_response::partition{.name = tp.topic}; response.partitions = std::move(responses); diff --git a/src/v/kafka/protocol/fetch.h b/src/v/kafka/protocol/fetch.h index c5ea9d9268fa..ad68e06c7f46 100644 --- a/src/v/kafka/protocol/fetch.h +++ b/src/v/kafka/protocol/fetch.h @@ -213,7 +213,7 @@ struct fetch_response final { public: using partition_iterator = std::vector::iterator; using partition_response_iterator - = std::vector::iterator; + = small_fragment_vector::iterator; struct value_type { partition_iterator partition; diff --git a/src/v/kafka/protocol/schemata/generator.py b/src/v/kafka/protocol/schemata/generator.py index 925df1910ad2..dc05f32e8724 100755 --- a/src/v/kafka/protocol/schemata/generator.py +++ b/src/v/kafka/protocol/schemata/generator.py @@ -464,7 +464,8 @@ # These types, when they appear as the member type of an array, will override # the container type from std::vector override_member_container = { - 'metadata_response_partition': 'large_fragment_vector' + 'metadata_response_partition': 'large_fragment_vector', + 'fetchable_partition_response': 'small_fragment_vector' } diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 8de5153d9e43..7aecf9df4e9f 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -926,9 +926,8 @@ op_context::op_context(request_context&& ctx, ss::smp_service_group ssg) // insert and reserve space for a new topic in the response void op_context::start_response_topic(const fetch_request::topic& topic) { - auto& p = response.data.topics.emplace_back( + response.data.topics.emplace_back( fetchable_topic_response{.name = topic.name}); - p.partitions.reserve(topic.fetch_partitions.size()); } void op_context::start_response_partition(const fetch_request::partition& p) { @@ -1058,8 +1057,6 @@ ss::future op_context::send_response() && { if (it->is_new_topic) { final_response.data.topics.emplace_back( fetchable_topic_response{.name = it->partition->name}); - final_response.data.topics.back().partitions.reserve( - it->partition->partitions.size()); } fetch_response::partition_response r{ diff --git a/src/v/utils/fragmented_vector.h b/src/v/utils/fragmented_vector.h index 441514c32c66..988151a9191b 100644 --- a/src/v/utils/fragmented_vector.h +++ b/src/v/utils/fragmented_vector.h @@ -338,3 +338,9 @@ class fragmented_vector { */ template using large_fragment_vector = fragmented_vector; + +/** + * An alias for a fragmented_vector using a smaller fragment size. + */ +template +using small_fragment_vector = fragmented_vector;