Skip to content

Commit

Permalink
Merge pull request #14141 from vbotbuildovich/backport-pr-11181-v23.1…
Browse files Browse the repository at this point in the history
….x-333

[v23.1.x] k/p/gen: Use `fragmented_vector` for  `fetchable_partition_response`
  • Loading branch information
piyushredpanda authored Oct 13, 2023
2 parents 69fca4d + d767a35 commit b52e57f
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/v/kafka/client/fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ make_fetch_response(const model::topic_partition& tp, std::exception_ptr ex) {
.aborted{},
.records{}};

std::vector<fetch_response::partition_response> responses;
small_fragment_vector<fetch_response::partition_response> responses;
responses.push_back(std::move(pr));
auto response = fetch_response::partition{.name = tp.topic};
response.partitions = std::move(responses);
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/protocol/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ struct fetch_response final {
public:
using partition_iterator = std::vector<partition>::iterator;
using partition_response_iterator
= std::vector<partition_response>::iterator;
= small_fragment_vector<partition_response>::iterator;

struct value_type {
partition_iterator partition;
Expand Down
13 changes: 8 additions & 5 deletions src/v/kafka/protocol/schemata/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,12 @@
}
# yapf: enable

# These types, when they appear as the member type of an array, will use
# a vector implementation which resists fragmentation.
enable_fragmentation_resistance = {'metadata_response_partition'}
# These types, when they appear as the member type of an array, will override
# the container type from std::vector
override_member_container = {
'metadata_response_partition': 'large_fragment_vector',
'fetchable_partition_response': 'small_fragment_vector'
}


def make_context_field(path):
Expand Down Expand Up @@ -1012,8 +1015,8 @@ def type_name_parts(self):
yield name
if isinstance(self._type, ArrayType):
assert default_value is None # not supported
if name in enable_fragmentation_resistance:
yield "large_fragment_vector"
if name in override_member_container:
yield override_member_container[name]
else:
yield "std::vector"
if self.nullable():
Expand Down
5 changes: 1 addition & 4 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,8 @@ op_context::op_context(request_context&& ctx, ss::smp_service_group ssg)

// insert and reserve space for a new topic in the response
void op_context::start_response_topic(const fetch_request::topic& topic) {
auto& p = response.data.topics.emplace_back(
response.data.topics.emplace_back(
fetchable_topic_response{.name = topic.name});
p.partitions.reserve(topic.fetch_partitions.size());
}

void op_context::start_response_partition(const fetch_request::partition& p) {
Expand Down Expand Up @@ -921,8 +920,6 @@ ss::future<response_ptr> op_context::send_response() && {
if (it->is_new_topic) {
final_response.data.topics.emplace_back(
fetchable_topic_response{.name = it->partition->name});
final_response.data.topics.back().partitions.reserve(
it->partition->partitions.size());
}

fetch_response::partition_response r{
Expand Down
6 changes: 6 additions & 0 deletions src/v/utils/fragmented_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,9 @@ class fragmented_vector {
*/
template<typename T>
using large_fragment_vector = fragmented_vector<T, 32 * 1024>;

/**
* An alias for a fragmented_vector using a smaller fragment size.
*/
template<typename T>
using small_fragment_vector = fragmented_vector<T, 1024>;

0 comments on commit b52e57f

Please sign in to comment.