Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema_registry: Add support for DELETE /config/{subject} #13557

Merged
merged 4 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,49 @@
}
}
}
},
"delete": {
"summary": "Delete the compatibility level for a subject.",
"operationId": "delete_config_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
BenPope marked this conversation as resolved.
Show resolved Hide resolved
"parameters": [
{
"name": "subject",
"in": "path",
"required": true,
"type": "string"
}
],
"produces": ["application/vnd.schemaregistry.v1+json"],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"properties": {
"compatibility": {
"type": "string"
}
}
}
},
"404": {
"description": "Subject not found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
BenPope marked this conversation as resolved.
Show resolved Hide resolved
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
}
},
"/config": {
Expand Down
31 changes: 31 additions & 0 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "pandaproxy/parsing/httpd.h"
#include "pandaproxy/reply.h"
#include "pandaproxy/schema_registry/error.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/requests/compatibility.h"
#include "pandaproxy/schema_registry/requests/config.h"
#include "pandaproxy/schema_registry/requests/get_schemas_ids_id.h"
Expand Down Expand Up @@ -175,6 +176,36 @@ put_config_subject(server::request_t rq, server::reply_t rp) {
co_return rp;
}

ss::future<server::reply_t>
delete_config_subject(server::request_t rq, server::reply_t rp) {
parse_content_type_header(rq);
BenPope marked this conversation as resolved.
Show resolved Hide resolved
parse_accept_header(rq, rp);
auto sub = parse::request_param<subject>(*rq.req, "subject");

rq.req.reset();

// ensure we see latest writes
co_await rq.service().writer().read_sync();

compatibility_level lvl{};
try {
lvl = co_await rq.service().schema_store().get_compatibility(
sub, default_to_global::no);
} catch (const exception& e) {
if (e.code() == error_code::compatibility_not_found) {
throw as_exception(not_found(sub));
} else {
throw;
}
}

co_await rq.service().writer().delete_config(sub);

auto json_rslt = ppj::rjson_serialize(get_config_req_rep{.compat = lvl});
BenPope marked this conversation as resolved.
Show resolved Hide resolved
rp.rep->write_body("json", json_rslt);
co_return rp;
}

ss::future<server::reply_t> get_mode(server::request_t rq, server::reply_t rp) {
parse_accept_header(rq, rp);
rq.req.reset();
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/schema_registry/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ ss::future<ctx_server<service>::reply_t> get_config_subject(
ss::future<ctx_server<service>::reply_t> put_config_subject(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

ss::future<ctx_server<service>::reply_t> delete_config_subject(
ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

ss::future<ctx_server<service>::reply_t>
get_mode(ctx_server<service>::request_t rq, ctx_server<service>::reply_t rp);

Expand Down
67 changes: 67 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,73 @@ ss::future<bool> seq_writer::write_config(
});
}

ss::future<std::optional<bool>> seq_writer::do_delete_config(
subject sub, model::offset write_at, seq_writer& seq) {
vlog(plog.debug, "delete config sub={} offset={}", sub, write_at);

try {
co_await seq._store.get_compatibility(sub, default_to_global::no);
} catch (const exception&) {
// subject config already blank
co_return false;
}

std::vector<seq_marker> sequences{
co_await _store.get_subject_config_written_at(sub)};

storage::record_batch_builder rb{
model::record_batch_type::raft_data, model::offset{0}};

std::vector<config_key> keys;
for (const auto& s : sequences) {
vlog(
plog.debug,
"Deleting config: tombstoning config_key for sub={} at {}",
sub,
s);

vassert(
s.key_type == seq_marker_key_type::config,
"Unexpected key type: {}",
s.key_type);

auto key = config_key{.seq{s.seq}, .node{s.node}, .sub{sub}};
keys.push_back(key);
rb.add_raw_kv(to_json_iobuf(std::move(key)), std::nullopt);
}

auto ts_batch = std::move(rb).build();
kafka::partition_produce_response res
= co_await _client.local().produce_record_batch(
model::schema_registry_internal_tp, std::move(ts_batch));

if (res.error_code != kafka::error_code::none) {
vlog(
plog.error,
"Error writing to subject topic: {} {}",
res.error_code,
res.error_message);
throw kafka::exception(res.error_code, *res.error_message);
}

auto applier = consume_to_store(seq._store, seq);
auto offset = res.base_offset;
for (const auto& k : keys) {
co_await applier.apply(offset, k, std::nullopt);
seq.advance_offset_inner(offset);
++offset;
}

co_return true;
}

ss::future<bool> seq_writer::delete_config(subject sub) {
return sequenced_write(
[this, sub{std::move(sub)}](model::offset write_at, seq_writer& seq) {
return do_delete_config(sub, write_at, seq);
});
}

/// Impermanent delete: update a version with is_deleted=true
ss::future<std::optional<bool>> seq_writer::do_delete_subject_version(
subject sub,
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/seq_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
ss::future<bool>
write_config(std::optional<subject> sub, compatibility_level compat);

ss::future<bool> delete_config(subject sub);

ss::future<bool>
delete_subject_version(subject sub, schema_version version);

Expand Down Expand Up @@ -75,6 +77,9 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
model::offset write_at,
seq_writer& seq);

ss::future<std::optional<bool>>
do_delete_config(subject sub, model::offset write_at, seq_writer& seq);

ss::future<std::optional<bool>> do_delete_subject_version(
subject sub,
schema_version version,
Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ server::routes_t get_schema_registry_routes(ss::gate& gate, one_shot& es) {
ss::httpd::schema_registry_json::put_config_subject,
wrap(gate, es, put_config_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::delete_config_subject,
wrap(gate, es, delete_config_subject)});

routes.routes.emplace_back(server::route_t{
ss::httpd::schema_registry_json::get_mode, wrap(gate, es, get_mode)});

Expand Down
16 changes: 13 additions & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,15 @@ sharded_store::get_subject_written_at(subject sub) {
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_config_written_at(subject sub) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
return s.store::get_subject_config_written_at(sub).value();
});
}

ss::future<std::vector<seq_marker>>
sharded_store::get_subject_version_written_at(subject sub, schema_version ver) {
auto sub_shard{shard_for(sub)};
Expand Down Expand Up @@ -510,11 +519,12 @@ ss::future<bool> sharded_store::set_compatibility(
});
}

ss::future<bool> sharded_store::clear_compatibility(subject sub) {
ss::future<bool>
sharded_store::clear_compatibility(seq_marker marker, subject sub) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard, _smp_opts, [sub{std::move(sub)}](store& s) {
return s.clear_compatibility(sub).value();
sub_shard, _smp_opts, [marker, sub{std::move(sub)}](store& s) {
return s.clear_compatibility(marker, sub).value();
});
}

Expand Down
7 changes: 6 additions & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class sharded_store {
///\brief Get sequence number history (errors out if not soft-deleted)
ss::future<std::vector<seq_marker>> get_subject_written_at(subject sub);

///\brief Get sequence number history of subject config. Subject need
/// not be soft-deleted first
ss::future<std::vector<seq_marker>>
get_subject_config_written_at(subject sub);

///\brief Get sequence number history (errors out if not soft-deleted)
ss::future<std::vector<seq_marker>>
get_subject_version_written_at(subject sub, schema_version version);
Expand All @@ -126,7 +131,7 @@ class sharded_store {
seq_marker marker, subject sub, compatibility_level compatibility);

///\brief Clear the compatibility level for a subject.
ss::future<bool> clear_compatibility(subject sub);
ss::future<bool> clear_compatibility(seq_marker marker, subject sub);

///\brief Check if the provided schema is compatible with the subject and
/// version, according the the current compatibility.
Expand Down
36 changes: 24 additions & 12 deletions src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1286,19 +1286,31 @@ struct consume_to_store {
}
try {
vlog(plog.debug, "Applying: {}", key);
if (!val) {
co_await _store.clear_compatibility(*key.sub);
} else if (key.sub) {
co_await _store.set_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub,
val->compat);
} else {
if (key.sub.has_value()) {
if (!val.has_value()) {
co_await _store.clear_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub);
} else {
co_await _store.set_compatibility(
seq_marker{
.seq = key.seq,
.node = key.node,
.version{invalid_schema_version}, // Not applicable
.key_type = seq_marker_key_type::config},
*key.sub,
val->compat);
}
} else if (val.has_value()) {
co_await _store.set_compatibility(val->compat);
} else {
vlog(
plog.warn,
"Tried to apply config with neither subject nor value");
}
} catch (const exception& e) {
vlog(plog.debug, "Error replaying: {}: {}", key, e);
Expand Down
32 changes: 31 additions & 1 deletion src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,34 @@ class store {
}
}

/// \brief Return the seq_marker write history of a subject, but only
/// config_keys
///
/// \return A vector (possibly empty)
result<std::vector<seq_marker>>
get_subject_config_written_at(const subject& sub) const {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));

