From a6b1c6cf37805073ad601984b6b60398bd20a4aa Mon Sep 17 00:00:00 2001 From: Zhang Yifan Date: Tue, 16 Mar 2021 16:38:31 +0800 Subject: [PATCH] feat(one-time backup): part-2 backup app metadata and send rpc to partition primary (#775) --- src/common/replication_common.h | 2 + src/meta/backup_engine.cpp | 168 ++++++++++++++++++--- src/meta/backup_engine.h | 17 ++- src/meta/meta_backup_service.cpp | 8 + src/replica/backup/replica_backup_server.h | 4 +- 5 files changed, 174 insertions(+), 25 deletions(-) diff --git a/src/common/replication_common.h b/src/common/replication_common.h index 877dfa54d5..6f4ada85e8 100644 --- a/src/common/replication_common.h +++ b/src/common/replication_common.h @@ -50,6 +50,8 @@ typedef rpc_holder register_chi typedef rpc_holder notify_stop_split_rpc; typedef rpc_holder query_child_state_rpc; +typedef rpc_holder backup_rpc; + class replication_options { public: diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index d0c61101af..fcc052463d 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -18,6 +18,7 @@ #include #include "common/backup_utils.h" +#include "common/replication_common.h" #include "server_state.h" namespace dsn { @@ -30,33 +31,29 @@ backup_engine::backup_engine(backup_service *service) backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); } -error_code backup_engine::get_app_stat(int32_t app_id, std::shared_ptr &app) -{ - zauto_read_lock l; - _backup_service->get_state()->lock_read(l); - app = _backup_service->get_state()->get_app(app_id); - if (app == nullptr || app->status != app_status::AS_AVAILABLE) { - derror_f("app {} is not available, couldn't do backup now.", app_id); - return ERR_INVALID_STATE; - } - return ERR_OK; -} - error_code backup_engine::init_backup(int32_t app_id) { - std::shared_ptr app; - error_code err = get_app_stat(app_id, app); - if (err != ERR_OK) { - return err; + std::string app_name; + int partition_count; + { + zauto_read_lock l; + _backup_service->get_state()->lock_read(l); + std::shared_ptr app = _backup_service->get_state()->get_app(app_id); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + derror_f("app {} is not available, couldn't do backup now.", app_id); + return ERR_INVALID_STATE; + } + app_name = app->app_name; + partition_count = app->partition_count; } zauto_lock lock(_lock); _backup_status.clear(); - for (int i = 0; i < app->partition_count; ++i) { + for (int i = 0; i < partition_count; ++i) { _backup_status.emplace(i, backup_status::UNALIVE); } _cur_backup.app_id = app_id; - _cur_backup.app_name = app->app_name; + _cur_backup.app_name = app_name; _cur_backup.backup_id = static_cast(dsn_now_ms()); _cur_backup.start_time_ms = _cur_backup.backup_id; return ERR_OK; @@ -74,9 +71,140 @@ error_code backup_engine::set_block_service(const std::string &provider) return ERR_OK; } -error_code backup_engine::start() { return ERR_OK; } +error_code backup_engine::write_backup_file(const std::string &file_name, + const dsn::blob &write_buffer) +{ + dist::block_service::create_file_request create_file_req; + create_file_req.ignore_metadata = true; + create_file_req.file_name = file_name; + + dsn::error_code err; + dist::block_service::block_file_ptr remote_file; + _block_service + ->create_file(create_file_req, + TASK_CODE_EXEC_INLINED, + [&err, &remote_file](const dist::block_service::create_file_response &resp) { + err = resp.err; + remote_file = resp.file_handle; + }) + ->wait(); + if (err != dsn::ERR_OK) { + ddebug_f("create file {} failed", file_name); + return err; + } + dassert_f(remote_file != nullptr, + "create file {} succeed, but can't get handle", + create_file_req.file_name); + remote_file + ->write(dist::block_service::write_request{write_buffer}, + TASK_CODE_EXEC_INLINED, + [&err](const dist::block_service::write_response &resp) { err = resp.err; }) + ->wait(); + return err; +} + +error_code backup_engine::backup_app_meta() +{ + dsn::blob app_info_buffer; + { + zauto_read_lock l; + _backup_service->get_state()->lock_read(l); + std::shared_ptr app = _backup_service->get_state()->get_app(_cur_backup.app_id); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + derror_f("app {} is not available, couldn't do backup now.", _cur_backup.app_id); + return ERR_INVALID_STATE; + } + app_state tmp = *app; + // Because we don't restore app envs, so no need to write app envs to backup file. + tmp.envs.clear(); + app_info_buffer = dsn::json::json_forwarder::encode(tmp); + } + + std::string file_name = cold_backup::get_app_metadata_file(_backup_service->backup_root(), + _cur_backup.app_name, + _cur_backup.app_id, + _cur_backup.backup_id); + return write_backup_file(file_name, app_info_buffer); +} + +void backup_engine::backup_app_partition(const gpid &pid) +{ + dsn::rpc_address partition_primary; + { + zauto_read_lock l; + _backup_service->get_state()->lock_read(l); + std::shared_ptr app = _backup_service->get_state()->get_app(pid.get_app_id()); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + derror_f("app {} is not available, couldn't do backup now.", pid.get_app_id()); + + zauto_lock lock(_lock); + is_backup_failed = true; + return; + } + partition_primary = app->partitions[pid.get_partition_index()].primary; + } + + if (partition_primary.is_invalid()) { + dwarn_f("backup_id({}): partition {} doesn't have a primary now, retry to backup it later.", + _cur_backup.backup_id, + pid.to_string()); + tasking::enqueue(LPC_DEFAULT_CALLBACK, + &_tracker, + [this, pid]() { backup_app_partition(pid); }, + 0, + std::chrono::seconds(10)); + return; + } + + auto req = std::make_unique(); + req->pid = pid; + policy_info backup_policy_info; + backup_policy_info.__set_backup_provider_type(_provider_type); + backup_policy_info.__set_policy_name(get_policy_name()); + req->policy = backup_policy_info; + req->backup_id = _cur_backup.backup_id; + req->app_name = _cur_backup.app_name; + + ddebug_f("backup_id({}): send backup request to partition {}, target_addr = {}", + _cur_backup.backup_id, + pid.to_string(), + partition_primary.to_string()); + backup_rpc rpc(std::move(req), RPC_COLD_BACKUP, 10000_ms, 0, pid.thread_hash()); + rpc.call( + partition_primary, &_tracker, [this, rpc, pid, partition_primary](error_code err) mutable { + on_backup_reply(err, rpc.response(), pid, partition_primary); + }); + + zauto_lock l(_lock); + _backup_status[pid.get_partition_index()] = backup_status::ALIVE; +} + +void backup_engine::on_backup_reply(error_code err, + const backup_response &response, + gpid pid, + const rpc_address &primary) +{ +} + +error_code backup_engine::start() +{ + error_code err = backup_app_meta(); + if (err != ERR_OK) { + derror_f("backup_id({}): backup meta data for app {} failed, error {}", + _cur_backup.backup_id, + _cur_backup.app_id, + err.to_string()); + return err; + } + for (int i = 0; i < _backup_status.size(); ++i) { + tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, [this, i]() { + backup_app_partition(gpid(_cur_backup.app_id, i)); + }); + } + return ERR_OK; +} -bool backup_engine::is_backing_up() +bool backup_engine::is_backing_up() const { zauto_lock l(_lock); return _cur_backup.end_time_ms == 0 && !is_backup_failed; diff --git a/src/meta/backup_engine.h b/src/meta/backup_engine.h index bb81304d46..b1af2a8134 100644 --- a/src/meta/backup_engine.h +++ b/src/meta/backup_engine.h @@ -60,10 +60,21 @@ class backup_engine int64_t get_current_backup_id() const { return _cur_backup.backup_id; } int32_t get_backup_app_id() const { return _cur_backup.app_id; } - bool is_backing_up(); + bool is_backing_up() const; private: - error_code get_app_stat(int32_t app_id, std::shared_ptr &app); + error_code write_backup_file(const std::string &file_name, const dsn::blob &write_buffer); + error_code backup_app_meta(); + void backup_app_partition(const gpid &pid); + void on_backup_reply(error_code err, + const backup_response &response, + gpid pid, + const rpc_address &primary); + + const std::string get_policy_name() const + { + return "fake_policy_" + std::to_string(_cur_backup.backup_id); + } backup_service *_backup_service; dist::block_service::block_filesystem *_block_service; @@ -71,7 +82,7 @@ class backup_engine dsn::task_tracker _tracker; // lock the following variables. - dsn::zlock _lock; + mutable dsn::zlock _lock; bool is_backup_failed; app_backup_info _cur_backup; // partition_id -> backup_status diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp index 225b56b243..a71964c790 100644 --- a/src/meta/meta_backup_service.cpp +++ b/src/meta/meta_backup_service.cpp @@ -1605,6 +1605,14 @@ void backup_service::start_backup_app(start_backup_app_rpc rpc) if (err == ERR_OK) { int64_t backup_id = engine->get_current_backup_id(); _backup_states.emplace(backup_id, std::move(engine)); + response.hint_message = + fmt::format("Backup id {} : metadata of app {} has been successfully backed up and " + "backup request has been sent to replica servers.", + backup_id, + app_id); + } else { + response.hint_message = + fmt::format("Backup failed: could not backup metadata for app {}.", app_id); } response.err = err; } diff --git a/src/replica/backup/replica_backup_server.h b/src/replica/backup/replica_backup_server.h index 477f7e329d..821a81d82d 100644 --- a/src/replica/backup/replica_backup_server.h +++ b/src/replica/backup/replica_backup_server.h @@ -20,13 +20,13 @@ #include #include +#include "common/replication_common.h" + namespace dsn { namespace replication { class replica_stub; -typedef rpc_holder backup_rpc; - // A server distributes the cold-backup task to the targeted replica. class replica_backup_server {