diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index 4741806e7c..00345e21e8 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -113,6 +113,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_NOTIFY_STOP_SPLIT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL #define CURRENT_THREAD_POOL THREAD_POOL_META_STATE diff --git a/src/common/backup.thrift b/src/common/backup.thrift index 4edf030292..993115d497 100644 --- a/src/common/backup.thrift +++ b/src/common/backup.thrift @@ -154,3 +154,19 @@ struct configuration_query_restore_response 2:list restore_status; 3:list restore_progress; } + +struct start_backup_app_request +{ + 1:string backup_provider_type; + 2:i32 app_id; +} + +struct start_backup_app_response +{ + // Possible error: + // - ERR_INVALID_STATE: app is not available or is backing up + // - ERR_INVALID_PARAMETERS: backup provider type is invalid + // - ERR_SERVICE_NOT_ACTIVE: meta doesn't enable backup service + 1:dsn.error_code err; + 2:string hint_message; +} diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp new file mode 100644 index 0000000000..d0c61101af --- /dev/null +++ b/src/meta/backup_engine.cpp @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "common/backup_utils.h" +#include "server_state.h" + +namespace dsn { +namespace replication { + +backup_engine::backup_engine(backup_service *service) + : _backup_service(service), _block_service(nullptr), is_backup_failed(false) +{ +} + +backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); } + +error_code backup_engine::get_app_stat(int32_t app_id, std::shared_ptr &app) +{ + zauto_read_lock l; + _backup_service->get_state()->lock_read(l); + app = _backup_service->get_state()->get_app(app_id); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + derror_f("app {} is not available, couldn't do backup now.", app_id); + return ERR_INVALID_STATE; + } + return ERR_OK; +} + +error_code backup_engine::init_backup(int32_t app_id) +{ + std::shared_ptr app; + error_code err = get_app_stat(app_id, app); + if (err != ERR_OK) { + return err; + } + + zauto_lock lock(_lock); + _backup_status.clear(); + for (int i = 0; i < app->partition_count; ++i) { + _backup_status.emplace(i, backup_status::UNALIVE); + } + _cur_backup.app_id = app_id; + _cur_backup.app_name = app->app_name; + _cur_backup.backup_id = static_cast(dsn_now_ms()); + _cur_backup.start_time_ms = _cur_backup.backup_id; + return ERR_OK; +} + +error_code backup_engine::set_block_service(const std::string &provider) +{ + _provider_type = provider; + _block_service = _backup_service->get_meta_service() + ->get_block_service_manager() + .get_or_create_block_filesystem(provider); + if (_block_service == nullptr) { + return ERR_INVALID_PARAMETERS; + } + return ERR_OK; +} + +error_code backup_engine::start() { return ERR_OK; } + +bool backup_engine::is_backing_up() +{ + zauto_lock l(_lock); + return _cur_backup.end_time_ms == 0 && !is_backup_failed; +} + +} // namespace replication +} // namespace dsn diff --git a/src/meta/backup_engine.h b/src/meta/backup_engine.h new file mode 100644 index 0000000000..bb81304d46 --- /dev/null +++ b/src/meta/backup_engine.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include + +namespace dsn { +namespace replication { + +enum backup_status +{ + UNALIVE = 1, + ALIVE = 2, + COMPLETED = 3, + FAILED = 4 +}; + +struct app_backup_info +{ + int64_t backup_id; + int64_t start_time_ms; + int64_t end_time_ms; + + int32_t app_id; + std::string app_name; + + app_backup_info() : backup_id(0), start_time_ms(0), end_time_ms(0) {} + + DEFINE_JSON_SERIALIZATION(backup_id, start_time_ms, end_time_ms, app_id, app_name) +}; + +class app_state; +class backup_service; + +class backup_engine +{ +public: + backup_engine(backup_service *service); + ~backup_engine(); + + error_code init_backup(int32_t app_id); + error_code set_block_service(const std::string &provider); + + error_code start(); + + int64_t get_current_backup_id() const { return _cur_backup.backup_id; } + int32_t get_backup_app_id() const { return _cur_backup.app_id; } + bool is_backing_up(); + +private: + error_code get_app_stat(int32_t app_id, std::shared_ptr &app); + + backup_service *_backup_service; + dist::block_service::block_filesystem *_block_service; + std::string _provider_type; + dsn::task_tracker _tracker; + + // lock the following variables. + dsn::zlock _lock; + bool is_backup_failed; + app_backup_info _cur_backup; + // partition_id -> backup_status + std::map _backup_status; +}; + +} // namespace replication +} // namespace dsn diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp index 82d615c14f..ab54852b7a 100644 --- a/src/meta/meta_backup_service.cpp +++ b/src/meta/meta_backup_service.cpp @@ -1569,5 +1569,45 @@ std::string backup_service::get_backup_path(const std::string &policy_name, int6 ss << _policy_meta_root << "/" << policy_name << "/" << backup_id; return ss.str(); } + +void backup_service::start_backup_app(start_backup_app_rpc rpc) +{ + const start_backup_app_request &request = rpc.request(); + start_backup_app_response &response = rpc.response(); + + int32_t app_id = request.app_id; + std::shared_ptr engine = std::make_shared(this); + error_code err = engine->init_backup(app_id); + if (err != ERR_OK) { + response.err = err; + response.hint_message = fmt::format("Backup failed: invalid app id {}.", app_id); + return; + } + + err = engine->set_block_service(request.backup_provider_type); + if (err != ERR_OK) { + response.err = err; + response.hint_message = fmt::format("Backup failed: invalid backup_provider_type {}.", + request.backup_provider_type); + return; + } + + for (const auto &it : _backup_states) { + const auto &tmp_engine = it.second; + if (app_id == tmp_engine->get_backup_app_id() && tmp_engine->is_backing_up()) { + response.err = ERR_INVALID_STATE; + response.hint_message = fmt::format("Backup failed: app {} is backing up now.", app_id); + return; + } + } + + err = engine->run(); + if (err == ERR_OK) { + int64_t backup_id = engine->get_current_backup_id(); + _backup_states.emplace(backup_id, std::move(engine)); + } + response.err = err; +} + } // namespace replication } // namespace dsn diff --git a/src/meta/meta_backup_service.h b/src/meta/meta_backup_service.h index efaab080cc..731e99a57e 100644 --- a/src/meta/meta_backup_service.h +++ b/src/meta/meta_backup_service.h @@ -27,6 +27,7 @@ #include #include +#include "backup_engine.h" #include "meta_data.h" namespace dsn { @@ -42,6 +43,7 @@ typedef rpc_holder configuration_modify_backup_policy_rpc; +typedef rpc_holder start_backup_app_rpc; struct backup_info_status { @@ -339,6 +341,7 @@ class backup_service void add_backup_policy(dsn::message_ex* msg); void query_backup_policy(query_backup_policy_rpc rpc); void modify_backup_policy(configuration_modify_backup_policy_rpc rpc); + void start_backup_app(start_backup_app_rpc rpc); // compose the absolute path(AP) for policy // input: @@ -375,10 +378,12 @@ class backup_service meta_service *_meta_svc; server_state *_state; - // _lock is only used to lock _policy_states + // lock _policy_states and _backup_states. zlock _lock; std::map> _policy_states; // policy_name -> policy_context + // backup_id -> backup_engine + std::unordered_map> _backup_states; // the root of policy metas, stored on remote_storage(zookeeper) std::string _policy_meta_root; diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index 753707ecb4..65bd21eb79 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -489,6 +489,8 @@ void meta_service::register_rpc_handlers() register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_BULK_LOAD_STATUS, "query_bulk_load_status", &meta_service::on_query_bulk_load_status); + register_rpc_handler_with_rpc_holder( + RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app); } int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address) @@ -1123,5 +1125,18 @@ void meta_service::on_query_bulk_load_status(query_bulk_load_rpc rpc) _bulk_load_svc->on_query_bulk_load_status(std::move(rpc)); } +void meta_service::on_start_backup_app(start_backup_app_rpc rpc) +{ + if (!check_status(rpc)) { + return; + } + if (_backup_handler == nullptr) { + derror_f("meta doesn't enable backup service"); + rpc.response().err = ERR_SERVICE_NOT_ACTIVE; + return; + } + _backup_handler->start_backup_app(std::move(rpc)); +} + } // namespace replication } // namespace dsn diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h index 02f71863db..d48231ebe6 100644 --- a/src/meta/meta_service.h +++ b/src/meta/meta_service.h @@ -165,6 +165,9 @@ class meta_service : public serverlet // meta control void on_control_meta_level(configuration_meta_control_rpc rpc); void on_start_recovery(configuration_recovery_rpc rpc); + + // backup/restore + void on_start_backup_app(start_backup_app_rpc rpc); void on_start_restore(dsn::message_ex *req); void on_add_backup_policy(dsn::message_ex *req); void on_query_backup_policy(query_backup_policy_rpc policy_rpc);