// This should never happen (how can a record get into the
// store without an originating sequenced record?), but return
// an error instead of vasserting out.
if (sub_it->second.written_at.empty()) {
return not_found(sub);
}
oleiman marked this conversation as resolved.
Show resolved Hide resolved

std::vector<seq_marker> result;
std::copy_if(
sub_it->second.written_at.begin(),
sub_it->second.written_at.end(),
std::back_inserter(result),
[](const auto& sm) {
return sm.key_type == seq_marker_key_type::config;
});

return result;
}

/// \brief Return the seq_marker write history of a version.
///
/// \return A vector with at least one element
Expand Down Expand Up @@ -466,9 +494,11 @@ class store {
}

///\brief Clear the compatibility level for a subject.
result<bool> clear_compatibility(const subject& sub) {
result<bool>
clear_compatibility(const seq_marker& marker, const subject& sub) {
auto sub_it = BOOST_OUTCOME_TRYX(
get_subject_iter(sub, include_deleted::yes));
std::erase(sub_it->second.written_at, marker);
return std::exchange(sub_it->second.compatibility, std::nullopt)
!= std::nullopt;
}
Expand Down
7 changes: 4 additions & 3 deletions src/v/pandaproxy/schema_registry/test/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ BOOST_AUTO_TEST_CASE(test_store_subject_compat) {
BOOST_REQUIRE(s.get_compatibility().value() == global_expected);

// Clearing compatibility should fallback to global
BOOST_REQUIRE(s.clear_compatibility(subject0).value() == true);
BOOST_REQUIRE(
s.clear_compatibility(dummy_marker, subject0).value() == true);
BOOST_REQUIRE(
s.get_compatibility(subject0, fallback).value() == global_expected);
}
Expand Down Expand Up @@ -584,7 +585,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {
d_res.error().code(), pps::error_code::subject_soft_deleted);

// Clearing the compatibility of a soft-deleted subject is allowed
BOOST_REQUIRE(s.clear_compatibility(subject0).has_value());
BOOST_REQUIRE(s.clear_compatibility(dummy_marker, subject0).has_value());

v_res = s.get_versions(subject0, pps::include_deleted::yes);
BOOST_REQUIRE(v_res.has_value());
Expand Down Expand Up @@ -622,7 +623,7 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject) {

// Clearing the compatibility of a hard-deleted subject should fail
BOOST_REQUIRE(
s.clear_compatibility(subject0).error().code()
s.clear_compatibility(dummy_marker, subject0).error().code()
== pps::error_code::subject_not_found);
}

Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ struct seq_marker {
schema_version version;
seq_marker_key_type key_type{seq_marker_key_type::invalid};

// Note that matching nullopts is possible on the seq and node fields.
// This is intentional; both fields are particular to redpanda, so making
// them optional provides compatibility with non-rp schema registries. If
// either is not present, we can assume a collision has not occurred.
friend bool operator==(const seq_marker&, const seq_marker&) = default;
friend std::ostream& operator<<(std::ostream& os, const seq_marker& v);
};

Expand Down
Loading
Loading