Merge pull request #8469 from travisdowns/td20-frag-vector-metadata

Use fragmented vector for metadata response

dotnwat authored Feb 1, 2023
2 parents b0085fa + 1eb014b commit daf23c3

Showing 6 changed files with 399 additions and 44 deletions.
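Why this matters: a metadata response for a large cluster can carry tens of thousands of partition entries, and a std::vector keeps them in one contiguous allocation that must grow (and be reallocated) as the response is built, which stresses the allocator. A fragmentation-resistant vector instead stores elements in fixed-size chunks. The sketch below illustrates only that chunking idea; it is not Redpanda's utils/fragmented_vector.h, whose interface and fragment sizes differ.

```cpp
#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

// Minimal chunked vector: elements live in fixed-size fragments, so growth
// never requires one huge contiguous allocation and never relocates
// previously stored elements. Illustrative only.
template<typename T, size_t FragmentElems = 1024>
class chunked_vector {
    std::vector<std::unique_ptr<std::vector<T>>> _frags;
    size_t _size = 0;

public:
    void push_back(T v) {
        if (_frags.empty() || _frags.back()->size() == FragmentElems) {
            auto frag = std::make_unique<std::vector<T>>();
            frag->reserve(FragmentElems); // each allocation is bounded
            _frags.push_back(std::move(frag));
        }
        _frags.back()->push_back(std::move(v));
        ++_size;
    }

    T& operator[](size_t i) {
        return (*_frags[i / FragmentElems])[i % FragmentElems];
    }

    size_t size() const { return _size; }
};

int main() {
    chunked_vector<int, 4> v; // tiny fragments to make the chunking visible
    for (int i = 0; i < 10; ++i) {
        v.push_back(i);
    }
    std::cout << v.size() << ' ' << v[9] << '\n'; // 10 9
}
```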
4 changes: 2 additions & 2 deletions src/v/kafka/client/consumer.cc
@@ -265,8 +265,8 @@ consumer::get_subscribed_topic_metadata() {
     std::vector<metadata_response::topic> topics;
     topics.reserve(_subscribed_topics.size());
     std::set_intersection(
-      res.data.topics.begin(),
-      res.data.topics.end(),
+      std::make_move_iterator(res.data.topics.begin()),
+      std::make_move_iterator(res.data.topics.end()),
       _subscribed_topics.begin(),
       _subscribed_topics.end(),
       std::back_inserter(topics),
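The consumer.cc change wraps the first range in std::make_move_iterator so that std::set_intersection moves matching topics into the output instead of copying them. A standalone sketch of the same pattern, using plain strings rather than metadata_response::topic (both input ranges must be sorted, and matched elements of the first range are left moved-from):

```cpp
#include <algorithm>
#include <iostream>
#include <iterator>
#include <string>
#include <vector>

int main() {
    // Both inputs to std::set_intersection must be sorted.
    std::vector<std::string> fetched{"alpha", "beta", "gamma"};
    std::vector<std::string> subscribed{"beta", "gamma"};

    std::vector<std::string> topics;
    topics.reserve(subscribed.size());

    // Move iterators make the algorithm move matching elements out of
    // `fetched` instead of copying them; the matched slots in `fetched`
    // are left in a valid but unspecified (moved-from) state.
    std::set_intersection(
      std::make_move_iterator(fetched.begin()),
      std::make_move_iterator(fetched.end()),
      subscribed.begin(),
      subscribed.end(),
      std::back_inserter(topics));

    for (const auto& t : topics) {
        std::cout << t << '\n'; // beta, gamma
    }
}
```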
40 changes: 31 additions & 9 deletions src/v/kafka/protocol/response_writer.h
@@ -27,11 +27,28 @@
 
 #include <boost/range/numeric.hpp>
 
+#include <concepts>
 #include <optional>
 #include <string_view>
 
 namespace kafka {
 
+/**
+ * Concept for the containers that write_array and friends accept.
+ *
+ * Basically, must have a size() and be iterable.
+ */
+template<typename C>
+concept SizedContainer = requires(C c, const C cc) {
+    typename C::value_type;
+    requires std::forward_iterator<typename C::iterator>;
+    { c.size() } -> std::same_as<size_t>;
+    { c.begin() } -> std::same_as<typename C::iterator>;
+    { c.end() } -> std::same_as<typename C::iterator>;
+    { cc.begin() } -> std::same_as<typename C::const_iterator>;
+    { cc.end() } -> std::same_as<typename C::const_iterator>;
+};
+
 class response_writer;
 void writer_serialize_batch(response_writer& w, model::record_batch&& batch);
 
@@ -228,24 +245,27 @@ class response_writer {
         return write(int32_t(d.count()));
     }
 
-    template<typename T, typename ElementWriter>
+    template<typename C, typename ElementWriter>
     requires requires(
-      ElementWriter writer, response_writer& rw, const T& elem) {
+      ElementWriter writer,
+      response_writer& rw,
+      const typename C::value_type& elem) {
         { writer(elem, rw) } -> std::same_as<void>;
     }
-    uint32_t write_array(const std::vector<T>& v, ElementWriter&& writer) {
+    uint32_t write_array(const C& v, ElementWriter&& writer) {
         auto start_size = uint32_t(_out->size_bytes());
         write(int32_t(v.size()));
         for (auto& elem : v) {
             writer(elem, *this);
         }
         return _out->size_bytes() - start_size;
     }
-    template<typename T, typename ElementWriter>
-    requires requires(ElementWriter writer, response_writer& rw, T& elem) {
+    template<typename C, typename ElementWriter>
+    requires requires(
+      ElementWriter writer, response_writer& rw, typename C::value_type& elem) {
         { writer(elem, rw) } -> std::same_as<void>;
     }
-    uint32_t write_array(std::vector<T>& v, ElementWriter&& writer) {
+    uint32_t write_array(C& v, ElementWriter&& writer) {
         auto start_size = uint32_t(_out->size_bytes());
         write(int32_t(v.size()));
         for (auto& elem : v) {
@@ -266,11 +286,13 @@ class response_writer {
         return write_array(*v, std::forward<ElementWriter>(writer));
     }
 
-    template<typename T, typename ElementWriter>
-    requires requires(ElementWriter writer, response_writer& rw, T& elem) {
+    template<typename C, typename ElementWriter>
+    requires requires(
+      ElementWriter writer, response_writer& rw, typename C::value_type& elem) {
+        requires SizedContainer<C>;
         { writer(elem, rw) } -> std::same_as<void>;
     }
-    uint32_t write_flex_array(std::vector<T>& v, ElementWriter&& writer) {
+    uint32_t write_flex_array(C& v, ElementWriter&& writer) {
         auto start_size = uint32_t(_out->size_bytes());
         write_unsigned_varint(v.size() + 1);
         for (auto& elem : v) {
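The new SizedContainer concept is what lets write_array and write_flex_array accept either a std::vector or a fragmented vector. A compilable restatement of the idea against standard containers, with std::deque standing in for a non-contiguous container and a toy byte-counting writer standing in for real serialization into the response buffer:

```cpp
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <iostream>
#include <iterator>
#include <vector>

// Restatement of the diff's concept (std::size_t spelled out so the
// snippet is self-contained).
template<typename C>
concept SizedContainer = requires(C c, const C cc) {
    typename C::value_type;
    requires std::forward_iterator<typename C::iterator>;
    { c.size() } -> std::same_as<std::size_t>;
    { c.begin() } -> std::same_as<typename C::iterator>;
    { c.end() } -> std::same_as<typename C::iterator>;
    { cc.begin() } -> std::same_as<typename C::const_iterator>;
    { cc.end() } -> std::same_as<typename C::const_iterator>;
};

// Toy stand-in for write_array: computes the encoded size (a 4-byte length
// prefix plus per-element bytes) without any real serialization.
template<SizedContainer C, typename ElementWriter>
uint32_t write_array(const C& v, ElementWriter&& writer) {
    uint32_t bytes = 4; // int32 length prefix, as in the Kafka protocol
    for (const auto& elem : v) {
        bytes += writer(elem);
    }
    return bytes;
}

int main() {
    std::vector<int32_t> a{1, 2, 3};
    std::deque<int32_t> b{4, 5}; // non-contiguous container also accepted
    auto int_writer = [](int32_t) -> uint32_t { return 4; };
    std::cout << write_array(a, int_writer) << '\n'; // 16
    std::cout << write_array(b, int_writer) << '\n'; // 12
}
```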
19 changes: 14 additions & 5 deletions src/v/kafka/protocol/schemata/generator.py
@@ -420,6 +420,10 @@
 }
 # yapf: enable
 
+# These types, when they appear as the member type of an array, will use
+# a vector implementation which resists fragmentation.
+enable_fragmentation_resistance = {'metadata_response_partition'}
+
 
 def make_context_field(path):
     """
Expand Down Expand Up @@ -953,8 +957,8 @@ def is_sensitive(self):
# sensitive field, but it itself isn't sensitive.
return False
assert d is None or d is True, \
"expected field '{}' to be missing or True; field path: {}, remaining path: {}" \
.format(self._field["name"], self._path, d)
"expected field '{}' to be missing or True; field path: {}, remaining path: {}" \
.format(self._field["name"], self._path, d)
return d

@property
@@ -974,7 +978,10 @@ def type_name(self):
         name, default_value = self._redpanda_type()
         if isinstance(self._type, ArrayType):
             assert default_value is None  # not supported
-            name = f"std::vector<{name}>"
+            if name in enable_fragmentation_resistance:
+                name = f'large_fragment_vector<{name}>'
+            else:
+                name = f'std::vector<{name}>'
         if self.nullable():
             assert default_value is None  # not supported
             return f"std::optional<{name}>", None
@@ -1015,6 +1022,7 @@ def is_default_comparable(self):
 #include "model/metadata.h"
 #include "kafka/protocol/errors.h"
 #include "seastarx.h"
+#include "utils/fragmented_vector.h"
 {%- for header in struct.headers("header") %}
 {%- if header.startswith("<") %}
@@ -1544,7 +1552,8 @@ class response;
 ALLOWED_SCALAR_TYPES = list(set(SCALAR_TYPES) - set(["iobuf"]))
 ALLOWED_TYPES = \
     ALLOWED_SCALAR_TYPES + \
-    [f"[]{t}" for t in ALLOWED_SCALAR_TYPES + STRUCT_TYPES] + TAGGED_WITH_FIELDS
+    [f"[]{t}" for t in ALLOWED_SCALAR_TYPES +
+     STRUCT_TYPES] + TAGGED_WITH_FIELDS
 
 # yapf: disable
 SCHEMA = {
Expand Down Expand Up @@ -1626,7 +1635,7 @@ class response;
],
},
"fields": {"$ref": "#/definitions/fields"},
"listeners": { "type": "array", "optional": True }
"listeners": {"type": "array", "optional": True}
},
"required": [
"apiKey",
Expand Down
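The net effect on the generated header: any array whose element type appears in enable_fragmentation_resistance switches container types, and the generated file pulls in utils/fragmented_vector.h. A self-contained sketch of the before/after shape of the generated struct, with hypothetical field names and large_fragment_vector reduced to a toy alias so it compiles on its own:

```cpp
#include <cstdint>
#include <deque>
#include <string>

// Toy alias standing in for the real large_fragment_vector from
// utils/fragmented_vector.h, which stores elements in fixed-size fragments.
template<typename T>
using large_fragment_vector = std::deque<T>;

// Hypothetical field names, loosely following the Kafka metadata schema.
struct metadata_response_partition {
    int16_t error_code{};
    int32_t partition_index{};
    int32_t leader_id{};
};

struct metadata_response_topic {
    int16_t error_code{};
    std::string name;
    // Generated before this change: std::vector<metadata_response_partition>
    large_fragment_vector<metadata_response_partition> partitions;
};

int main() {
    metadata_response_topic t;
    t.partitions.push_back({0, 0, 1});
    return t.partitions.size() == 1 ? 0 : 1;
}
```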
17 changes: 11 additions & 6 deletions src/v/kafka/server/handlers/metadata.cc
@@ -36,6 +36,7 @@
 #include <boost/numeric/conversion/cast.hpp>
 #include <fmt/ostream.h>
 
+#include <iterator>
 #include <type_traits>
 
 namespace kafka {
@@ -107,7 +108,6 @@ metadata_response::topic make_topic_response_from_topic_metadata(
   const cluster::topic_metadata& tp_md,
   const is_node_isolated_or_decommissioned is_node_isolated) {
     metadata_response::topic tp;
-    tp.partitions.reserve(tp_md.get_assignments().size());
     tp.error_code = error_code::none;
     model::topic_namespace_view tp_ns = tp_md.get_configuration().tp_ns;
     tp.name = tp_md.get_configuration().tp_ns.tp;
@@ -167,7 +167,7 @@ static ss::future<metadata_response::topic> create_topic(
         metadata_response::topic t;
         t.name = std::move(topic);
         t.error_code = error_code::broker_not_available;
-        return ss::make_ready_future<metadata_response::topic>(t);
+        return ss::make_ready_future<metadata_response::topic>(std::move(t));
     }
     // default topic configuration
     cluster::topic_configuration cfg{
@@ -188,15 +188,17 @@ static ss::future<metadata_response::topic> create_topic(
               metadata_response::topic t;
               t.name = std::move(res[0].tp_ns.tp);
               t.error_code = map_topic_error_code(res[0].ec);
-              return ss::make_ready_future<metadata_response::topic>(t);
+              return ss::make_ready_future<metadata_response::topic>(
+                std::move(t));
           }
           auto tp_md = md_cache.get_topic_metadata(res[0].tp_ns);
 
           if (!tp_md) {
              metadata_response::topic t;
              t.name = std::move(res[0].tp_ns.tp);
              t.error_code = error_code::invalid_topic_exception;
-             return ss::make_ready_future<metadata_response::topic>(t);
+             return ss::make_ready_future<metadata_response::topic>(
+               std::move(t));
           }
 
           return wait_for_topics(
@@ -324,8 +326,11 @@ static ss::future<std::vector<metadata_response::topic>> get_topic_metadata(
     return ss::when_all_succeed(new_topics.begin(), new_topics.end())
       .then([res = std::move(res)](
               std::vector<metadata_response::topic> topics) mutable {
-          res.insert(res.end(), topics.begin(), topics.end());
-          return res;
+          res.insert(
+            res.end(),
+            std::make_move_iterator(topics.begin()),
+            std::make_move_iterator(topics.end()));
+          return std::move(res);
       });
 }
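The last hunk applies the same idea as the consumer.cc change: appending through std::make_move_iterator transfers each topic's buffers instead of copying them, and returning std::move(res) avoids one more copy, since res is a lambda capture and NRVO does not apply. A minimal standalone illustration with strings:

```cpp
#include <iostream>
#include <iterator>
#include <string>
#include <vector>

int main() {
    std::vector<std::string> res{"existing-topic"};
    std::vector<std::string> topics{"created-a", "created-b"};

    // Move-append: each string's heap buffer is transferred rather than
    // duplicated. For metadata_response::topic, which owns a whole
    // partitions array, the avoided copies are far larger.
    res.insert(
      res.end(),
      std::make_move_iterator(topics.begin()),
      std::make_move_iterator(topics.end()));

    std::cout << res.size() << '\n';    // 3
    // `topics` still holds two elements, but in a moved-from state.
    std::cout << topics.size() << '\n'; // 2
}
```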
[The remaining two changed files were not loaded in this view.]
