diff --git a/src/rdsn/include/dsn/cpp/json_helper.h b/src/rdsn/include/dsn/cpp/json_helper.h index 4953661f96..e1c62063db 100644 --- a/src/rdsn/include/dsn/cpp/json_helper.h +++ b/src/rdsn/include/dsn/cpp/json_helper.h @@ -350,6 +350,8 @@ ENUM_TYPE_SERIALIZATION(dsn::replication::partition_status::type, ENUM_TYPE_SERIALIZATION(dsn::app_status::type, dsn::app_status::AS_INVALID) ENUM_TYPE_SERIALIZATION(dsn::replication::bulk_load_status::type, dsn::replication::bulk_load_status::BLS_INVALID) +ENUM_TYPE_SERIALIZATION(dsn::replication::backup_status::type, + dsn::replication::backup_status::UNINITIALIZED) // json serialization for gpid, we treat it as string: "app_id.partition_id" inline void json_encode(JsonWriter &out, const dsn::gpid &pid) @@ -386,6 +388,8 @@ inline void json_encode(JsonWriter &out, const dsn::replication::file_meta &f_me inline bool json_decode(const JsonObject &in, dsn::replication::file_meta &f_meta); inline void json_encode(JsonWriter &out, const dsn::replication::bulk_load_metadata &metadata); inline bool json_decode(const JsonObject &in, dsn::replication::bulk_load_metadata &metadata); +inline void json_encode(JsonWriter &out, const dsn::replication::backup_item &backup_item); +inline bool json_decode(const JsonObject &in, dsn::replication::backup_item &backup_item); template inline void json_encode_iterable(JsonWriter &out, const T &t) @@ -655,5 +659,15 @@ NON_MEMBER_JSON_SERIALIZATION(dsn::app_info, NON_MEMBER_JSON_SERIALIZATION(dsn::replication::file_meta, name, size, md5) NON_MEMBER_JSON_SERIALIZATION(dsn::replication::bulk_load_metadata, files, file_total_size) + +NON_MEMBER_JSON_SERIALIZATION(dsn::replication::backup_item, + backup_id, + app_id, + app_name, + backup_provider_type, + backup_path, + start_time_ms, + end_time_ms, + status) } // namespace json } // namespace dsn diff --git a/src/rdsn/include/dsn/dist/replication/replication_enums.h b/src/rdsn/include/dsn/dist/replication/replication_enums.h index 0b01aed56f..ece83e7b42 100644 --- a/src/rdsn/include/dsn/dist/replication/replication_enums.h +++ b/src/rdsn/include/dsn/dist/replication/replication_enums.h @@ -157,4 +157,16 @@ ENUM_REG(replication::manual_compaction_status::QUEUING) ENUM_REG(replication::manual_compaction_status::RUNNING) ENUM_REG(replication::manual_compaction_status::FINISHED) ENUM_END2(replication::manual_compaction_status::type, manual_compaction_status) + +ENUM_BEGIN2(replication::backup_status::type, + backup_status, + replication::backup_status::UNINITIALIZED) +ENUM_REG(replication::backup_status::UNINITIALIZED) +ENUM_REG(replication::backup_status::CHECKPOINTING) +ENUM_REG(replication::backup_status::CHECKPOINTED) +ENUM_REG(replication::backup_status::UPLOADING) +ENUM_REG(replication::backup_status::SUCCEED) +ENUM_REG(replication::backup_status::FAILED) +ENUM_REG(replication::backup_status::CANCELED) +ENUM_END2(replication::backup_status::type, backup_status) } // namespace dsn diff --git a/src/rdsn/src/common/backup.thrift b/src/rdsn/src/common/backup.thrift index 9941743711..71c36d356a 100644 --- a/src/rdsn/src/common/backup.thrift +++ b/src/rdsn/src/common/backup.thrift @@ -20,6 +20,30 @@ include "../../../../idl/dsn.layer2.thrift" namespace cpp dsn.replication +enum backup_status +{ + UNINITIALIZED, + CHECKPOINTING, + CHECKPOINTED, + UPLOADING, + SUCCEED, + FAILED, + CANCELED +} + +struct backup_item +{ + 1:i64 backup_id; + 2:i32 app_id; + 3:string app_name; + 4:string backup_provider_type; + // user specified backup_path. + 5:string backup_path; + 6:i64 start_time_ms; + 7:i64 end_time_ms; + 8:backup_status status; +} + struct policy_info { 1:string policy_name; @@ -177,18 +201,6 @@ struct start_backup_app_response 3:optional i64 backup_id; } -struct backup_item -{ - 1:i64 backup_id; - 2:string app_name; - 3:string backup_provider_type; - // user specified backup_path. - 4:string backup_path; - 5:i64 start_time_ms; - 6:i64 end_time_ms; - 7:bool is_backup_failed; -} - struct query_backup_status_request { 1:i32 app_id; diff --git a/src/rdsn/src/meta/backup_engine.cpp b/src/rdsn/src/meta/meta_backup_engine.cpp similarity index 72% rename from src/rdsn/src/meta/backup_engine.cpp rename to src/rdsn/src/meta/meta_backup_engine.cpp index 47af195257..76ef3a2540 100644 --- a/src/rdsn/src/meta/backup_engine.cpp +++ b/src/rdsn/src/meta/meta_backup_engine.cpp @@ -16,74 +16,91 @@ // under the License. #include +#include #include -#include "common/backup_common.h" -#include "common/replication_common.h" -#include "server_state.h" +#include "meta_backup_engine.h" namespace dsn { namespace replication { -backup_engine::backup_engine(backup_service *service) - : _backup_service(service), _block_service(nullptr), _backup_path(""), _is_backup_failed(false) +meta_backup_engine::meta_backup_engine(meta_service *meta_svc, bool is_periodic) + : _meta_svc(meta_svc), _is_periodic_backup(is_periodic) { } -backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); } +meta_backup_engine::~meta_backup_engine() { _tracker.cancel_outstanding_tasks(); } -error_code backup_engine::init_backup(int32_t app_id) +// ThreadPool: THREAD_POOL_DEFAULT +void meta_backup_engine::init_backup(int32_t app_id, + int32_t partition_count, + const std::string &app_name, + const std::string &provider, + const std::string &backup_root_path) { - std::string app_name; - int partition_count; - { - zauto_read_lock l; - _backup_service->get_state()->lock_read(l); - std::shared_ptr app = _backup_service->get_state()->get_app(app_id); - if (app == nullptr || app->status != app_status::AS_AVAILABLE) { - derror_f("app {} is not available, couldn't do backup now.", app_id); - return ERR_INVALID_STATE; - } - app_name = app->app_name; - partition_count = app->partition_count; - } - - zauto_lock lock(_lock); + zauto_write_lock l(_lock); _backup_status.clear(); for (int i = 0; i < partition_count; ++i) { - _backup_status.emplace(i, backup_status::UNALIVE); + _backup_status.emplace_back(backup_status::UNINITIALIZED); } _cur_backup.app_id = app_id; _cur_backup.app_name = app_name; _cur_backup.backup_id = static_cast(dsn_now_ms()); _cur_backup.start_time_ms = _cur_backup.backup_id; - return ERR_OK; + _cur_backup.backup_provider_type = provider; + _cur_backup.backup_path = backup_root_path; + _cur_backup.status = backup_status::UNINITIALIZED; + _is_backup_failed = false; + _is_backup_canceled = false; } -error_code backup_engine::set_block_service(const std::string &provider) +// ThreadPool: THREAD_POOL_DEFAULT +void meta_backup_engine::start() { - _provider_type = provider; - _block_service = _backup_service->get_meta_service() - ->get_block_service_manager() - .get_or_create_block_filesystem(provider); - if (_block_service == nullptr) { - return ERR_INVALID_PARAMETERS; + ddebug_f("App[{}] start {} backup[{}] on {}, root_path = {}", + _cur_backup.app_name, + _is_periodic_backup ? "periodic" : "onetime", + _cur_backup.backup_id, + _cur_backup.backup_provider_type, + _cur_backup.backup_path); + error_code err = write_app_info(); + if (err != ERR_OK) { + derror_f("backup_id({}): backup meta data for app {} failed, error {}", + _cur_backup.backup_id, + _cur_backup.app_id, + err); + update_backup_item_on_remote_storage(backup_status::FAILED, dsn_now_ms()); + return; + } + update_backup_item_on_remote_storage(backup_status::CHECKPOINTING); + FAIL_POINT_INJECT_F("meta_backup_engine_start", [](dsn::string_view) {}); + for (auto i = 0; i < _backup_status.size(); ++i) { + zauto_write_lock l(_lock); + _backup_status[i] = backup_status::CHECKPOINTING; + tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, [this, i]() { + backup_app_partition(gpid(_cur_backup.app_id, i)); + }); } - return ERR_OK; } -error_code backup_engine::set_backup_path(const std::string &path) +// ThreadPool: THREAD_POOL_DEFAULT +error_code meta_backup_engine::write_app_info() { - if (_block_service && _block_service->is_root_path_set()) { - return ERR_INVALID_PARAMETERS; - } - ddebug_f("backup path is set to {}.", path); - _backup_path = path; + // TODO(heyuchen): TBD return ERR_OK; } -error_code backup_engine::write_backup_file(const std::string &file_name, - const dsn::blob &write_buffer) +// ThreadPool: THREAD_POOL_DEFAULT +void meta_backup_engine::update_backup_item_on_remote_storage(backup_status::type new_status, + int64_t end_time) +{ + // TODO(heyuchen): TBD +} + +// TODO(heyuchen): update following functions + +error_code meta_backup_engine::write_backup_file(const std::string &file_name, + const dsn::blob &write_buffer) { dist::block_service::create_file_request create_file_req; create_file_req.ignore_metadata = true; @@ -114,7 +131,7 @@ error_code backup_engine::write_backup_file(const std::string &file_name, return err; } -error_code backup_engine::backup_app_meta() +error_code meta_backup_engine::backup_app_meta() { dsn::blob app_info_buffer; { @@ -139,7 +156,7 @@ error_code backup_engine::backup_app_meta() return write_backup_file(file_name, app_info_buffer); } -void backup_engine::backup_app_partition(const gpid &pid) +void meta_backup_engine::backup_app_partition(const gpid &pid) { dsn::rpc_address partition_primary; { @@ -149,7 +166,7 @@ void backup_engine::backup_app_partition(const gpid &pid) if (app == nullptr || app->status != app_status::AS_AVAILABLE) { derror_f("app {} is not available, couldn't do backup now.", pid.get_app_id()); - zauto_lock lock(_lock); + zauto_write_lock lock(_lock); _is_backup_failed = true; return; } @@ -190,12 +207,12 @@ void backup_engine::backup_app_partition(const gpid &pid) on_backup_reply(err, rpc.response(), pid, partition_primary); }); - zauto_lock l(_lock); - _backup_status[pid.get_partition_index()] = backup_status::ALIVE; + zauto_write_lock l(_lock); + _backup_status[pid.get_partition_index()] = backup_status::CHECKPOINTING; } -inline void backup_engine::handle_replica_backup_failed(const backup_response &response, - const gpid pid) +inline void meta_backup_engine::handle_replica_backup_failed(const backup_response &response, + const gpid pid) { dcheck_eq(response.pid, pid); dcheck_eq(response.backup_id, _cur_backup.backup_id); @@ -204,13 +221,13 @@ inline void backup_engine::handle_replica_backup_failed(const backup_response &r _cur_backup.backup_id, pid.to_string(), response.err.to_string()); - zauto_lock l(_lock); + zauto_write_lock l(_lock); // if one partition fail, the whole backup plan fail. _is_backup_failed = true; _backup_status[pid.get_partition_index()] = backup_status::FAILED; } -inline void backup_engine::retry_backup(const dsn::gpid pid) +inline void meta_backup_engine::retry_backup(const dsn::gpid pid) { tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, @@ -219,13 +236,13 @@ inline void backup_engine::retry_backup(const dsn::gpid pid) std::chrono::seconds(1)); } -void backup_engine::on_backup_reply(const error_code err, - const backup_response &response, - const gpid pid, - const rpc_address &primary) +void meta_backup_engine::on_backup_reply(const error_code err, + const backup_response &response, + const gpid pid, + const rpc_address &primary) { { - zauto_lock l(_lock); + zauto_read_lock l(_lock); // if backup of some partition failed, we would not handle response from other partitions. if (_is_backup_failed) { return; @@ -262,8 +279,8 @@ void backup_engine::on_backup_reply(const error_code err, _cur_backup.backup_id, pid.to_string()); { - zauto_lock l(_lock); - _backup_status[pid.get_partition_index()] = backup_status::COMPLETED; + zauto_write_lock l(_lock); + _backup_status[pid.get_partition_index()] = backup_status::SUCCEED; } complete_current_backup(); return; @@ -280,19 +297,19 @@ void backup_engine::on_backup_reply(const error_code err, retry_backup(pid); } -void backup_engine::write_backup_info() +void meta_backup_engine::write_backup_info() { std::string backup_root = dsn::utils::filesystem::path_combine(_backup_path, _backup_service->backup_root()); std::string file_name = cold_backup::get_backup_info_file(backup_root, _cur_backup.backup_id); - blob buf = dsn::json::json_forwarder::encode(_cur_backup); + blob buf = dsn::json::json_forwarder::encode(_cur_backup); error_code err = write_backup_file(file_name, buf); if (err == ERR_FS_INTERNAL) { derror_f( "backup_id({}): write backup info failed, error {}, do not try again for this error.", _cur_backup.backup_id, err.to_string()); - zauto_lock l(_lock); + zauto_write_lock l(_lock); _is_backup_failed = true; return; } @@ -308,16 +325,16 @@ void backup_engine::write_backup_info() ddebug_f("backup_id({}): successfully wrote backup info, backup for app {} completed.", _cur_backup.backup_id, _cur_backup.app_id); - zauto_lock l(_lock); + zauto_write_lock l(_lock); _cur_backup.end_time_ms = dsn_now_ms(); } -void backup_engine::complete_current_backup() +void meta_backup_engine::complete_current_backup() { { - zauto_lock l(_lock); + zauto_read_lock l(_lock); for (const auto &status : _backup_status) { - if (status.second != backup_status::COMPLETED) { + if (status != backup_status::SUCCEED) { // backup for some partition was not finished. return; } @@ -327,43 +344,5 @@ void backup_engine::complete_current_backup() write_backup_info(); } -error_code backup_engine::start() -{ - error_code err = backup_app_meta(); - if (err != ERR_OK) { - derror_f("backup_id({}): backup meta data for app {} failed, error {}", - _cur_backup.backup_id, - _cur_backup.app_id, - err.to_string()); - return err; - } - for (int i = 0; i < _backup_status.size(); ++i) { - tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, [this, i]() { - backup_app_partition(gpid(_cur_backup.app_id, i)); - }); - } - return ERR_OK; -} - -bool backup_engine::is_in_progress() const -{ - zauto_lock l(_lock); - return _cur_backup.end_time_ms == 0 && !_is_backup_failed; -} - -backup_item backup_engine::get_backup_item() const -{ - zauto_lock l(_lock); - backup_item item; - item.backup_id = _cur_backup.backup_id; - item.app_name = _cur_backup.app_name; - item.backup_path = _backup_path; - item.backup_provider_type = _provider_type; - item.start_time_ms = _cur_backup.start_time_ms; - item.end_time_ms = _cur_backup.end_time_ms; - item.is_backup_failed = _is_backup_failed; - return item; -} - } // namespace replication } // namespace dsn diff --git a/src/rdsn/src/meta/backup_engine.h b/src/rdsn/src/meta/meta_backup_engine.h similarity index 55% rename from src/rdsn/src/meta/backup_engine.h rename to src/rdsn/src/meta/meta_backup_engine.h index 75a84b7935..c389979c86 100644 --- a/src/rdsn/src/meta/backup_engine.h +++ b/src/rdsn/src/meta/meta_backup_engine.h @@ -18,74 +18,105 @@ #pragma once #include -#include #include +#include "common/backup_common.h" +#include "meta_service.h" +#include "server_state.h" +#include "meta_backup_service.h" + namespace dsn { namespace replication { -enum backup_status -{ - UNALIVE = 1, - ALIVE = 2, - COMPLETED = 3, - FAILED = 4 -}; - +// backup_info file written into block service struct app_backup_info { int64_t backup_id; int64_t start_time_ms; int64_t end_time_ms; - int32_t app_id; std::string app_name; - app_backup_info() : backup_id(0), start_time_ms(0), end_time_ms(0) {} - DEFINE_JSON_SERIALIZATION(backup_id, start_time_ms, end_time_ms, app_id, app_name) }; -class app_state; -class backup_service; - -class backup_engine +/// +/// Meta backup status +/// +/// start backup +/// | +/// v Error/Cancel +/// Checkpointing ------------->| +/// | | +/// v Error/Cancel | +/// Uploading -------------->| +/// | | +/// v v +/// Succeed Failed/Canceled +/// +class meta_backup_engine { public: - backup_engine(backup_service *service); - ~backup_engine(); - - error_code init_backup(int32_t app_id); - error_code set_block_service(const std::string &provider); - error_code set_backup_path(const std::string &path); - - error_code start(); + explicit meta_backup_engine(meta_service *meta_svc, bool is_periodic); + ~meta_backup_engine(); int64_t get_current_backup_id() const { return _cur_backup.backup_id; } int32_t get_backup_app_id() const { return _cur_backup.app_id; } - bool is_in_progress() const; - backup_item get_backup_item() const; + backup_item get_backup_item() const + { + zauto_read_lock l(_lock); + backup_item item = _cur_backup; + return item; + } -private: - friend class backup_engine_test; - friend class backup_service_test; + bool is_in_progress() const + { + zauto_read_lock l(_lock); + return _cur_backup.end_time_ms == 0 && !_is_backup_failed && !_is_backup_canceled; + } - FRIEND_TEST(backup_engine_test, test_on_backup_reply); - FRIEND_TEST(backup_engine_test, test_backup_completed); - FRIEND_TEST(backup_engine_test, test_write_backup_info_failed); +private: + void init_backup(int32_t app_id, + int32_t partition_count, + const std::string &app_name, + const std::string &provider, + const std::string &backup_root_path); + void start(); - error_code write_backup_file(const std::string &file_name, const dsn::blob &write_buffer); - error_code backup_app_meta(); void backup_app_partition(const gpid &pid); void on_backup_reply(error_code err, const backup_response &response, gpid pid, const rpc_address &primary); + void retry_backup(const dsn::gpid pid); + void handle_replica_backup_failed(const backup_response &response, const gpid pid); + + error_code write_backup_file(const std::string &file_name, const dsn::blob &write_buffer); + error_code write_app_info(); void write_backup_info(); + + void update_backup_item_on_remote_storage(backup_status::type new_status, int64_t end_time = 0); + +private: + friend class meta_backup_engine_test; + + meta_service *_meta_svc; + task_tracker _tracker; + + mutable zrwlock_nr _lock; // { + bool _is_periodic_backup; + bool _is_backup_failed{false}; + bool _is_backup_canceled{false}; + backup_item _cur_backup; + std::vector _backup_status; + // } + + // TODO(heyuchen): remove following functions and vars +private: + error_code backup_app_meta(); + void complete_current_backup(); - void handle_replica_backup_failed(const backup_response &response, const gpid pid); - void retry_backup(const dsn::gpid pid); const std::string get_policy_name() const { @@ -96,14 +127,6 @@ class backup_engine dist::block_service::block_filesystem *_block_service; std::string _backup_path; std::string _provider_type; - dsn::task_tracker _tracker; - - // lock the following variables. - mutable dsn::zlock _lock; - bool _is_backup_failed; - app_backup_info _cur_backup; - // partition_id -> backup_status - std::map _backup_status; }; } // namespace replication diff --git a/src/rdsn/src/meta/meta_backup_service.h b/src/rdsn/src/meta/meta_backup_service.h index a118cd35d7..83bdefcb2e 100644 --- a/src/rdsn/src/meta/meta_backup_service.h +++ b/src/rdsn/src/meta/meta_backup_service.h @@ -27,7 +27,7 @@ #include #include -#include "backup_engine.h" +#include "meta_backup_engine.h" #include "meta_data.h" #include "meta_rpc_types.h" diff --git a/src/rdsn/src/meta/meta_service.cpp b/src/rdsn/src/meta/meta_service.cpp index e5f5f2a54f..b9a0af034a 100644 --- a/src/rdsn/src/meta/meta_service.cpp +++ b/src/rdsn/src/meta/meta_service.cpp @@ -48,6 +48,7 @@ #include "meta_split_service.h" #include "meta_bulk_load_service.h" #include "runtime/security/access_controller.h" +#include "meta_backup_service.h" namespace dsn { namespace replication { diff --git a/src/rdsn/src/meta/meta_service.h b/src/rdsn/src/meta/meta_service.h index d43be2dc0e..3161a0ccb2 100644 --- a/src/rdsn/src/meta/meta_service.h +++ b/src/rdsn/src/meta/meta_service.h @@ -47,7 +47,6 @@ #include "common/manual_compact.h" #include "meta_rpc_types.h" #include "meta_options.h" -#include "meta_backup_service.h" #include "meta_state_service_utils.h" #include "block_service/block_service_manager.h" #include "partition_guardian.h" @@ -64,6 +63,7 @@ class server_load_balancer; class meta_duplication_service; class meta_split_service; class bulk_load_service; +class backup_service; namespace test { class test_checker; } diff --git a/src/rdsn/src/meta/test/server_state_restore_test.cpp b/src/rdsn/src/meta/test/server_state_restore_test.cpp index 7f164d20ed..495a627990 100644 --- a/src/rdsn/src/meta/test/server_state_restore_test.cpp +++ b/src/rdsn/src/meta/test/server_state_restore_test.cpp @@ -23,6 +23,7 @@ #include "meta/meta_service.h" #include "meta/server_state.h" #include "meta_test_base.h" +#include "meta/meta_backup_service.h" namespace dsn { namespace replication {