Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v22.3.x] cloud_storage: fix kafka::offset lookup in partition_manifest #8126

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,42 +285,43 @@ partition_manifest::get_start_kafka_offset() const {
partition_manifest::const_iterator
partition_manifest::segment_containing(kafka::offset o) const {
vlog(cst_log.debug, "Metadata lookup using kafka offset {}", o);
if (_segments.empty()) {
return end();
}
// Kafka offset is always <= log offset.
// To find a segment by its kafka offset we can simply query
// manifest by log offset and then traverse forward until we
// will find matching segment.
auto it = segment_containing(kafka::offset_cast(o));
if (it == end()) {
return it;
}
auto prev = end();
// find a matching segment.
auto it = _segments.lower_bound(kafka::offset_cast(o));
// We need to find first element which has greater kafka offset than
// the target and step back. It is possible to have a segment that
// doesn't have data batches. This scan has to skip segments like that.
while (it != end()) {
auto base = it->second.base_offset - it->second.delta_offset;
if (base > o) {
// We need to find first element which has greater kafka
// offset then the target and step back. It is possible
// to have a segment that doesn't have data batches. This
// scan has to skip segments like that.
break;
if (it->second.base_kafka_offset() > o) {
// The beginning of the manifest already has a base offset that
// doesn't satisfy the query.
if (it == begin()) {
return end();
}
// On the first segment we see with a base kafka offset higher than
// 'o', return its previous segment.
return std::prev(it);
}
prev = it;
it = std::next(it);
}
if (it == end()) {
if (prev->second.delta_offset_end != model::offset_delta{}) {
// In case if 'prev' points to the last segment it's not guaranteed
// that the segment contains the required kafka offset. We need an
// extra check using delta_offset_end. If the field is not set then
// we will return the last segment. This is OK since
// delta_offset_end will always be set for new segments.
auto m = prev->second.committed_offset
- prev->second.delta_offset_end;
if (m < o) {
prev = end();
}
// All segments had base kafka offsets lower than 'o'.
auto back = std::prev(it);
if (back->second.delta_offset_end != model::offset_delta{}) {
// If 'prev' points to the last segment, it's not guaranteed that
// the segment contains the required kafka offset. We need an extra
// check using delta_offset_end. If the field is not set then we
// will return the last segment. This is OK since delta_offset_end
// will always be set for new segments.
if (back->second.committed_kafka_offset() < o) {
return end();
}
}
return prev;
return back;
}

model::offset partition_manifest::get_last_uploaded_compacted_offset() const {
Expand Down Expand Up @@ -1312,7 +1313,6 @@ partition_manifest::segment_containing(model::offset o) const {
return end();
}

// Make sure to only compare based on the offset and not the term.
auto it = _segments.upper_bound(o);
if (it == _segments.begin()) {
return end();
Expand Down
135 changes: 135 additions & 0 deletions src/v/cloud_storage/tests/partition_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,141 @@ inline ss::input_stream<char> make_manifest_stream(std::string_view json) {
return make_iobuf_input_stream(std::move(i));
}

namespace {

ss::logger test_log("partition_manifest_test");

// Returns a manifest with segments with the corresponding offset bounds, with
// the last offsets in the list referring to the exclusive end of the returned
// manifest.
partition_manifest
manifest_for(std::vector<std::pair<model::offset, kafka::offset>> o) {
partition_manifest manifest;
manifest.update(make_manifest_stream(empty_manifest_json)).get0();
for (int i = 0; i < o.size() - 1; i++) {
segment_meta seg{
.is_compacted = false,
.size_bytes = 1024,
.base_offset = o[i].first,
.committed_offset = model::offset(o[i + 1].first() - 1),
.delta_offset = model::offset_delta(o[i].first() - o[i].second()),
.delta_offset_end = model::offset_delta(
o[i + 1].first() - o[i + 1].second()),
};
manifest.add(
segment_name(fmt::format("{}-1-v1.log", o[i].first())), seg);
}
return manifest;
}

} // anonymous namespace

SEASTAR_THREAD_TEST_CASE(test_segment_contains_by_kafka_offset) {
// Returns true if a kafka::offset lookup returns the segment with the
// expected base offsets.
const auto check_offset = [](
const partition_manifest& m,
kafka::offset ko,
model::offset expected_base_mo,
kafka::offset expected_base_ko) {
const auto it = m.segment_containing(ko);
bool success = it != m.end()
// Check that the segment base offsets match the segment
// we were expecting...
&& it->second.base_offset == expected_base_mo
&& it->second.base_kafka_offset() == expected_base_ko
// ...and as a sanity check, make sure the kafka::offset
// falls in the segment.
&& it->second.base_kafka_offset() <= ko
&& it->second.committed_kafka_offset() >= ko;
if (success) {
return true;
}
if (it == m.end()) {
test_log.error("no segment for {}", ko);
return false;
}
test_log.error(
"segment {} doesn't match the expected base model::offset {} or "
"kafka::offset {}, or doesn't include "
"{} in its range",
it->second,
expected_base_mo,
expected_base_ko,
ko);
return false;
};

// Returns true if a kafka::offset lookup returns that no such segment is
// returned.
const auto check_no_offset =
[](const partition_manifest& m, kafka::offset ko) {
const auto it = m.segment_containing(ko);
bool success = it == m.end();
if (success) {
return true;
}
test_log.error(
"unexpected segment for kafka::offset {}: {}", ko, it->second);
return false;
};

// mo: 0 10 20 30
// [a ][b ][c ]end
// ko: 0 5 10 15
partition_manifest full_manifest = manifest_for({
{model::offset(0), kafka::offset(0)},
{model::offset(10), kafka::offset(5)},
{model::offset(20), kafka::offset(10)},
{model::offset(30), kafka::offset(15)},
});
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(0), model::offset(0), kafka::offset(0)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(4), model::offset(0), kafka::offset(0)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(5), model::offset(10), kafka::offset(5)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(9), model::offset(10), kafka::offset(5)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(10), model::offset(20), kafka::offset(10)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(14), model::offset(20), kafka::offset(10)));
BOOST_REQUIRE(check_no_offset(full_manifest, kafka::offset(15)));

// mo: 10 20 30
// [b ][c ]end
// ko: 5 10 15
partition_manifest truncated_manifest = manifest_for({
{model::offset(10), kafka::offset(5)},
{model::offset(20), kafka::offset(10)},
{model::offset(30), kafka::offset(15)},
});
BOOST_REQUIRE(check_no_offset(truncated_manifest, kafka::offset(0)));
BOOST_REQUIRE(check_no_offset(truncated_manifest, kafka::offset(4)));
BOOST_REQUIRE(check_offset(
truncated_manifest,
kafka::offset(5),
model::offset(10),
kafka::offset(5)));
BOOST_REQUIRE(check_offset(
truncated_manifest,
kafka::offset(9),
model::offset(10),
kafka::offset(5)));
BOOST_REQUIRE(check_offset(
truncated_manifest,
kafka::offset(10),
model::offset(20),
kafka::offset(10)));
BOOST_REQUIRE(check_offset(
truncated_manifest,
kafka::offset(14),
model::offset(20),
kafka::offset(10)));
BOOST_REQUIRE(check_no_offset(full_manifest, kafka::offset(15)));
}

SEASTAR_THREAD_TEST_CASE(test_segment_contains) {
partition_manifest m;
m.update(make_manifest_stream(manifest_with_gaps)).get0();
Expand Down
22 changes: 22 additions & 0 deletions src/v/cloud_storage/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,28 @@

namespace cloud_storage {

std::ostream& operator<<(std::ostream& o, const segment_meta& s) {
fmt::print(
o,
"{{is_compacted: {}, size_bytes: {}, base_offset: {}, committed_offset: "
"{}, base_timestamp: {}, max_timestamp: {}, delta_offset: {}, "
"ntp_revision: {}, archiver_term: {}, segment_term: {}, "
"delta_offset_end: {}, sname_format: {}}}",
s.is_compacted,
s.size_bytes,
s.base_offset,
s.committed_offset,
s.base_timestamp,
s.max_timestamp,
s.delta_offset,
s.ntp_revision,
s.archiver_term,
s.segment_term,
s.delta_offset_end,
s.sname_format);
return o;
}

std::ostream& operator<<(std::ostream& o, const segment_name_format& r) {
switch (r) {
case segment_name_format::v1:
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,6 @@ struct segment_meta {

auto operator<=>(const segment_meta&) const = default;
};
std::ostream& operator<<(std::ostream& o, const segment_meta& r);

} // namespace cloud_storage