diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index 47fd3078a8..00345e21e8 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -113,7 +113,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_NOTIFY_STOP_SPLIT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON) -MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APPS, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL #define CURRENT_THREAD_POOL THREAD_POOL_META_STATE diff --git a/src/common/backup.thrift b/src/common/backup.thrift index bec27212a2..f20b40b650 100644 --- a/src/common/backup.thrift +++ b/src/common/backup.thrift @@ -157,12 +157,12 @@ struct configuration_query_restore_response struct start_backup_apps_request { - 1:string backup_provider_type; - 2:set app_ids; + 1:string backup_provider_type; + 2:i32 app_id; } struct start_backup_apps_response { - 1:dsn.error_code err; - 2:string hint_message; + 1:dsn.error_code err; + 2:string hint_message; } diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp new file mode 100644 index 0000000000..f6a48354a2 --- /dev/null +++ b/src/meta/backup_engine.cpp @@ -0,0 +1,278 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "common/backup_utils.h" +#include "server_state.h" + +namespace dsn { +namespace replication { + +backup_engine::backup_engine(backup_service *service) + : _backup_service(service), _block_service(nullptr) +{ +} + +backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); } + +error_code backup_engine::init_backup(int32_t app_id) +{ + zauto_lock lock(_lock); + _backup_status.clear(); + + std::shared_ptr app; + { + zauto_read_lock l; + _backup_service->get_state()->lock_read(l); + app = _backup_service->get_state()->get_app(app_id); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + derror_f("app {} is not available, couldn't do backup now.", app_id); + return ERR_INVALID_STATE; + } + } + for (int i = 0; i < app->partition_count; ++i) { + _backup_status.emplace(i, backup_status::UNALIVE); + } + + _cur_backup.backup_id = static_cast(dsn_now_ms()); + _cur_backup.start_time_ms = _cur_backup.backup_id; + return ERR_OK; +} + +error_code backup_engine::set_block_service(const std::string &provider) +{ + _provider_type = provider; + _block_service = _backup_service->get_meta_service() + ->get_block_service_manager() + .get_or_create_block_filesystem(provider); + if (_block_service == nullptr) { + return ERR_INVALID_PARAMETERS; + } + return ERR_OK; +} + +error_code backup_engine::write_backup_file(const std::string &file_name, + const dsn::blob &write_buffer) +{ + dist::block_service::create_file_request create_file_req; + create_file_req.ignore_metadata = true; + create_file_req.file_name = file_name; + + dsn::error_code err; + dist::block_service::block_file_ptr remote_file; + _block_service + ->create_file(create_file_req, + TASK_CODE_EXEC_INLINED, + [&err, &remote_file](const dist::block_service::create_file_response &resp) { + err = resp.err; + remote_file = resp.file_handle; + }) + ->wait(); + if (err != dsn::ERR_OK) { + ddebug_f("create file {} failed", file_name); + return err; + } + dassert_f(remote_file != nullptr, + "create file {} succeed, but can't get handle", + create_file_req.file_name); + + remote_file + ->write(dist::block_service::write_request{write_buffer}, + TASK_CODE_EXEC_INLINED, + [&err](const dist::block_service::write_response &resp) { err = resp.err; }) + ->wait(); + return err; +} + +error_code backup_engine::backup_app_meta() +{ + dsn::blob buffer; + { + zauto_read_lock l; + _backup_service->get_state()->lock_read(l); + const std::shared_ptr &app = + _backup_service->get_state()->get_app(_cur_backup.app_id); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + return ERR_INVALID_STATE; + } + // do not write app envs to backup file + app_state tmp = *app; + if (!tmp.envs.empty()) { + tmp.envs.clear(); + } + buffer = dsn::json::json_forwarder::encode(tmp); + } + std::string file_name = cold_backup::get_app_metadata_file(_backup_service->backup_root(), + _cur_backup.app_name, + _cur_backup.app_id, + _cur_backup.backup_id); + return write_backup_file(file_name, buffer); +} + +void backup_engine::backup_app_partition(const gpid &pid) +{ + dsn::rpc_address partition_primary; + { + zauto_read_lock l; + _backup_service->get_state()->lock_read(l); + const std::shared_ptr &app = + _backup_service->get_state()->get_app(pid.get_app_id()); + + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + derror_f("backup_id({}): app {} is unavailable, could not do backups for it now.", + _cur_backup.backup_id, + pid.get_app_id()); + return; + } + partition_primary = app->partitions[pid.get_partition_index()].primary; + } + + if (partition_primary.is_invalid()) { + dwarn_f("backup_id({}): partition {} doesn't have a primary now, retry to backup it later.", + _cur_backup.backup_id, + pid.to_string()); + tasking::enqueue(LPC_DEFAULT_CALLBACK, + &_tracker, + [this, pid]() { backup_app_partition(pid); }, + 0, + std::chrono::seconds(10)); + return; + } + + backup_request req; + req.pid = pid; + policy_info backup_policy_info; + backup_policy_info.__set_backup_provider_type(_provider_type); + backup_policy_info.__set_policy_name(get_policy_name()); + req.policy = backup_policy_info; + req.backup_id = _cur_backup.backup_id; + req.app_name = _cur_backup.app_name; + dsn::message_ex *request = + dsn::message_ex::create_request(RPC_COLD_BACKUP, 0, pid.thread_hash()); + dsn::marshall(request, req); + dsn::rpc_response_task_ptr rpc_callback = rpc::create_rpc_response_task( + request, + &_tracker, + [this, pid, partition_primary](error_code err, backup_response &&response) { + on_backup_reply(err, std::move(response), pid, partition_primary); + }); + ddebug_f("backup_id({}): send backup request to partition {}, target_addr = {}", + _cur_backup.backup_id, + pid.to_string(), + partition_primary.to_string()); + _backup_service->get_meta_service()->send_request(request, partition_primary, rpc_callback); + + zauto_lock l(_lock); + _backup_status[pid.get_partition_index()] = backup_status::ALIVE; +} + +error_code backup_engine::run() +{ + error_code err = backup_app_meta(); + if (err != ERR_OK) { + derror_f("backup_id({}): backup meta data for app {} failed, error {}", + _cur_backup.backup_id, + _cur_backup.app_id, + err.to_string()); + return err; + } + for (int i = 0; i < _backup_status.size(); ++i) { + tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, [this, i]() { + backup_app_partition(gpid(_cur_backup.app_id, i)); + }); + } + return ERR_OK; +} + +void backup_engine::on_backup_reply(error_code err, + backup_response &&response, + gpid pid, + const rpc_address &primary) +{ + ddebug_f("backup_id({}): receive backup response for partition {} from server {}, rpc error " + "{}, response error {}.", + _cur_backup.backup_id, + pid.to_string(), + primary.to_string(), + err.to_string(), + response.err.to_string()); + dassert_f(response.pid == pid, + "backup partition id {} vs {} don't match", + response.pid.to_string(), + pid.to_string()); + dassert_f(response.backup_id == _cur_backup.backup_id, + "backup id {} vs {} don't match", + response.backup_id, + _cur_backup.backup_id); + + // if backup completed, receive ERR_OK; + // if backup failed, receive ERR_LOCAL_APP_FAILURE; + // receive ERR_BUSY or ERR_INVALID_STATE in other cases. + // see replica::on_cold_backup() for details. + if (_cur_backup.is_backup_failed) { + return; + } + int32_t partition = pid.get_partition_index(); + if (err == dsn::ERR_OK && response.err == dsn::ERR_OK && + response.progress == cold_backup_constant::PROGRESS_FINISHED) { + { + zauto_lock l(_lock); + _backup_status[partition] = backup_status::COMPLETED; + } + complete_current_backup(); + return; + } + if (response.err == ERR_LOCAL_APP_FAILURE) { + derror_f("backup_id({}): backup for partition {} failed.", + _cur_backup.backup_id, + pid.to_string()); + zauto_lock l(_lock); + _cur_backup.is_backup_failed = true; + _backup_status[partition] = backup_status::FAILED; + return; + } + + ddebug_f("backup_id({}): retry to send backup request for partition {}.", + _cur_backup.backup_id, + pid.to_string()); + tasking::enqueue(LPC_DEFAULT_CALLBACK, + &_tracker, + [this, pid]() { backup_app_partition(pid); }, + 0, + std::chrono::seconds(1)); +} + +void backup_engine::complete_current_backup() +{ + zauto_lock l(_lock); + for (int i = 0; i < _backup_status.size(); ++i) { + if (_backup_status[i] != backup_status::COMPLETED) { + return; + } + } + + _cur_backup.end_time_ms = dsn_now_ms(); + std::string file_name = + cold_backup::get_backup_info_file(_backup_service->backup_root(), _cur_backup.backup_id); + blob buf = dsn::json::json_forwarder::encode(_cur_backup); + // TODO(zhangyifan): handle errors if writing backup info failed. + write_backup_file(file_name, buf); +} + +} // namespace replication +} // namespace dsn diff --git a/src/meta/backup_engine.h b/src/meta/backup_engine.h new file mode 100644 index 0000000000..d85f544f17 --- /dev/null +++ b/src/meta/backup_engine.h @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include + +namespace dsn { +namespace replication { + +enum backup_status +{ + UNALIVE = 1, + ALIVE = 2, + COMPLETED = 3, + FAILED = 4 +}; + +struct app_backup_info +{ + int64_t backup_id; + int64_t start_time_ms; + int64_t end_time_ms; + + bool is_backup_failed; + + int32_t app_id; + std::string app_name; + + app_backup_info() : backup_id(0), start_time_ms(0), end_time_ms(0), is_backup_failed(false) {} + + DEFINE_JSON_SERIALIZATION(backup_id, start_time_ms, end_time_ms, app_id, app_name) +}; + +class backup_service; + +class backup_engine +{ +public: + backup_engine(backup_service *service); + ~backup_engine(); + + error_code init_backup(int32_t app_id); + error_code set_block_service(const std::string &provider); + + error_code write_backup_file(const std::string &file_name, const dsn::blob &write_buffer); + error_code backup_app_meta(); + void backup_app_partition(const gpid &pid); + void on_backup_reply(dsn::error_code err, + backup_response &&response, + gpid pid, + const rpc_address &primary); + + error_code run(); + + // if all partitions have been backed up, write backup_info file. + void complete_current_backup(); + + int64_t get_current_backup_id() { return _cur_backup.backup_id; } + int32_t get_backup_app_id() { return _cur_backup.app_id; } + bool is_backing_up() { return _cur_backup.end_time_ms != 0 && !_cur_backup.is_backup_failed; } + +private: + std::string get_policy_name() { return "fake_policy_" + std::to_string(_cur_backup.backup_id); } + + backup_service *_backup_service; + dist::block_service::block_filesystem *_block_service; + std::string _provider_type; + dsn::task_tracker _tracker; + + // lock _cur_backup and _backup_status. + dsn::zlock _lock; + app_backup_info _cur_backup; + // partition_id -> backup_status + std::map _backup_status; +}; + +} // namespace replication +} // namespace dsn diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp index e9d748a1f4..d254909187 100644 --- a/src/meta/meta_backup_service.cpp +++ b/src/meta/meta_backup_service.cpp @@ -984,285 +984,6 @@ void policy_context::sync_remove_backup_info(const backup_info &info, dsn::task_ backup_info_path, true, LPC_DEFAULT_CALLBACK, callback, nullptr); } -backup_engine::backup_engine(backup_service *service) - : _backup_service(service), _block_service(nullptr) -{ -} - -backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); } - -error_code backup_engine::init_backups(const std::set &app_ids) -{ - zauto_lock lock(_lock); - _cur_backup.app_names.clear(); - _backup_status.clear(); - for (const auto &app_id : app_ids) { - std::shared_ptr app; - { - zauto_read_lock l; - _backup_service->get_state()->lock_read(l); - app = _backup_service->get_state()->get_app(app_id); - if (app == nullptr || app->status != app_status::AS_AVAILABLE) { - derror_f("app {} is not available, couldn't do backup now.", app_id); - continue; - } - } - for (int i = 0; i < app->partition_count; ++i) { - _backup_status[app_id][i] = backup_status::UNALIVE; - } - _cur_backup.app_names.emplace(app_id, app->app_name); - } - - if (_cur_backup.app_names.empty()) { - derror_f("all backup apps are unavailable."); - return ERR_INVALID_STATE; - } - _cur_backup.backup_id = static_cast(dsn_now_ms()); - _cur_backup.start_time_ms = _cur_backup.backup_id; - return ERR_OK; -} - -error_code backup_engine::set_block_service(const std::string &provider) -{ - _provider_type = provider; - _block_service = _backup_service->get_meta_service() - ->get_block_service_manager() - .get_or_create_block_filesystem(provider); - if (_block_service == nullptr) { - return ERR_INVALID_PARAMETERS; - } - return ERR_OK; -} - -error_code backup_engine::write_backup_file(const std::string &file_name, - const dsn::blob &write_buffer) -{ - dist::block_service::create_file_request create_file_req; - create_file_req.ignore_metadata = true; - create_file_req.file_name = file_name; - - dsn::error_code err; - dist::block_service::block_file_ptr remote_file; - _block_service - ->create_file(create_file_req, - TASK_CODE_EXEC_INLINED, - [&err, &remote_file](const dist::block_service::create_file_response &resp) { - err = resp.err; - remote_file = resp.file_handle; - }) - ->wait(); - if (err != dsn::ERR_OK) { - ddebug_f("create file {} failed", file_name); - return err; - } - dassert(remote_file != nullptr, - "create file(%s) succeed, but can't get handle", - create_file_req.file_name.c_str()); - - remote_file - ->write(dist::block_service::write_request{write_buffer}, - TASK_CODE_EXEC_INLINED, - [&err](const dist::block_service::write_response &resp) { err = resp.err; }) - ->wait(); - return err; -} - -error_code backup_engine::backup_app_meta(const std::string &app_name, int32_t app_id) -{ - dsn::blob buffer; - { - zauto_read_lock l; - _backup_service->get_state()->lock_read(l); - const std::shared_ptr &app = _backup_service->get_state()->get_app(app_id); - if (app == nullptr || app->status != app_status::AS_AVAILABLE) { - return ERR_INVALID_STATE; - } - // do not write app envs to backup file - app_state tmp = *app; - if (!tmp.envs.empty()) { - tmp.envs.clear(); - } - buffer = dsn::json::json_forwarder::encode(tmp); - } - std::string file_name = cold_backup::get_app_metadata_file( - _backup_service->backup_root(), app_name, app_id, _cur_backup.backup_id); - return write_backup_file(file_name, buffer); -} - -void backup_engine::backup_app_partition(const std::string &app_name, - const gpid &pid, - const std::string &policy_name) -{ - dsn::rpc_address partition_primary; - { - zauto_read_lock l; - _backup_service->get_state()->lock_read(l); - const std::shared_ptr &app = - _backup_service->get_state()->get_app(pid.get_app_id()); - - if (app == nullptr || app->status != app_status::AS_AVAILABLE) { - derror_f("backup_id({}): app {}({}) is unavailable, could not do backups for it now.", - _cur_backup.backup_id, - pid.get_app_id(), - app_name); - return; - } - partition_primary = app->partitions[pid.get_partition_index()].primary; - } - - if (partition_primary.is_invalid()) { - dwarn_f("backup_id({}): partition {} doesn't have a primary now, retry to backup it later.", - _cur_backup.backup_id, - pid.to_string()); - tasking::enqueue(LPC_DEFAULT_CALLBACK, - &_tracker, - [this, app_name, pid, policy_name]() { - backup_app_partition(app_name, pid, policy_name); - }, - 0, - std::chrono::seconds(10)); - return; - } - - backup_request req; - req.pid = pid; - policy_info backup_policy_info; - backup_policy_info.__set_backup_provider_type(_provider_type); - backup_policy_info.__set_policy_name(policy_name); - req.policy = backup_policy_info; - req.backup_id = _cur_backup.backup_id; - req.app_name = app_name; - dsn::message_ex *request = - dsn::message_ex::create_request(RPC_COLD_BACKUP, 0, pid.thread_hash()); - dsn::marshall(request, req); - dsn::rpc_response_task_ptr rpc_callback = rpc::create_rpc_response_task( - request, - &_tracker, - [this, pid, partition_primary](error_code err, backup_response &&response) { - on_backup_reply(err, std::move(response), pid, partition_primary); - }); - ddebug_f("backup_id({}): send backup request to partition {}, target_addr = {}", - _cur_backup.backup_id, - pid.to_string(), - partition_primary.to_string()); - _backup_service->get_meta_service()->send_request(request, partition_primary, rpc_callback); - - zauto_lock l(_lock); - _backup_status[pid.get_app_id()][pid.get_partition_index()] = backup_status::ALIVE; -} - -error_code backup_engine::run() -{ - for (const auto &app : _cur_backup.app_names) { - int32_t app_id = app.first; - const std::string &app_name = app.second; - error_code err = backup_app_meta(app_name, app_id); - if (err != ERR_OK) { - derror_f("backup_id({}): backup meta data for app {} {} failed, error {}", - _cur_backup.backup_id, - app_id, - app_name, - err.to_string()); - return err; - } - for (int i = 0; i < _backup_status[app_id].size(); ++i) { - tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, [this, app_name, app_id, i]() { - backup_app_partition(app_name, gpid(app_id, i), get_policy_name()); - }); - } - } - return ERR_OK; -} - -void backup_engine::on_backup_reply(error_code err, - backup_response &&response, - gpid pid, - const rpc_address &primary) -{ - ddebug_f("backup_id({}): receive backup response for partition {} from server {}, rpc error " - "{}, response error {}.", - _cur_backup.backup_id, - pid.to_string(), - primary.to_string(), - err.to_string(), - response.err.to_string()); - dassert(response.pid == pid, - "backup partition id %s vs %s don't match", - response.pid.to_string(), - pid.to_string()); - dassert(response.backup_id == _cur_backup.backup_id, - "backup id %d vs %d don't match", - response.backup_id, - _cur_backup.backup_id); - - // if backup completed, receive ERR_OK; - // if backup failed, receive ERR_LOCAL_APP_FAILURE; - // receive ERR_BUSY or ERR_INVALID_STATE in other cases. - // see replica::on_cold_backup() for details. - int32_t app_id = pid.get_app_id(); - int32_t partition = pid.get_partition_index(); - if (err == dsn::ERR_OK && response.err == dsn::ERR_OK && - response.progress == cold_backup_constant::PROGRESS_FINISHED) { - { - zauto_lock l(_lock); - _backup_status[app_id][partition] = backup_status::COMPLETED; - } - std::string msg; - bool backup_completed = is_backup_complete(msg); - if (backup_completed) { - std::string file_name = cold_backup::get_backup_info_file( - _backup_service->backup_root(), _cur_backup.backup_id); - blob buf = dsn::json::json_forwarder::encode(_cur_backup); - // TODO(zhangyifan): handle errors if writing backup info failed. - write_backup_file(file_name, buf); - } - return; - } - if (response.err == ERR_LOCAL_APP_FAILURE) { - derror_f("backup_id({}): backup for partition {} failed.", - _cur_backup.backup_id, - pid.to_string()); - zauto_lock l(_lock); - _backup_status[app_id][partition] = backup_status::FAILED; - return; - } - - ddebug_f("backup_id({}): retry to send backup request for partition {}.", - _cur_backup.backup_id, - pid.to_string()); - tasking::enqueue(LPC_DEFAULT_CALLBACK, - &_tracker, - [this, pid]() { - backup_app_partition( - _cur_backup.app_names.at(pid.get_app_id()), pid, get_policy_name()); - }, - 0, - std::chrono::seconds(1)); -} - -bool backup_engine::is_backup_complete(std::string &hint_message) -{ - zauto_lock l(_lock); - for (const auto &it : _backup_status) { - int32_t app_id = it.first; - const auto &partitions = it.second; - for (int i = 0; i < partitions.size(); ++i) { - if (partitions.at(i) == backup_status::FAILED) { - hint_message = fmt::format("backup for paritition {}.{} failed.", app_id, i); - return false; - } - if (partitions.at(i) != backup_status::COMPLETED) { - hint_message = - fmt::format("backup for paritition {}.{} didn't complete.", app_id, i); - return false; - } - } - } - _cur_backup.end_time_ms = dsn_now_ms(); - hint_message = fmt::format("backup {} completed.", _cur_backup.backup_id); - return true; -} - backup_service::backup_service(meta_service *meta_svc, const std::string &policy_meta_root, const std::string &backup_root, @@ -1849,36 +1570,47 @@ std::string backup_service::get_backup_path(const std::string &policy_name, int6 return ss.str(); } -void backup_service::start_backup_apps(start_backup_apps_rpc rpc) +void backup_service::start_backup_app(start_backup_apps_rpc rpc) { const start_backup_apps_request &request = rpc.request(); start_backup_apps_response &response = rpc.response(); + int32_t app_id = request.app_id; std::shared_ptr engine = std::make_shared(this); - error_code err = engine->init_backups(request.app_ids); + error_code err = engine->init_backup(app_id); if (err != ERR_OK) { response.err = err; - response.hint_message = "all apps of this backup are invalid."; + response.hint_message = fmt::format("Backup failed: invalid app id {}.", app_id); return; } err = engine->set_block_service(request.backup_provider_type); if (err != ERR_OK) { response.err = err; - response.hint_message = "invalid backup_provider_type: " + request.backup_provider_type; + response.hint_message = fmt::format("Backup failed: invalid backup_provider_type {}.", + request.backup_provider_type); return; } + for (const auto &it : _backup_states) { + const auto &tmp_engine = it.second; + if (app_id == tmp_engine->get_backup_app_id() && tmp_engine->is_backing_up()) { + response.err = ERR_INVALID_STATE; + response.hint_message = fmt::format("Backup failed: app {} is backing up now.", app_id); + return; + } + } + err = engine->run(); if (err == ERR_OK) { int64_t backup_id = engine->get_current_backup_id(); _backup_states.emplace(backup_id, std::move(engine)); response.hint_message = - fmt::format("Backup id {} : The app metadata has been successfully backed up and " + fmt::format("Backup id {}: app metadata has been successfully backed up and " "backup request has been sent to replica servers.", backup_id); } else { - response.hint_message = "Metadata backup failed."; + response.hint_message = "Backup failed: write app metadata failed."; } response.err = err; } diff --git a/src/meta/meta_backup_service.h b/src/meta/meta_backup_service.h index 1f3982de03..eb5ef7d9ea 100644 --- a/src/meta/meta_backup_service.h +++ b/src/meta/meta_backup_service.h @@ -27,6 +27,7 @@ #include #include +#include "backup_engine.h" #include "meta_data.h" namespace dsn { @@ -312,51 +313,6 @@ mock_private : dsn::task_tracker _tracker; }; -enum backup_status -{ - UNALIVE = 1, - ALIVE = 2, - COMPLETED = 3, - FAILED = 4 -}; - -class backup_engine -{ -public: - backup_engine(backup_service *service); - ~backup_engine(); - - error_code init_backups(const std::set &app_ids); - error_code set_block_service(const std::string &provider); - error_code write_backup_file(const std::string &file_name, const dsn::blob &write_buffer); - error_code backup_app_meta(const std::string &app_name, int32_t app_id); - void backup_app_partition(const std::string &app_name, - const gpid &pid, - const std::string &policy_name); - error_code write_backup_info(); - void on_backup_reply(dsn::error_code err, - backup_response &&response, - gpid pid, - const rpc_address &primary); - error_code run(); - bool is_backup_complete(std::string &hint_message); - int64_t get_current_backup_id() { return _cur_backup.backup_id; } - -private: - std::string get_policy_name() { return "fake_policy_" + std::to_string(_cur_backup.backup_id); } - - backup_service *_backup_service; - dist::block_service::block_filesystem *_block_service; - std::string _provider_type; - dsn::task_tracker _tracker; - - // lock _cur_backup and _backup_status. - dsn::zlock _lock; - backup_info _cur_backup; - // app_id --> partition_id, backup_status - std::map> _backup_status; -}; - class backup_service { public: @@ -385,7 +341,7 @@ class backup_service void add_backup_policy(dsn::message_ex* msg); void query_backup_policy(query_backup_policy_rpc rpc); void modify_backup_policy(configuration_modify_backup_policy_rpc rpc); - void start_backup_apps(start_backup_apps_rpc rpc); + void start_backup_app(start_backup_apps_rpc rpc); // compose the absolute path(AP) for policy // input: @@ -426,7 +382,8 @@ class backup_service zlock _lock; std::map> _policy_states; // policy_name -> policy_context - std::unordered_map> _backup_states; + // backup_id -> backup_engine + std::unordered_map> _backup_states; // the root of policy metas, stored on remote_storage(zookeeper) std::string _policy_meta_root; diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index a80b5f3dd6..764f5c89a8 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -490,7 +490,7 @@ void meta_service::register_rpc_handlers() "query_bulk_load_status", &meta_service::on_query_bulk_load_status); register_rpc_handler_with_rpc_holder( - RPC_CM_START_BACKUP_APPS, "start_backup_apps", &meta_service::on_start_backup_apps); + RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app); } int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address) @@ -1125,7 +1125,7 @@ void meta_service::on_query_bulk_load_status(query_bulk_load_rpc rpc) _bulk_load_svc->on_query_bulk_load_status(std::move(rpc)); } -void meta_service::on_start_backup_apps(start_backup_apps_rpc rpc) +void meta_service::on_start_backup_app(start_backup_apps_rpc rpc) { if (!check_status(rpc)) { return; @@ -1135,7 +1135,7 @@ void meta_service::on_start_backup_apps(start_backup_apps_rpc rpc) rpc.response().err = ERR_SERVICE_NOT_ACTIVE; return; } - _backup_handler->start_backup_apps(std::move(rpc)); + _backup_handler->start_backup_app(std::move(rpc)); } } // namespace replication diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h index 8458b8745e..f728afacb5 100644 --- a/src/meta/meta_service.h +++ b/src/meta/meta_service.h @@ -167,7 +167,7 @@ class meta_service : public serverlet void on_start_recovery(configuration_recovery_rpc rpc); // backup/restore - void on_start_backup_apps(start_backup_apps_rpc rpc); + void on_start_backup_app(start_backup_apps_rpc rpc); void on_start_restore(dsn::message_ex *req); void on_add_backup_policy(dsn::message_ex *req); void on_query_backup_policy(query_backup_policy_rpc policy_rpc);