Skip to content

Commit

Permalink
Proposed (naive) change for cleaning up sequence markers
Browse files Browse the repository at this point in the history
  • Loading branch information
oleiman committed Oct 18, 2023
1 parent 3010aaf commit 82f4010
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 9 deletions.
7 changes: 4 additions & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,11 +519,12 @@ ss::future<bool> sharded_store::set_compatibility(
});
}

ss::future<bool> sharded_store::clear_compatibility(subject sub) {
ss::future<bool>
sharded_store::clear_compatibility(seq_marker marker, subject sub) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
return s.clear_compatibility(sub).value();
sub_shard, _smp_opts, [marker, sub{std::move(sub)}](store& s) {
return s.clear_compatibility(marker, sub).value();
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class sharded_store {
seq_marker marker, subject sub, compatibility_level compatibility);

///\brief Clear the compatibility level for a subject.
ss::future<bool> clear_compatibility(subject sub);
ss::future<bool> clear_compatibility(seq_marker marker, subject sub);

///\brief Check if the provided schema is compatible with the subject and
/// version, according the the current compatibility.
Expand Down
8 changes: 7 additions & 1 deletion src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,13 @@ struct consume_to_store {
try {
vlog(plog.debug, "Applying: {}", key);
if (!val) {
co_await _store.clear_compatibility(*key.sub);
co_await _store.clear_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub);
} else if (key.sub) {
co_await _store.set_compatibility(
seq_marker{
Expand Down
12 changes: 11 additions & 1 deletion src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,9 +494,19 @@ class store {
}

///\brief Clear the compatibility level for a subject.
result<bool> clear_compatibility(const subject& sub) {
result<bool> clear_compatibility(seq_marker marker, const subject& sub) {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));
auto& markers = sub_it->second.written_at;
markers.erase(
std::remove_if(
markers.begin(),
markers.end(),
[marker](auto sm) {
return sm.key_type == marker.key_type && sm.seq && marker.seq
&& *sm.seq == *marker.seq;
}),
markers.end());
return std::exchange(sub_it->second.compatibility, std::nullopt)
!= std::nullopt;
}
Expand Down
7 changes: 4 additions & 3 deletions src/v/pandaproxy/schema_registry/test/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat) {
BOOST_REQUIRE(s.get_compatibility().value() == global_expected);

// Clearing compatibility should fallback to global
BOOST_REQUIRE(s.clear_compatibility(subject0).value() == true);
BOOST_REQUIRE(
s.clear_compatibility(dummy_marker, subject0).value() == true);
BOOST_REQUIRE(
s.get_compatibility(subject0, fallback).value() == global_expected);
}
Expand Down Expand Up @@ -584,7 +585,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {
d_res.error().code(), pps::error_code::subject_soft_deleted);

// Clearing the compatibility of a soft-deleted subject is allowed
BOOST_REQUIRE(s.clear_compatibility(subject0).has_value());
BOOST_REQUIRE(s.clear_compatibility(dummy_marker, subject0).has_value());

v_res = s.get_versions(subject0, pps::include_deleted::yes);
BOOST_REQUIRE(v_res.has_value());
Expand Down Expand Up @@ -622,7 +623,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {

// Clearing the compatibility of a hard-deleted subject should fail
BOOST_REQUIRE(
s.clear_compatibility(subject0).error().code()
s.clear_compatibility(dummy_marker, subject0).error().code()
== pps::error_code::subject_not_found);
}

Expand Down

0 comments on commit 82f4010

Please sign in to comment.