Skip to content

Commit

Permalink
admin: add per partition anomalies endpoint
Browse files Browse the repository at this point in the history
This commit introduces a new admin API endpoint,
/v1/cloud_storage/anomalies/{namespace}/{topic}/{partition}, which
returns the anomalies detected by the cloud storage scrubber for the given partition.
  • Loading branch information
Vlad Lazar committed Sep 15, 2023
1 parent 8773921 commit efd9f25
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/v/cluster/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,15 @@ partition_cloud_storage_status partition::get_cloud_storage_status() const {
return status;
}

/// Returns the anomalies reported by the manifest held in
/// `_archival_meta_stm`.
///
/// Yields std::nullopt when `_archival_meta_stm` is not set or when
/// `is_leader()` is false for this partition.
std::optional<cloud_storage::anomalies>
partition::get_cloud_storage_anomalies() const {
    // The stm pointer is tested first so that is_leader() is only consulted
    // when there is a manifest to read from.
    if (_archival_meta_stm && is_leader()) {
        return _archival_meta_stm->manifest().detected_anomalies();
    }

    return std::nullopt;
}

bool partition::is_remote_fetch_enabled() const {
const auto& cfg = _raft->log_config();
if (_feature_table.local().is_active(features::feature::cloud_retention)) {
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ class partition {

partition_cloud_storage_status get_cloud_storage_status() const;

/// Return the cloud storage anomalies recorded in the partition's archival
/// manifest, or std::nullopt if the partition has no archival STM or is not
/// the leader.
std::optional<cloud_storage::anomalies> get_cloud_storage_anomalies() const;

/// Return true if shadow indexing is enabled for the partition
bool is_remote_fetch_enabled() const;

Expand Down
73 changes: 73 additions & 0 deletions src/v/redpanda/admin/api-doc/shadow_indexing.json
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,47 @@
]
}
]
},
{
"path": "/v1/cloud_storage/anomalies/{namespace}/{topic}/{partition}",
"operations": [
{
"method": "GET",
"summary": "Retrieve cloud storage anomalies for a given partition",
"operationId": "get_cloud_storage_anomalies",
"nickname": "get_cloud_storage_anomalies",
"type": "cloud_storage_partition_anomalies",
"parameters": [
{
"name": "namespace",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "topic",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "partition",
"in": "path",
"required": true,
"type": "integer"
}
],
"produces": [
"application/json"
],
"responseMessages": [
{
"code": 200,
"message": "Success"
}
]
}
]
}
],
"models": {
Expand Down Expand Up @@ -347,6 +388,38 @@
"items": {"type": "lifecycle_marker"}
}
}
},
"cloud_storage_partition_anomalies": {
"id": "cloud_storage_partition_anomalies",
"description": "Anomalies detected by the cloud storage scrubber",
"properties": {
"ns": {
"type": "string"
},
"topic": {
"type": "string"
},
"partition": {
"type": "long"
},
"revision_id": {
"type": "long"
},
"missing_partition_manifest": {
"type": "boolean",
"nullable": true
},
"missing_spillover_manifests": {
"type": "array",
"items": {"type": "string"},
"nullable": true
},
"missing_segments": {
"type": "array",
"items": {"type": "string"},
"nullable": true
}
}
}
}
}
88 changes: 88 additions & 0 deletions src/v/redpanda/admin_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cloud_storage/cache_service.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote_partition.h"
#include "cloud_storage/spillover_manifest.h"
#include "cluster/cloud_storage_size_reducer.h"
#include "cluster/cluster_utils.h"
#include "cluster/config_frontend.h"
Expand Down Expand Up @@ -5099,6 +5100,44 @@ map_status_to_json(cluster::partition_cloud_storage_status status) {

return json;
}
/// Builds the `cloud_storage_partition_anomalies` JSON payload for `ntp`.
///
/// \param ntp          partition the anomalies belong to
/// \param initial_rev  initial revision of the partition; used both for the
///                     `revision_id` field and to derive object paths
/// \param detected     anomalies to render
/// \return the populated JSON object. Optional fields are only assigned when
///         the corresponding anomaly is present in `detected`.
ss::httpd::shadow_indexing_json::cloud_storage_partition_anomalies
map_anomalies_to_json(
  const model::ntp& ntp,
  const model::initial_revision_id& initial_rev,
  const cloud_storage::anomalies& detected) {
    ss::httpd::shadow_indexing_json::cloud_storage_partition_anomalies json;

    // Identity of the partition.
    json.ns = ntp.ns();
    json.topic = ntp.tp.topic();
    json.partition = ntp.tp.partition();
    json.revision_id = initial_rev();

    // Only assign the flag when it is raised; a false value is never written.
    if (detected.missing_partition_manifest) {
        json.missing_partition_manifest = true;
    }

    // Each missing spillover manifest is reported by its generated path.
    for (const auto& spill : detected.missing_spillover_manifests) {
        const auto spill_path = cloud_storage::generate_spillover_manifest_path(
          ntp, initial_rev, spill);
        json.missing_spillover_manifests.push(spill_path().string());
    }

    // Nothing else to report: skip building the scratch manifest below.
    if (detected.missing_segments.size() == 0) {
        return json;
    }

    // A scratch manifest for (ntp, initial_rev) is used solely to generate
    // the path of each missing segment.
    cloud_storage::partition_manifest path_source{ntp, initial_rev};
    for (const auto& segment : detected.missing_segments) {
        json.missing_segments.push(
          path_source.generate_segment_path(segment)().string());
    }

    return json;
}
} // namespace

