Skip to content

Commit

Permalink
cloud_storage: kafka::offset lookup in partition_manifest
Browse files Browse the repository at this point in the history
When serving a fetch request by a kafka::offset, we previously used the
`partition_manifest` to perform a segment lookup by
`kafka::offset_cast(ko)` first in order to traverse the segments forward
to find the actual segment that contained `ko`.

This doesn't work when the manifest has been truncated, e.g. if the
casted offset falls before the start of the manifest, we would
previously return that no segment exists for the fetch. This could
result in segments erronesouly not being returned, and fetches
erroneously being met with no data when some existed.

This commit fixes the behavior to no longer use the casted segment
lookup and adds some test coverage for kafka::offset lookups.

Fixes redpanda-data#7708
  • Loading branch information
andrwng committed Jan 9, 2023
1 parent 0161370 commit fe09a08
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 28 deletions.
57 changes: 29 additions & 28 deletions src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,42 +285,44 @@ partition_manifest::get_start_kafka_offset() const {
partition_manifest::const_iterator
partition_manifest::segment_containing(kafka::offset o) const {
vlog(cst_log.debug, "Metadata lookup using kafka offset {}", o);
if (_segments.empty()) {
return end();
}
// Kafka offset is always <= log offset.
// To find a segment by its kafka offset we can simply query
// manifest by log offset and then traverse forward until we
// will find matching segment.
auto it = segment_containing(kafka::offset_cast(o));
if (it == end()) {
return it;
}
auto prev = end();
// find a matching segment.
auto it = _segments.lower_bound(kafka::offset_cast(o));
// We need to find first element which has greater kafka offset than
// the target and step back. It is possible to have a segment that
// doesn't have data batches. This scan has to skip segments like that.
while (it != end()) {
auto base = it->second.base_offset - it->second.delta_offset;
if (base > o) {
// We need to find first element which has greater kafka
// offset then the target and step back. It is possible
// to have a segment that doesn't have data batches. This
// scan has to skip segments like that.
break;
if (it->second.base_kafka_offset() > o) {
// The beginning of the manifest already has a base offset that
// doesn't satisfy the query.
if (it == begin()) {
return end();
}
// On the first segment we see with a base kafka offset higher than
// 'o', return its previous segment.
return std::prev(it);
}
prev = it;
it = std::next(it);
}
if (it == end()) {
if (prev->second.delta_offset_end != model::offset_delta{}) {
// In case if 'prev' points to the last segment it's not guaranteed
// that the segment contains the required kafka offset. We need an
// extra check using delta_offset_end. If the field is not set then
// we will return the last segment. This is OK since
// delta_offset_end will always be set for new segments.
auto m = prev->second.committed_offset
- prev->second.delta_offset_end;
if (m < o) {
prev = end();
}
// All segments had base kafka offsets lower than 'o'.
auto back = std::prev(it);
if (back->second.delta_offset_end != model::offset_delta{}) {
// If 'prev' points to the last segment, it's not guaranteed that
// the segment contains the required kafka offset. We need an extra
// check using delta_offset_end. If the field is not set then we
// will return the last segment. This is OK since delta_offset_end
// will always be set for new segments.
if (back->second.committed_kafka_offset() < o) {
return end();
}
}
return prev;
return back;
;
}

model::offset partition_manifest::get_last_uploaded_compacted_offset() const {
Expand Down Expand Up @@ -1312,7 +1314,6 @@ partition_manifest::segment_containing(model::offset o) const {
return end();
}

// Make sure to only compare based on the offset and not the term.
auto it = _segments.upper_bound(o);
if (it == _segments.begin()) {
return end();
Expand Down
135 changes: 135 additions & 0 deletions src/v/cloud_storage/tests/partition_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,141 @@ inline ss::input_stream<char> make_manifest_stream(std::string_view json) {
return make_iobuf_input_stream(std::move(i));
}

namespace {

ss::logger test_log("partition_manifest_test");

// Returns a manifest with segments with the corresponding offset bounds, with
// the last offsets in the list referring to the exclusive end of the returned
// manifest.
partition_manifest
manifest_for(std::vector<std::pair<model::offset, kafka::offset>> o) {
partition_manifest manifest;
manifest.update(make_manifest_stream(empty_manifest_json)).get0();
for (int i = 0; i < o.size() - 1; i++) {
segment_meta seg{
.is_compacted = false,
.size_bytes = 1024,
.base_offset = o[i].first,
.committed_offset = model::offset(o[i + 1].first() - 1),
.delta_offset = model::offset_delta(o[i].first() - o[i].second()),
.delta_offset_end = model::offset_delta(
o[i + 1].first() - o[i + 1].second()),
};
manifest.add(
segment_name(fmt::format("{}-1-v1.log", o[i].first())), seg);
}
return manifest;
}

} // anonymous namespace

SEASTAR_THREAD_TEST_CASE(test_segment_contains_by_kafka_offset) {
// Returns true if a kafka::offset lookup returns the segment with the
// expected base offsets.
const auto check_offset = [](
const partition_manifest& m,
kafka::offset ko,
model::offset expected_base_mo,
kafka::offset expected_base_ko) {
const auto it = m.segment_containing(ko);
bool success = it != m.end()
// Check that the segment base offsets match the segment
// we were expecting...
&& it->second.base_offset == expected_base_mo
&& it->second.base_kafka_offset() == expected_base_ko
// ...and as a sanity check, make sure the kafka::offset
// falls in the segment.
&& it->second.base_kafka_offset() <= ko
&& it->second.committed_kafka_offset() >= ko;
if (success) {
return true;
}
if (it == m.end()) {
test_log.error("no segment for {}", ko);
return false;
}
test_log.error(
"segment {} doesn't match the expected base model::offset {} or "
"kafka::offset {}, or doesn't include "
"{} in its range",
it->second,
expected_base_mo,
expected_base_ko,
ko);
return false;
};

// Returns true if a kafka::offset lookup returns that no such segment is
// returned.
const auto check_no_offset =
[](const partition_manifest& m, kafka::offset ko) {
const auto it = m.segment_containing(ko);
bool success = it == m.end();
if (success) {
return true;
}
test_log.error(
"unexpected segment for kafka::offset {}: {}", ko, it->second);
return false;
};

// mo: 0 10 20 30
// [a ][b ][c ]end
// ko: 0 5 10 15
partition_manifest full_manifest = manifest_for({
{model::offset(0), kafka::offset(0)},
{model::offset(10), kafka::offset(5)},
{model::offset(20), kafka::offset(10)},
{model::offset(30), kafka::offset(15)},
});
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(0), model::offset(0), kafka::offset(0)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(4), model::offset(0), kafka::offset(0)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(5), model::offset(10), kafka::offset(5)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(9), model::offset(10), kafka::offset(5)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(10), model::offset(20), kafka::offset(10)));
BOOST_REQUIRE(check_offset(
full_manifest, kafka::offset(14), model::offset(20), kafka::offset(10)));
BOOST_REQUIRE(check_no_offset(full_manifest, kafka::offset(15)));

// mo: 10 20 30
// [b ][c ]end
// ko: 5 10 15
partition_manifest truncated_manifest = manifest_for({
{model::offset(10), kafka::offset(5)},
{model::offset(20), kafka::offset(10)},
{model::offset(30), kafka::offset(15)},
});
BOOST_REQUIRE(check_no_offset(truncated_manifest, kafka::offset(0)));
BOOST_REQUIRE(check_no_offset(truncated_manifest, kafka::offset(4)));
BOOST_REQUIRE(check_offset(
truncated_manifest,
kafka::offset(5),
model::offset(10),
kafka::offset(5)));
BOOST_REQUIRE(check_offset(
truncated_manifest,
kafka::offset(9),
model::offset(10),
kafka::offset(5)));
BOOST_REQUIRE(check_offset(
truncated_manifest,
kafka::offset(10),
model::offset(20),
kafka::offset(10)));
BOOST_REQUIRE(check_offset(
truncated_manifest,
kafka::offset(14),
model::offset(20),
kafka::offset(10)));
BOOST_REQUIRE(check_no_offset(full_manifest, kafka::offset(15)));
}

SEASTAR_THREAD_TEST_CASE(test_segment_contains) {
partition_manifest m;
m.update(make_manifest_stream(manifest_with_gaps)).get0();
Expand Down

0 comments on commit fe09a08

Please sign in to comment.