From 82f4010e6662621dbd1a7d6c0053cd13cd856111 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 17 Oct 2023 16:06:16 -0700 Subject: [PATCH] Proposed (naive) change for cleaning up sequence markers --- src/v/pandaproxy/schema_registry/sharded_store.cc | 7 ++++--- src/v/pandaproxy/schema_registry/sharded_store.h | 2 +- src/v/pandaproxy/schema_registry/storage.h | 8 +++++++- src/v/pandaproxy/schema_registry/store.h | 12 +++++++++++- src/v/pandaproxy/schema_registry/test/store.cc | 7 ++++--- 5 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 7d2192b5ea96..67e15dc180a1 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -519,11 +519,12 @@ ss::future sharded_store::set_compatibility( }); } -ss::future sharded_store::clear_compatibility(subject sub) { +ss::future +sharded_store::clear_compatibility(seq_marker marker, subject sub) { auto sub_shard{shard_for(sub)}; co_return co_await _store.invoke_on( - sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) { - return s.clear_compatibility(sub).value(); + sub_shard, _smp_opts, [marker, sub{std::move(sub)}](store& s) { + return s.clear_compatibility(marker, sub).value(); }); } diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index 6802365aed74..c6473f0e6bc6 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -131,7 +131,7 @@ class sharded_store { seq_marker marker, subject sub, compatibility_level compatibility); ///\brief Clear the compatibility level for a subject. - ss::future clear_compatibility(subject sub); + ss::future clear_compatibility(seq_marker marker, subject sub); ///\brief Check if the provided schema is compatible with the subject and /// version, according the the current compatibility. diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index 38aa1121930d..ff831df9778a 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -1287,7 +1287,13 @@ struct consume_to_store { try { vlog(plog.debug, "Applying: {}", key); if (!val) { - co_await _store.clear_compatibility(*key.sub); + co_await _store.clear_compatibility( + seq_marker{ + .seq = key.seq, + .node = key.node, + .version{invalid_schema_version}, // Not applicable + .key_type = seq_marker_key_type::config}, + *key.sub); } else if (key.sub) { co_await _store.set_compatibility( seq_marker{ diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index 700c45aee836..28e87e00507c 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -494,9 +494,19 @@ class store { } ///\brief Clear the compatibility level for a subject. - result clear_compatibility(const subject& sub) { + result clear_compatibility(seq_marker marker, const subject& sub) { auto sub_it = BOOST_OUTCOME_TRYX( get_subject_iter(sub, include_deleted::yes)); + auto& markers = sub_it->second.written_at; + markers.erase( + std::remove_if( + markers.begin(), + markers.end(), + [marker](auto sm) { + return sm.key_type == marker.key_type && sm.seq && marker.seq + && *sm.seq == *marker.seq; + }), + markers.end()); return std::exchange(sub_it->second.compatibility, std::nullopt) != std::nullopt; } diff --git a/src/v/pandaproxy/schema_registry/test/store.cc b/src/v/pandaproxy/schema_registry/test/store.cc index 37dedae53745..483c0413a8b4 100644 --- a/src/v/pandaproxy/schema_registry/test/store.cc +++ b/src/v/pandaproxy/schema_registry/test/store.cc @@ -471,7 +471,8 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat) { BOOST_REQUIRE(s.get_compatibility().value() == global_expected); // Clearing compatibility should fallback to global - BOOST_REQUIRE(s.clear_compatibility(subject0).value() == true); + BOOST_REQUIRE( + s.clear_compatibility(dummy_marker, subject0).value() == true); BOOST_REQUIRE( s.get_compatibility(subject0, fallback).value() == global_expected); } @@ -584,7 +585,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) { d_res.error().code(), pps::error_code::subject_soft_deleted); // Clearing the compatibility of a soft-deleted subject is allowed - BOOST_REQUIRE(s.clear_compatibility(subject0).has_value()); + BOOST_REQUIRE(s.clear_compatibility(dummy_marker, subject0).has_value()); v_res = s.get_versions(subject0, pps::include_deleted::yes); BOOST_REQUIRE(v_res.has_value()); @@ -622,7 +623,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) { // Clearing the compatibility of a hard-deleted subject should fail BOOST_REQUIRE( - s.clear_compatibility(subject0).error().code() + s.clear_compatibility(dummy_marker, subject0).error().code() == pps::error_code::subject_not_found); }