diff --git a/src/v/kafka/client/fetcher.cc b/src/v/kafka/client/fetcher.cc index e50de7a0ccc4..51146da7ca59 100644 --- a/src/v/kafka/client/fetcher.cc +++ b/src/v/kafka/client/fetcher.cc @@ -74,7 +74,7 @@ make_fetch_response(const model::topic_partition& tp, std::exception_ptr ex) { .aborted{}, .records{}}; - std::vector responses; + small_fragment_vector responses; responses.push_back(std::move(pr)); auto response = fetch_response::partition{.name = tp.topic}; response.partitions = std::move(responses); diff --git a/src/v/kafka/protocol/fetch.h b/src/v/kafka/protocol/fetch.h index c5ea9d9268fa..ad68e06c7f46 100644 --- a/src/v/kafka/protocol/fetch.h +++ b/src/v/kafka/protocol/fetch.h @@ -213,7 +213,7 @@ struct fetch_response final { public: using partition_iterator = std::vector::iterator; using partition_response_iterator - = std::vector::iterator; + = small_fragment_vector::iterator; struct value_type { partition_iterator partition; diff --git a/src/v/kafka/protocol/schemata/generator.py b/src/v/kafka/protocol/schemata/generator.py index 46aea4616b64..dc05f32e8724 100755 --- a/src/v/kafka/protocol/schemata/generator.py +++ b/src/v/kafka/protocol/schemata/generator.py @@ -461,9 +461,12 @@ } # yapf: enable -# These types, when they appear as the member type of an array, will use -# a vector implementation which resists fragmentation. -enable_fragmentation_resistance = {'metadata_response_partition'} +# These types, when they appear as the member type of an array, will override +# the container type from std::vector +override_member_container = { + 'metadata_response_partition': 'large_fragment_vector', + 'fetchable_partition_response': 'small_fragment_vector' +} def make_context_field(path): @@ -1040,8 +1043,8 @@ def type_name_parts(self): yield name if isinstance(self._type, ArrayType): assert default_value is None # not supported - if name in enable_fragmentation_resistance: - yield "large_fragment_vector" + if name in override_member_container: + yield override_member_container[name] else: yield "std::vector" if self.nullable(): diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 8de5153d9e43..7aecf9df4e9f 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -926,9 +926,8 @@ op_context::op_context(request_context&& ctx, ss::smp_service_group ssg) // insert and reserve space for a new topic in the response void op_context::start_response_topic(const fetch_request::topic& topic) { - auto& p = response.data.topics.emplace_back( + response.data.topics.emplace_back( fetchable_topic_response{.name = topic.name}); - p.partitions.reserve(topic.fetch_partitions.size()); } void op_context::start_response_partition(const fetch_request::partition& p) { @@ -1058,8 +1057,6 @@ ss::future op_context::send_response() && { if (it->is_new_topic) { final_response.data.topics.emplace_back( fetchable_topic_response{.name = it->partition->name}); - final_response.data.topics.back().partitions.reserve( - it->partition->partitions.size()); } fetch_response::partition_response r{ diff --git a/src/v/utils/fragmented_vector.h b/src/v/utils/fragmented_vector.h index 441514c32c66..988151a9191b 100644 --- a/src/v/utils/fragmented_vector.h +++ b/src/v/utils/fragmented_vector.h @@ -338,3 +338,9 @@ class fragmented_vector { */ template using large_fragment_vector = fragmented_vector; + +/** + * An alias for a fragmented_vector using a smaller fragment size. + */ +template +using small_fragment_vector = fragmented_vector;