ss::future<ss::json::json_return_type>
Expand Down Expand Up @@ -5317,6 +5356,51 @@ ss::future<std::unique_ptr<ss::http::reply>> admin_server::get_manifest(
});
}

/// Admin API handler that returns the cloud storage anomalies of the
/// partition named by the request's path parameters.
///
/// Throws the result of `redirect_to_leader` when a redirect is needed, and
/// `ss::httpd::not_found_exception` when the topic's initial revision, the
/// partition's shard, or the partition's anomalies cannot be obtained.
ss::future<ss::json::json_return_type>
admin_server::get_cloud_storage_anomalies(
  std::unique_ptr<ss::http::request> req) {
    const model::ntp ntp = parse_ntp_from_request(req->param);

    if (need_redirect_to_leader(ntp, _metadata_cache)) {
        throw co_await redirect_to_leader(*req, ntp);
    }

    // The initial revision is required later to render object paths.
    const auto& topics = _controller->get_topics_state().local();
    const auto initial_rev = topics.get_initial_revision(ntp);
    if (!initial_rev) {
        throw ss::httpd::not_found_exception(
          fmt::format("topic {} not found", ntp.tp));
    }

    const auto shard = _shard_table.local().shard_for(ntp);
    if (!shard) {
        throw ss::httpd::not_found_exception(fmt::format(
          "{} could not be found on the node. Perhaps it has been moved "
          "during the redirect.",
          ntp));
    }

    // Look the partition up on its shard and fetch the anomalies there.
    // `ntp` is captured by reference; it is a local of this coroutine and the
    // call is awaited before the local goes out of scope.
    auto detected = co_await _partition_manager.invoke_on(
      *shard,
      [&ntp](const auto& pm) -> std::optional<cloud_storage::anomalies> {
          const auto& hosted = pm.partitions();
          if (auto it = hosted.find(ntp); it != hosted.end()) {
              return it->second->get_cloud_storage_anomalies();
          }

          return std::nullopt;
      });

    // Empty both when the partition is absent from the shard and when the
    // partition itself reported no anomalies object.
    if (!detected.has_value()) {
        throw ss::httpd::not_found_exception(fmt::format(
          "Cloud partition {} could not be found on shard {}.", ntp, *shard));
    }

    co_return map_anomalies_to_json(ntp, *initial_rev, *detected);
}

void admin_server::register_shadow_indexing_routes() {
register_route<superuser>(
ss::httpd::shadow_indexing_json::sync_local_state,
Expand Down Expand Up @@ -5366,6 +5450,10 @@ void admin_server::register_shadow_indexing_routes() {
std::unique_ptr<ss::http::reply> rep) {
return get_manifest(std::move(req), std::move(rep));
});

register_route<user>(
ss::httpd::shadow_indexing_json::get_cloud_storage_anomalies,
[this](auto req) { return get_cloud_storage_anomalies(std::move(req)); });
}

constexpr std::string_view to_string_view(service_kind kind) {
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ class admin_server {
ss::future<std::unique_ptr<ss::http::reply>> get_manifest(
std::unique_ptr<ss::http::request> req,
std::unique_ptr<ss::http::reply> rep);
/// Handler for GET /v1/cloud_storage/anomalies/{namespace}/{topic}/{partition}
ss::future<ss::json::json_return_type>
get_cloud_storage_anomalies(std::unique_ptr<ss::http::request>);

/// Self test routes
ss::future<ss::json::json_return_type>
Expand Down

0 comments on commit efd9f25

Please sign in to comment.