Skip to content

Commit

Permalink
Tombstones and supporting code
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiman committed Sep 22, 2023
1 parent c4c35bb commit d972155
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 9 deletions.
59 changes: 53 additions & 6 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,19 +229,66 @@ ss::future<bool> seq_writer::write_config(
}

ss::future<std::optional<bool>> seq_writer::do_delete_config(
std::optional<subject> sub, model::offset write_at, seq_writer& seq) {
subject sub, model::offset write_at, seq_writer& seq) {
vlog(plog.debug, "delete config sub={} offset={}", sub, write_at);

try {
if (sub.has_value()) {
(void)co_await seq._store.get_compatibility(
sub.value(), default_to_global::no);
}
(void)co_await seq._store.get_compatibility(sub, default_to_global::no);
} catch (const exception&) {
// subject config already blank
co_return false;
}

std::vector<seq_marker> sequences{
co_await _store.get_subject_config_written_at(sub)};

storage::record_batch_builder rb{
model::record_batch_type::raft_data, model::offset{0}};

std::vector<config_key> keys;
for (auto s : sequences) {
vlog(
plog.debug,
"Deleting config: tombstoning config_key for sub={} at {}",
sub,
s);

switch (s.key_type) {
case seq_marker_key_type::config: {
auto key = config_key{.seq{s.seq}, .node{s.node}, .sub{sub}};
keys.push_back(key);
rb.add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt);
} break;
default:
vassert(false, "Unexpected key type: {}", s.key_type);
}
}

auto ts_batch = std::move(rb).build();
kafka::partition_produce_response res
= co_await _client.local().produce_record_batch(
model::schema_registry_internal_tp, std::move(ts_batch));

if (res.error_code != kafka::error_code::none) {
vlog(
plog.error,
"Error writing to subject topic: {} {}",
res.error_code,
res.error_message);
throw kafka::exception(res.error_code, *res.error_message);
}

auto applier = consume_to_store(seq._store, seq);
auto offset = res.base_offset;
for (const auto& k : keys) {
co_await applier.apply(offset, k, std::nullopt);
seq.advance_offset_inner(offset);
offset++;
}

// co_return true;

// TODO(oren): seems like this might not be necessary?
auto key = config_key{.seq{write_at}, .node{seq._node_id}, .sub{sub}};
auto batch = as_record_batch(key, std::nullopt);
auto success = co_await seq.produce_and_check(write_at, std::move(batch));
Expand All @@ -256,7 +303,7 @@ ss::future<std::optional<bool>> seq_writer::do_delete_config(
}
}

ss::future<bool> seq_writer::delete_config(std::optional<subject> sub) {
ss::future<bool> seq_writer::delete_config(subject sub) {
return sequenced_write(
[this, sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
return do_delete_config(sub, write_at, seq);
Expand Down
6 changes: 3 additions & 3 deletions src/v/pandaproxy/schema_registry/seq_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
ss::future<bool>
write_config(std::optional<subject> sub, compatibility_level compat);

ss::future<bool> delete_config(std::optional<subject> sub);
ss::future<bool> delete_config(subject sub);

ss::future<bool>
delete_subject_version(subject sub, schema_version version);
Expand Down Expand Up @@ -77,8 +77,8 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
model::offset write_at,
seq_writer& seq);

ss::future<std::optional<bool>> do_delete_config(
std::optional<subject> sub, model::offset write_at, seq_writer& seq);
ss::future<std::optional<bool>>
do_delete_config(subject sub, model::offset write_at, seq_writer& seq);

ss::future<std::optional<bool>> do_delete_subject_version(
subject sub,
Expand Down
9 changes: 9 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,15 @@ sharded_store::get_subject_written_at(subject sub) {
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_config_written_at(subject sub) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
return s.store::get_subject_config_written_at(sub).value();
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {
auto sub_shard{shard_for(sub)};
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class sharded_store {
///\brief Get sequence number history (errors out if not soft-deleted)
ss::future<std::vector<seq_marker>> get_subject_written_at(subject sub);

///\brief Get sequence number history of subject config. Subject need
/// not be soft-deleted first
ss::future<std::vector<seq_marker>>
get_subject_config_written_at(subject sub);

///\brief Get sequence number history (errors out if not soft-deleted)
ss::future<std::vector<seq_marker>>
get_subject_version_written_at(subject sub, schema_version version);
Expand Down
25 changes: 25 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,31 @@ class store {
}
}

/// \brief Return the seq_marker write history of a subject, but only
/// config_keys
///
/// \return A vector (possibly empty)
result<std::vector<seq_marker>>
get_subject_config_written_at(const subject& sub) const {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));

if (sub_it->second.written_at.empty()) {
return not_found(sub);
}

std::vector<seq_marker> result;
std::copy_if(
std::begin(sub_it->second.written_at),
std::end(sub_it->second.written_at),
std::back_inserter(result),
[](const auto& sm) {
return sm.key_type == seq_marker_key_type::config;
});

return std::move(result);
}

/// \brief Return the seq_marker write history of a version.
///
/// \return A vector with at least one element
Expand Down

0 comments on commit d972155

Please sign in to comment.