diff --git a/src/v/redpanda/CMakeLists.txt b/src/v/redpanda/CMakeLists.txt index 7b4ea51e6212..539f0bfdf55b 100644 --- a/src/v/redpanda/CMakeLists.txt +++ b/src/v/redpanda/CMakeLists.txt @@ -54,6 +54,7 @@ v_cc_library( admin/kafka.cc admin/util.cc admin/migrations.cc + admin/topics.cc admin/data_migration_utils.cc cli_parser.cc application.cc diff --git a/src/v/redpanda/admin/BUILD b/src/v/redpanda/admin/BUILD index 4b5caa3f7a59..4650ce9ca775 100644 --- a/src/v/redpanda/admin/BUILD +++ b/src/v/redpanda/admin/BUILD @@ -143,6 +143,7 @@ redpanda_cc_library( "recovery.cc", "security.cc", "server.cc", + "topics.cc", "transaction.cc", "transform.cc", "usage.cc", diff --git a/src/v/redpanda/admin/server.cc b/src/v/redpanda/admin/server.cc index ba18927bee91..a9a2c9a552c6 100644 --- a/src/v/redpanda/admin/server.cc +++ b/src/v/redpanda/admin/server.cc @@ -408,6 +408,7 @@ void admin_server::configure_admin_routes() { register_shadow_indexing_routes(); register_wasm_transform_routes(); register_data_migration_routes(); + register_topic_routes(); /** * Special REST apis active only in recovery mode */ diff --git a/src/v/redpanda/admin/server.h b/src/v/redpanda/admin/server.h index 35efc04a5f09..78180127dd40 100644 --- a/src/v/redpanda/admin/server.h +++ b/src/v/redpanda/admin/server.h @@ -431,6 +431,7 @@ class admin_server { void register_wasm_transform_routes(); void register_recovery_mode_routes(); void register_data_migration_routes(); + void register_topic_routes(); ss::future patch_cluster_config_handler( std::unique_ptr, const request_auth_result&); @@ -656,6 +657,12 @@ class admin_server { ss::future delete_migration(std::unique_ptr); + // Topic routes + ss::future + mount_topics(std::unique_ptr); + ss::future + unmount_topics(std::unique_ptr); + ss::future<> throw_on_error( ss::http::request& req, std::error_code ec, diff --git a/src/v/redpanda/admin/topics.cc b/src/v/redpanda/admin/topics.cc new file mode 100644 index 000000000000..1fd84b74bc2e --- /dev/null +++ b/src/v/redpanda/admin/topics.cc @@ -0,0 +1,190 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "base/vlog.h" +#include "cluster/controller.h" +#include "cluster/data_migration_frontend.h" +#include "cluster/data_migration_types.h" +#include "container/fragmented_vector.h" +#include "json/validator.h" +#include "redpanda/admin/api-doc/migration.json.hh" +#include "redpanda/admin/data_migration_utils.h" +#include "redpanda/admin/server.h" +#include "redpanda/admin/util.h" + +using admin::apply_validator; + +namespace { + +json::validator make_mount_configuration_validator() { + const std::string schema = R"( + +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "additionalProperties": false, + "required": [ + "topics" + ], + "properties": { + "topics": { + "type": "array", + "items": { + "$ref": "#/definitions/inbound_topic" + }, + "description": "List of topics to mount" + } + }, + "definitions": { + "namespaced_topic": { + "type": "object", + "required": [ + "topic" + ], + "properties": { + "topic": { + "type": "string" + }, + "ns": { + "type": "string" + } + }, + "additionalProperties": false + }, + "inbound_topic": { + "type": "object", + "required": [ + "source_topic" + ], + "properties": { + "source_topic": { + "$ref": "#/definitions/namespaced_topic" + }, + "alias": { + "$ref": "#/definitions/namespaced_topic" + }, + "location": { + "type": "string" + } + }, + "additionalProperties": false + } + } +})"; + return json::validator(schema); +} + +json::validator make_unmount_array_validator() { + const std::string schema = R"( +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "additionalProperties": false, + "required": [ + "topics" + ], + "properties": { + "topics": { + "type": "array", + "items": { + "$ref": "#/definitions/namespaced_topic" + }, + "description": "List of topics to unmount" + } + }, + "definitions": { + "namespaced_topic": { + "type": "object", + "required": [ + "topic" + ], + "properties": { + "topic": { + "type": "string" + }, + "ns": { + "type": "string" + } + }, + "additionalProperties": false + } + } +})"; + return json::validator(schema); +} + +} // namespace + +void admin_server::register_topic_routes() { + register_route( + ss::httpd::migration_json::mount_topics, + [this](std::unique_ptr req) { + return mount_topics(std::move(req)); + }); + register_route( + ss::httpd::migration_json::unmount_topics, + [this](std::unique_ptr req) { + return unmount_topics(std::move(req)); + }); +} + +ss::future +admin_server::mount_topics(std::unique_ptr req) { + static thread_local json::validator validator + = make_mount_configuration_validator(); + auto json_doc = co_await parse_json_body(req.get()); + apply_validator(validator, json_doc); + cluster::data_migrations::inbound_migration migration; + + migration.topics = parse_inbound_topics(json_doc); + auto result = co_await _controller->get_data_migration_frontend() + .local() + .create_migration(std::move(migration)); + if (!result) { + vlog( + adminlog.warn, + "unable to create data migration for topic mount - error: {}", + result.error()); + co_await throw_on_error(*req, result.error(), model::controller_ntp); + throw ss::httpd::server_error_exception( + "unknown error when creating data migration for mounting topics"); + } + + ss::httpd::migration_json::migration_info reply; + reply.id = result.value(); + co_return std::move(reply); +} + +ss::future +admin_server::unmount_topics(std::unique_ptr req) { + static thread_local json::validator validator + = make_unmount_array_validator(); + auto json_doc = co_await parse_json_body(req.get()); + apply_validator(validator, json_doc); + cluster::data_migrations::outbound_migration migration; + + migration.topics = parse_topics(json_doc); + auto result = co_await _controller->get_data_migration_frontend() + .local() + .create_migration(std::move(migration)); + if (!result) { + vlog( + adminlog.warn, + "unable to create data migration for topic unmount - error: {}", + result.error()); + co_await throw_on_error(*req, result.error(), model::controller_ntp); + throw ss::httpd::server_error_exception( + "unknown error when creating data migration for unmounting topics"); + } + + ss::httpd::migration_json::migration_info reply; + reply.id = result.value(); + co_return std::move(reply); +}