Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.2.x] k/p/gen: Use fragmented_vector for fetchable_partition_response #14106

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/kafka/client/fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ make_fetch_response(const model::topic_partition& tp, std::exception_ptr ex) {
.aborted{},
.records{}};

std::vector<fetch_response::partition_response> responses;
small_fragment_vector<fetch_response::partition_response> responses;
responses.push_back(std::move(pr));
auto response = fetch_response::partition{.name = tp.topic};
response.partitions = std::move(responses);
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/protocol/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ struct fetch_response final {
public:
using partition_iterator = std::vector<partition>::iterator;
using partition_response_iterator
= std::vector<partition_response>::iterator;
= small_fragment_vector<partition_response>::iterator;

struct value_type {
partition_iterator partition;
Expand Down
13 changes: 8 additions & 5 deletions src/v/kafka/protocol/schemata/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,12 @@
}
# yapf: enable

# These types, when they appear as the member type of an array, will use
# a vector implementation which resists fragmentation.
enable_fragmentation_resistance = {'metadata_response_partition'}
# These types, when they appear as the member type of an array, will override
# the container type from std::vector
override_member_container = {
'metadata_response_partition': 'large_fragment_vector',
'fetchable_partition_response': 'small_fragment_vector'
}


def make_context_field(path):
Expand Down Expand Up @@ -1040,8 +1043,8 @@ def type_name_parts(self):
yield name
if isinstance(self._type, ArrayType):
assert default_value is None # not supported
if name in enable_fragmentation_resistance:
yield "large_fragment_vector"
if name in override_member_container:
yield override_member_container[name]
else:
yield "std::vector"
if self.nullable():
Expand Down
5 changes: 1 addition & 4 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -926,9 +926,8 @@ op_context::op_context(request_context&& ctx, ss::smp_service_group ssg)

// insert and reserve space for a new topic in the response
void op_context::start_response_topic(const fetch_request::topic& topic) {
auto& p = response.data.topics.emplace_back(
response.data.topics.emplace_back(
fetchable_topic_response{.name = topic.name});
p.partitions.reserve(topic.fetch_partitions.size());
}

void op_context::start_response_partition(const fetch_request::partition& p) {
Expand Down Expand Up @@ -1058,8 +1057,6 @@ ss::future<response_ptr> op_context::send_response() && {
if (it->is_new_topic) {
final_response.data.topics.emplace_back(
fetchable_topic_response{.name = it->partition->name});
final_response.data.topics.back().partitions.reserve(
it->partition->partitions.size());
}

fetch_response::partition_response r{
Expand Down
6 changes: 6 additions & 0 deletions src/v/utils/fragmented_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,9 @@ class fragmented_vector {
*/
template<typename T>
using large_fragment_vector = fragmented_vector<T, 32 * 1024>;

/**
* An alias for a fragmented_vector using a smaller fragment size.
*/
template<typename T>
using small_fragment_vector = fragmented_vector<T, 1024>;
Loading