From 1cfd2efc90c29b3579302b32be3547e521b8a9af Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Thu, 16 Dec 2021 12:20:45 +0800 Subject: [PATCH 01/10] fix(one-time backup): fix unit test feat: restrict the replication factor while creating app (#963) feat(manual_compaction): meta server support querying compaction status (#987) manual test --- .../dsn/dist/replication/replication.codes.h | 1 + .../dist/replication/replication_ddl_client.h | 3 + src/client/replication_ddl_client.cpp | 9 + src/common/meta_admin.thrift | 16 ++ src/common/replication_common.h | 2 + src/meta/backup_engine.cpp | 47 +++-- src/meta/meta_backup_service.h | 2 + src/meta/meta_data.cpp | 39 ++++ src/meta/meta_data.h | 5 + src/meta/meta_service.cpp | 19 ++ src/meta/meta_service.h | 5 + src/meta/server_state.cpp | 116 ++++++++++- src/meta/server_state.h | 6 + src/meta/test/meta_app_operation_test.cpp | 188 ++++++++++++++++-- src/meta/test/meta_backup_test.cpp | 52 +++-- .../test/meta_duplication_service_test.cpp | 2 +- src/meta/test/meta_mauanl_compaction_test.cpp | 108 ++++++++++ src/meta/test/meta_test_base.cpp | 75 +++++++ src/meta/test/meta_test_base.h | 9 + src/replica/replica_backup.cpp | 2 +- 20 files changed, 651 insertions(+), 55 deletions(-) create mode 100644 src/meta/test/meta_mauanl_compaction_test.cpp diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index d8a0ce814e..be7bdab457 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -122,6 +122,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BACKUP_STATUS, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL #define CURRENT_THREAD_POOL THREAD_POOL_META_STATE diff --git a/include/dsn/dist/replication/replication_ddl_client.h b/include/dsn/dist/replication/replication_ddl_client.h index 7542ef92a5..8bac4c1ecc 100644 --- a/include/dsn/dist/replication/replication_ddl_client.h +++ b/include/dsn/dist/replication/replication_ddl_client.h @@ -218,6 +218,9 @@ class replication_ddl_client error_with add_new_disk(const rpc_address &target_node, const std::string &disk_str); + error_with + query_app_manual_compact(const std::string &app_name); + private: bool static valid_app_char(int c); diff --git a/src/client/replication_ddl_client.cpp b/src/client/replication_ddl_client.cpp index 72231201a2..9d2a693d52 100644 --- a/src/client/replication_ddl_client.cpp +++ b/src/client/replication_ddl_client.cpp @@ -1684,5 +1684,14 @@ replication_ddl_client::add_new_disk(const rpc_address &target_node, const std:: return resps.begin()->second.get_value(); } +error_with +replication_ddl_client::query_app_manual_compact(const std::string &app_name) +{ + auto req = make_unique(); + req->app_name = app_name; + return call_rpc_sync( + query_manual_compact_rpc(std::move(req), RPC_CM_QUERY_MANUAL_COMPACT_STATUS)); +} + } // namespace replication } // namespace dsn diff --git a/src/common/meta_admin.thrift b/src/common/meta_admin.thrift index 100b4fd984..d8b3634fcc 100644 --- a/src/common/meta_admin.thrift +++ b/src/common/meta_admin.thrift @@ -237,6 +237,22 @@ struct configuration_update_app_env_response 2:string hint_message; } +struct query_app_manual_compact_request +{ + 1:string app_name; +} + +struct query_app_manual_compact_response +{ + // Possible error: + // - ERR_APP_NOT_EXIST: app not exist + // - ERR_APP_DROPPED: app has been dropped + // - ERR_INVALID_STATE: app is not executing manual compaction + 1:dsn.error_code err; + 2:string hint_msg; + 3:optional i32 progress; +} + /////////////////// Nodes Management //////////////////// struct node_info diff --git a/src/common/replication_common.h b/src/common/replication_common.h index f98f0031dc..53bb8ed671 100644 --- a/src/common/replication_common.h +++ b/src/common/replication_common.h @@ -51,6 +51,8 @@ typedef rpc_holder notify typedef rpc_holder query_child_state_rpc; typedef rpc_holder backup_rpc; +typedef rpc_holder + query_manual_compact_rpc; class replication_options { diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index b2dbbb80d6..f57f2be574 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -199,6 +199,21 @@ void backup_engine::on_backup_reply(error_code err, gpid pid, const rpc_address &primary) { + // we should check err before checking response.pid + if (err != ERR_OK) { + dwarn_f("backup_id({}): send backup request to server {} failed, rpc error: {}, retry to " + "send backup request.", + _cur_backup.backup_id, + primary.to_string(), + err.to_string()); + tasking::enqueue(LPC_DEFAULT_CALLBACK, + &_tracker, + [this, pid]() { backup_app_partition(pid); }, + 0, + std::chrono::seconds(1)); + return; + }; + dcheck_eq(response.pid, pid); dcheck_eq(response.backup_id, _cur_backup.backup_id); @@ -216,8 +231,7 @@ void backup_engine::on_backup_reply(error_code err, // backup not completed in other cases. // see replica::on_cold_backup() for details. int32_t partition = pid.get_partition_index(); - if (err == dsn::ERR_OK && response.err == dsn::ERR_OK && - response.progress == cold_backup_constant::PROGRESS_FINISHED) { + if (response.err == ERR_OK && response.progress == cold_backup_constant::PROGRESS_FINISHED) { ddebug_f("backup_id({}): backup for partition {} completed.", _cur_backup.backup_id, pid.to_string()); @@ -230,31 +244,24 @@ void backup_engine::on_backup_reply(error_code err, } if (response.err == ERR_LOCAL_APP_FAILURE) { - derror_f("backup_id({}): backup for partition {} failed.", + derror_f("backup_id({}): backup for partition {} failed, error message: {}", _cur_backup.backup_id, - pid.to_string()); + pid.to_string(), + response.err.to_string()); zauto_lock l(_lock); _is_backup_failed = true; _backup_status[partition] = backup_status::FAILED; return; } + // default function is retry + ddebug_f("backup_id({}): receive backup response for partition {} from server {}, rpc error " + "{}, response error {}, retry to send backup request.", + _cur_backup.backup_id, + pid.to_string(), + primary.to_string(), + err.to_string(), + response.err.to_string()); - if (err != ERR_OK) { - dwarn_f("backup_id({}): send backup request to server {} failed, rpc error: {}, retry to " - "send backup request.", - _cur_backup.backup_id, - primary.to_string(), - err.to_string()); - } else { - ddebug_f( - "backup_id({}): receive backup response for partition {} from server {}, rpc error " - "{}, response error {}, retry to send backup request.", - _cur_backup.backup_id, - pid.to_string(), - primary.to_string(), - err.to_string(), - response.err.to_string()); - } tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, [this, pid]() { backup_app_partition(pid); }, diff --git a/src/meta/meta_backup_service.h b/src/meta/meta_backup_service.h index e50b099ab6..c3f9553712 100644 --- a/src/meta/meta_backup_service.h +++ b/src/meta/meta_backup_service.h @@ -380,6 +380,8 @@ class backup_service zlock _lock; std::map> _policy_states; // policy_name -> policy_context + + // _backup_states store all states of one-time backup in the cluster, not persistence to ZK std::vector> _backup_states; // the root of policy metas, stored on remote_storage(zookeeper) diff --git a/src/meta/meta_data.cpp b/src/meta/meta_data.cpp index 5791bbb369..c91ca3f22b 100644 --- a/src/meta/meta_data.cpp +++ b/src/meta/meta_data.cpp @@ -33,7 +33,10 @@ * xxxx-xx-xx, author, fix bug about xxx */ #include + +#include #include + #include "meta_data.h" namespace dsn { @@ -479,6 +482,42 @@ void app_state_helper::on_init_partitions() restore_states.resize(owner->partition_count); } +void app_state_helper::reset_manual_compact_status() +{ + for (auto &cc : contexts) { + for (auto &r : cc.serving) { + r.compact_status = manual_compaction_status::IDLE; + } + } +} + +bool app_state_helper::get_manual_compact_progress(/*out*/ int32_t &progress) const +{ + int32_t total_replica_count = owner->partition_count * owner->max_replica_count; + dassert_f(total_replica_count > 0, + "invalid app metadata, app({}), partition_count({}), max_replica_count({})", + owner->app_name, + owner->partition_count, + owner->max_replica_count); + int32_t finish_count = 0, idle_count = 0; + for (const auto &cc : contexts) { + for (const auto &r : cc.serving) { + if (r.compact_status == manual_compaction_status::IDLE) { + idle_count++; + } else if (r.compact_status == manual_compaction_status::FINISHED) { + finish_count++; + } + } + } + // all replicas of all partitions are idle + if (idle_count == total_replica_count) { + progress = 0; + return false; + } + progress = finish_count * 100 / total_replica_count; + return true; +} + app_state::app_state(const app_info &info) : app_info(info), helpers(new app_state_helper()) { log_name = info.app_name + "(" + boost::lexical_cast(info.app_id) + ")"; diff --git a/src/meta/meta_data.h b/src/meta/meta_data.h index 14a7be8a8a..d74143a37c 100644 --- a/src/meta/meta_data.h +++ b/src/meta/meta_data.h @@ -324,6 +324,11 @@ class app_state_helper cc.lb_actions.clear(); } } + + void reset_manual_compact_status(); + // get replica group manual compact progress + // return false if partition is not executing manual compaction + bool get_manual_compact_progress(/*out*/ int32_t &progress) const; }; /* diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp index 45b96c7c9c..e527c27740 100644 --- a/src/meta/meta_service.cpp +++ b/src/meta/meta_service.cpp @@ -540,6 +540,9 @@ void meta_service::register_rpc_handlers() RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app); register_rpc_handler_with_rpc_holder( RPC_CM_QUERY_BACKUP_STATUS, "query_backup_status", &meta_service::on_query_backup_status); + register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, + "query_manual_compact_status", + &meta_service::on_query_manual_compact_status); } int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address) @@ -1213,5 +1216,21 @@ void meta_service::on_query_backup_status(query_backup_status_rpc rpc) _backup_handler->query_backup_status(std::move(rpc)); } +size_t meta_service::get_alive_node_count() const +{ + zauto_lock l(_failure_detector->_lock); + return _alive_set.size(); +} + +void meta_service::on_query_manual_compact_status(query_manual_compact_rpc rpc) +{ + if (!check_status(rpc)) { + return; + } + tasking::enqueue(LPC_META_STATE_NORMAL, + nullptr, + std::bind(&server_state::on_query_manual_compact_status, _state.get(), rpc)); +} + } // namespace replication } // namespace dsn diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h index d84241b5d1..48671c7359 100644 --- a/src/meta/meta_service.h +++ b/src/meta/meta_service.h @@ -156,6 +156,8 @@ class meta_service : public serverlet dsn::task_tracker *tracker() { return &_tracker; } + size_t get_alive_node_count() const; + bool try_lock_meta_op_status(meta_op_status op_status); void unlock_meta_op_status(); meta_op_status get_op_status() const { return _meta_op_status.load(); } @@ -227,6 +229,9 @@ class meta_service : public serverlet void on_control_bulk_load(control_bulk_load_rpc rpc); void on_query_bulk_load_status(query_bulk_load_rpc rpc); + // manual compaction + void on_query_manual_compact_status(query_manual_compact_rpc rpc); + // common routines // ret: // 1. the meta is leader diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index de15d406e6..110d40858e 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -56,6 +56,35 @@ using namespace dsn; namespace dsn { namespace replication { +DSN_DEFINE_int32("meta_server", + max_allowed_replica_count, + 5, + "max replica count allowed for any app of a cluster"); +DSN_TAG_VARIABLE(max_allowed_replica_count, FT_MUTABLE); +DSN_DEFINE_validator(max_allowed_replica_count, [](int32_t allowed_replica_count) -> bool { + return allowed_replica_count > 0; +}); + +DSN_DEFINE_int32("meta_server", + min_allowed_replica_count, + 1, + "min replica count allowed for any app of a cluster"); +DSN_TAG_VARIABLE(min_allowed_replica_count, FT_MUTABLE); +DSN_DEFINE_validator(min_allowed_replica_count, [](int32_t allowed_replica_count) -> bool { + return allowed_replica_count > 0; +}); + +DSN_DEFINE_group_validator(min_max_allowed_replica_count, [](std::string &message) -> bool { + if (FLAGS_min_allowed_replica_count > FLAGS_max_allowed_replica_count) { + message = fmt::format("meta_server.min_allowed_replica_count({}) should be <= " + "meta_server.max_allowed_replica_count({})", + FLAGS_min_allowed_replica_count, + FLAGS_max_allowed_replica_count); + return false; + } + return true; +}); + static const char *lock_state = "lock"; static const char *unlock_state = "unlock"; @@ -1072,7 +1101,13 @@ void server_state::create_app(dsn::message_ex *msg) opt.replica_count == exist_app.max_replica_count; }; - if (request.options.partition_count <= 0 || request.options.replica_count <= 0) { + auto level = _meta_svc->get_function_level(); + if (level <= meta_function_level::fl_freezed) { + derror_f("current meta function level is freezed since there are too few alive nodes"); + response.err = ERR_STATE_FREEZED; + will_create_app = false; + } else if (request.options.partition_count <= 0 || + !validate_target_max_replica_count(request.options.replica_count)) { response.err = ERR_INVALID_PARAMETERS; will_create_app = false; } else { @@ -2864,5 +2899,84 @@ void server_state::clear_app_envs(const app_env_rpc &env_rpc) new_envs.c_str()); }); } + +namespace { + +bool validate_target_max_replica_count_internal(int32_t max_replica_count, + int32_t alive_node_count, + std::string &hint_message) +{ + if (max_replica_count > FLAGS_max_allowed_replica_count || + max_replica_count < FLAGS_min_allowed_replica_count) { + hint_message = fmt::format("requested replica count({}) must be " + "within the range of [min={}, max={}]", + max_replica_count, + FLAGS_min_allowed_replica_count, + FLAGS_max_allowed_replica_count); + return false; + } + + if (max_replica_count > alive_node_count) { + hint_message = fmt::format("there are not enough alive replica servers({}) " + "for the requested replica count({})", + alive_node_count, + max_replica_count); + return false; + } + + return true; +} + +} // anonymous namespace + +bool server_state::validate_target_max_replica_count(int32_t max_replica_count) +{ + auto alive_node_count = static_cast(_meta_svc->get_alive_node_count()); + + std::string hint_message; + bool valid = validate_target_max_replica_count_internal( + max_replica_count, alive_node_count, hint_message); + if (!valid) { + derror_f("target max replica count is invalid: message={}", hint_message); + } + + return valid; +} + +void server_state::on_query_manual_compact_status(query_manual_compact_rpc rpc) +{ + const std::string &app_name = rpc.request().app_name; + auto &response = rpc.response(); + + std::shared_ptr app; + { + zauto_read_lock l(_lock); + app = get_app(app_name); + } + + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; + response.hint_msg = + fmt::format("app {} is {}", + app_name, + response.err == ERR_APP_NOT_EXIST ? "not existed" : "not available"); + derror_f("{}", response.hint_msg); + return; + } + + int32_t total_progress = 0; + if (!app->helpers->get_manual_compact_progress(total_progress)) { + response.err = ERR_INVALID_STATE; + response.hint_msg = fmt::format("app {} is not manual compaction", app_name); + dwarn_f("{}", response.hint_msg); + return; + } + + ddebug_f("query app {} manual compact succeed, total_progress = {}", app_name, total_progress); + response.err = ERR_OK; + response.hint_msg = "succeed"; + response.__set_progress(total_progress); +} + } // namespace replication } // namespace dsn diff --git a/src/meta/server_state.h b/src/meta/server_state.h index ac97167f3e..a7c060f4cb 100644 --- a/src/meta/server_state.h +++ b/src/meta/server_state.h @@ -171,6 +171,9 @@ class server_state void on_query_restore_status(configuration_query_restore_rpc rpc); + // manual compaction + void on_query_manual_compact_status(query_manual_compact_rpc rpc); + // return true if no need to do any actions bool check_all_partitions(); void get_cluster_balance_score(double &primary_stddev /*out*/, double &total_stddev /*out*/); @@ -292,6 +295,9 @@ class server_state void process_one_partition(std::shared_ptr &app); void transition_staging_state(std::shared_ptr &app); + // check whether a max replica count is valid especially for a new app + bool validate_target_max_replica_count(int32_t max_replica_count); + private: friend class bulk_load_service; friend class bulk_load_service_test; diff --git a/src/meta/test/meta_app_operation_test.cpp b/src/meta/test/meta_app_operation_test.cpp index 14e8aba81d..2058bc6f30 100644 --- a/src/meta/test/meta_app_operation_test.cpp +++ b/src/meta/test/meta_app_operation_test.cpp @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include "meta_service_test_app.h" @@ -24,17 +25,23 @@ namespace dsn { namespace replication { + +DSN_DECLARE_int32(min_allowed_replica_count); +DSN_DECLARE_int32(max_allowed_replica_count); + class meta_app_operation_test : public meta_test_base { public: meta_app_operation_test() {} - error_code - create_app_test(int32_t partition_count, int32_t replica_count, bool success_if_exist) + error_code create_app_test(int32_t partition_count, + int32_t replica_count, + bool success_if_exist, + const std::string &app_name) { configuration_create_app_request create_request; configuration_create_app_response create_response; - create_request.app_name = APP_NAME; + create_request.app_name = app_name; create_request.options.app_type = "simple_kv"; create_request.options.partition_count = partition_count; create_request.options.replica_count = replica_count; @@ -94,18 +101,46 @@ class meta_app_operation_test : public meta_test_base app->expire_second -= 604800; } + void clear_nodes() { _ss->_nodes.clear(); } + const std::string APP_NAME = "app_operation_test"; const std::string OLD_APP_NAME = "old_app_operation"; - const int32_t PARTITION_COUNT = 4; - const int32_t REPLICA_COUNT = 3; }; TEST_F(meta_app_operation_test, create_app) { - // Test cases: + // Test cases: (assert min_allowed_replica_count <= max_allowed_replica_count) + // - wrong partition_count (< 0) + // - wrong partition_count (= 0) + // - wrong replica_count (< 0) + // - wrong replica_count (= 0) + // - wrong replica_count (> max_allowed_replica_count > alive_node_count) + // - wrong replica_count (> alive_node_count > max_allowed_replica_count) + // - wrong replica_count (> alive_node_count = max_allowed_replica_count) + // - wrong replica_count (= max_allowed_replica_count, and > alive_node_count) + // - wrong replica_count (< max_allowed_replica_count, and > alive_node_count) + // - wrong replica_count (= alive_node_count, and > max_allowed_replica_count) + // - wrong replica_count (< alive_node_count, and > max_allowed_replica_count) + // - valid replica_count (= max_allowed_replica_count, and = alive_node_count) + // - valid replica_count (= max_allowed_replica_count, and < alive_node_count) + // - valid replica_count (< max_allowed_replica_count, and = alive_node_count) + // - valid replica_count (< max_allowed_replica_count < alive_node_count) + // - valid replica_count (< alive_node_count < max_allowed_replica_count) + // - valid replica_count (< alive_node_count = max_allowed_replica_count) + // - wrong replica_count (< min_allowed_replica_count < alive_node_count) + // - wrong replica_count (< alive_node_count < min_allowed_replica_count) + // - wrong replica_count (< min_allowed_replica_count = alive_node_count) + // - wrong replica_count (< min_allowed_replica_count, and > alive_node_count) + // - wrong replica_count (< min_allowed_replica_count, and = alive_node_count) + // - wrong replica_count (= min_allowed_replica_count, and > alive_node_count) + // - valid replica_count (= min_allowed_replica_count, and < alive_node_count) + // - cluster freezed (alive_node_count = 0) + // - cluster freezed (alive_node_count = 1 < min_live_node_count_for_unfreeze) + // - cluster freezed (alive_node_count = 2 < min_live_node_count_for_unfreeze) + // - cluster not freezed (alive_node_count = min_live_node_count_for_unfreeze) + // - create succeed with single-replica + // - create succeed with double-replica // - create app succeed - // - wrong partition_count - // - wrong replica_count // - create failed with table existed // - wrong app_status creating // - wrong app_status recalling @@ -114,32 +149,147 @@ TEST_F(meta_app_operation_test, create_app) // - create succeed with success_if_exist=true struct create_test { + std::string app_name; int32_t partition_count; int32_t replica_count; + uint64_t min_live_node_count_for_unfreeze; + int alive_node_count; + int32_t min_allowed_replica_count; bool success_if_exist; app_status::type before_status; error_code expected_err; - } tests[] = { - {PARTITION_COUNT, REPLICA_COUNT, false, app_status::AS_INVALID, ERR_OK}, - {0, REPLICA_COUNT, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, - {PARTITION_COUNT, 0, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, - {PARTITION_COUNT, REPLICA_COUNT, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, - {PARTITION_COUNT, REPLICA_COUNT, false, app_status::AS_CREATING, ERR_BUSY_CREATING}, - {PARTITION_COUNT, REPLICA_COUNT, false, app_status::AS_RECALLING, ERR_BUSY_CREATING}, - {PARTITION_COUNT, REPLICA_COUNT, false, app_status::AS_DROPPING, ERR_BUSY_DROPPING}, - {PARTITION_COUNT, REPLICA_COUNT, false, app_status::AS_DROPPED, ERR_OK}, - {PARTITION_COUNT, REPLICA_COUNT, true, app_status::AS_INVALID, ERR_OK}}; + } tests[] = {{APP_NAME, -1, 3, 2, 3, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 0, 3, 2, 3, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, -1, 1, 3, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 0, 1, 3, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 6, 2, 4, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 7, 2, 6, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 6, 2, 5, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 5, 2, 4, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 4, 2, 3, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 6, 2, 6, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 6, 2, 7, 1, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME + "_1", 4, 5, 2, 5, 1, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME + "_2", 4, 5, 2, 6, 1, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME + "_3", 4, 4, 2, 4, 1, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME + "_4", 4, 4, 2, 6, 1, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME + "_5", 4, 3, 2, 4, 1, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME + "_6", 4, 4, 2, 5, 1, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME, 4, 3, 2, 5, 4, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 3, 2, 4, 5, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 3, 2, 4, 4, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 3, 2, 2, 4, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 3, 2, 3, 4, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 4, 2, 3, 4, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME + "_7", 4, 3, 2, 4, 3, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME, 4, 1, 1, 0, 1, false, app_status::AS_INVALID, ERR_STATE_FREEZED}, + {APP_NAME, 4, 2, 2, 1, 1, false, app_status::AS_INVALID, ERR_STATE_FREEZED}, + {APP_NAME, 4, 3, 3, 2, 1, false, app_status::AS_INVALID, ERR_STATE_FREEZED}, + {APP_NAME + "_8", 4, 3, 3, 3, 1, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME + "_9", 4, 1, 1, 1, 1, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME + "_10", 4, 2, 1, 2, 2, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME, 4, 3, 2, 3, 3, false, app_status::AS_INVALID, ERR_OK}, + {APP_NAME, 4, 3, 2, 3, 3, false, app_status::AS_INVALID, ERR_INVALID_PARAMETERS}, + {APP_NAME, 4, 3, 2, 3, 3, false, app_status::AS_CREATING, ERR_BUSY_CREATING}, + {APP_NAME, 4, 3, 2, 3, 3, false, app_status::AS_RECALLING, ERR_BUSY_CREATING}, + {APP_NAME, 4, 3, 2, 3, 3, false, app_status::AS_DROPPING, ERR_BUSY_DROPPING}, + {APP_NAME, 4, 3, 2, 3, 3, false, app_status::AS_DROPPED, ERR_OK}, + {APP_NAME, 4, 3, 2, 3, 3, true, app_status::AS_INVALID, ERR_OK}}; + + clear_nodes(); + + // keep the number of all nodes greater than that of alive nodes + const int total_node_count = 10; + std::vector nodes = ensure_enough_alive_nodes(total_node_count); + + // the meta function level will become freezed once + // alive_nodes * 100 < total_nodes * node_live_percentage_threshold_for_update + // even if alive_nodes >= min_live_node_count_for_unfreeze + set_node_live_percentage_threshold_for_update(0); + + // save original FLAGS_max_allowed_replica_count + auto reserved_max_allowed_replica_count = FLAGS_max_allowed_replica_count; + + // keep FLAGS_max_allowed_replica_count fixed in the tests + auto res = update_flag("max_allowed_replica_count", "5"); + ASSERT_TRUE(res.is_ok()); + + // save original FLAGS_min_allowed_replica_count + auto reserved_min_allowed_replica_count = FLAGS_min_allowed_replica_count; for (auto test : tests) { + res = update_flag("min_allowed_replica_count", + std::to_string(test.min_allowed_replica_count)); + ASSERT_TRUE(res.is_ok()); + + set_min_live_node_count_for_unfreeze(test.min_live_node_count_for_unfreeze); + + dassert_f(total_node_count >= test.alive_node_count, + "total_node_count({}) should be >= alive_node_count({})", + total_node_count, + test.alive_node_count); + for (int i = 0; i < total_node_count - test.alive_node_count; i++) { + _ms->set_node_state({nodes[i]}, false); + } + if (test.before_status == app_status::AS_DROPPED) { update_app_status(app_status::AS_AVAILABLE); drop_app(APP_NAME); } else if (test.before_status != app_status::AS_INVALID) { update_app_status(test.before_status); } - auto err = create_app_test(test.partition_count, test.replica_count, test.success_if_exist); + auto err = create_app_test( + test.partition_count, test.replica_count, test.success_if_exist, test.app_name); ASSERT_EQ(err, test.expected_err); + + _ms->set_node_state(nodes, true); } + + // set FLAGS_min_allowed_replica_count successfully + res = update_flag("min_allowed_replica_count", "2"); + ASSERT_TRUE(res.is_ok()); + ASSERT_EQ(FLAGS_min_allowed_replica_count, 2); + + // set FLAGS_max_allowed_replica_count successfully + res = update_flag("max_allowed_replica_count", "6"); + ASSERT_TRUE(res.is_ok()); + ASSERT_EQ(FLAGS_max_allowed_replica_count, 6); + + // failed to set FLAGS_min_allowed_replica_count due to individual validation + res = update_flag("min_allowed_replica_count", "0"); + ASSERT_EQ(res.code(), ERR_INVALID_PARAMETERS); + ASSERT_EQ(FLAGS_min_allowed_replica_count, 2); + std::cout << res.description() << std::endl; + + // failed to set FLAGS_max_allowed_replica_count due to individual validation + res = update_flag("max_allowed_replica_count", "0"); + ASSERT_EQ(res.code(), ERR_INVALID_PARAMETERS); + ASSERT_EQ(FLAGS_max_allowed_replica_count, 6); + std::cout << res.description() << std::endl; + + // failed to set FLAGS_min_allowed_replica_count due to grouped validation + res = update_flag("min_allowed_replica_count", "7"); + ASSERT_EQ(res.code(), ERR_INVALID_PARAMETERS); + ASSERT_EQ(FLAGS_min_allowed_replica_count, 2); + std::cout << res.description() << std::endl; + + // failed to set FLAGS_max_allowed_replica_count due to grouped validation + res = update_flag("max_allowed_replica_count", "1"); + ASSERT_EQ(res.code(), ERR_INVALID_PARAMETERS); + ASSERT_EQ(FLAGS_max_allowed_replica_count, 6); + std::cout << res.description() << std::endl; + + // recover original FLAGS_min_allowed_replica_count + res = update_flag("min_allowed_replica_count", + std::to_string(reserved_min_allowed_replica_count)); + ASSERT_TRUE(res.is_ok()); + ASSERT_EQ(FLAGS_min_allowed_replica_count, reserved_min_allowed_replica_count); + + // recover original FLAGS_max_allowed_replica_count + res = update_flag("max_allowed_replica_count", + std::to_string(reserved_max_allowed_replica_count)); + ASSERT_TRUE(res.is_ok()); + ASSERT_EQ(FLAGS_max_allowed_replica_count, reserved_max_allowed_replica_count); } TEST_F(meta_app_operation_test, drop_app) diff --git a/src/meta/test/meta_backup_test.cpp b/src/meta/test/meta_backup_test.cpp index 153118eccb..761787cea4 100644 --- a/src/meta/test/meta_backup_test.cpp +++ b/src/meta/test/meta_backup_test.cpp @@ -243,12 +243,26 @@ class backup_engine_test : public meta_test_base _backup_engine->on_backup_reply(rpc_err, resp, pid, mock_primary_address); } + void mock_on_backup_reply_when_timeout(int32_t partition_index, error_code rpc_err) + { + gpid pid = gpid(_app_id, partition_index); + rpc_address mock_primary_address = rpc_address("127.0.0.1", 10000 + partition_index); + backup_response resp; + _backup_engine->on_backup_reply(rpc_err, resp, pid, mock_primary_address); + } + bool is_backup_failed() const { zauto_lock l(_backup_engine->_lock); return _backup_engine->_is_backup_failed; } + void reset_backup_engine() + { + zauto_lock l(_backup_engine->_lock); + _backup_engine->_is_backup_failed = false; + } + protected: const std::string _policy_root; const std::string _backup_root; @@ -267,28 +281,41 @@ TEST_F(backup_engine_test, test_on_backup_reply) ASSERT_TRUE(_backup_engine->is_in_progress()); // recieve a backup finished response + reset_backup_engine(); mock_on_backup_reply(/*partition_index=*/1, ERR_OK, ERR_OK, /*progress=*/cold_backup_constant::PROGRESS_FINISHED); ASSERT_TRUE(_backup_engine->is_in_progress()); - // recieve a backup in-progress response + // receive a backup in-progress response + reset_backup_engine(); mock_on_backup_reply(/*partition_index=*/2, ERR_OK, ERR_BUSY, /*progress=*/0); ASSERT_TRUE(_backup_engine->is_in_progress()); + ASSERT_EQ(_backup_engine->_backup_status[2], backup_status::ALIVE); - // recieve a backup failed response - mock_on_backup_reply(/*partition_index=*/3, ERR_OK, ERR_LOCAL_APP_FAILURE, /*progress=*/0); - ASSERT_TRUE(is_backup_failed()); + // if one partition fail, all backup plan will fail + { + // receive a backup failed response + reset_backup_engine(); + mock_on_backup_reply(/*partition_index=*/3, ERR_OK, ERR_LOCAL_APP_FAILURE, /*progress=*/0); + ASSERT_TRUE(is_backup_failed()); - // this backup is still a failure even recieved non-failure response - mock_on_backup_reply(/*partition_index=*/4, ERR_OK, ERR_BUSY, /*progress=*/0); - ASSERT_TRUE(is_backup_failed()); - mock_on_backup_reply(/*partition_index=*/5, - ERR_OK, - ERR_OK, - /*progress=*/cold_backup_constant::PROGRESS_FINISHED); - ASSERT_TRUE(is_backup_failed()); + // this backup is still a failure even received non-failure response + mock_on_backup_reply(/*partition_index=*/4, ERR_OK, ERR_BUSY, /*progress=*/0); + ASSERT_TRUE(is_backup_failed()); + + mock_on_backup_reply(/*partition_index=*/5, + ERR_OK, + ERR_OK, + /*progress=*/cold_backup_constant::PROGRESS_FINISHED); + ASSERT_TRUE(is_backup_failed()); + } + + // meta request is timeout + reset_backup_engine(); + mock_on_backup_reply_when_timeout(/*partition_index=*/5, ERR_TIMEOUT); + ASSERT_FALSE(is_backup_failed()); } TEST_F(backup_engine_test, test_backup_completed) @@ -322,6 +349,5 @@ TEST_F(backup_engine_test, test_write_backup_info_failed) fail::teardown(); } - } // namespace replication } // namespace dsn diff --git a/src/meta/test/meta_duplication_service_test.cpp b/src/meta/test/meta_duplication_service_test.cpp index cb3bd7a4a8..a7e3720ae7 100644 --- a/src/meta/test/meta_duplication_service_test.cpp +++ b/src/meta/test/meta_duplication_service_test.cpp @@ -507,7 +507,7 @@ TEST_F(meta_duplication_service_test, remove_dup) TEST_F(meta_duplication_service_test, duplication_sync) { - std::vector server_nodes = generate_node_list(3); + std::vector server_nodes = ensure_enough_alive_nodes(3); rpc_address node = server_nodes[0]; std::string test_app = "test_app_0"; diff --git a/src/meta/test/meta_mauanl_compaction_test.cpp b/src/meta/test/meta_mauanl_compaction_test.cpp new file mode 100644 index 0000000000..f10b71776f --- /dev/null +++ b/src/meta/test/meta_mauanl_compaction_test.cpp @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "meta_service_test_app.h" +#include "meta_test_base.h" + +namespace dsn { +namespace replication { +class meta_app_compaction_test : public meta_test_base +{ +public: + meta_app_compaction_test() {} + + void SetUp() override + { + meta_test_base::SetUp(); + prepare(); + } + + void prepare() + { + create_app(APP_NAME, PARTITION_COUNT); + auto app = find_app(APP_NAME); + app->partitions.resize(PARTITION_COUNT); + app->helpers->contexts.resize(PARTITION_COUNT); + for (auto i = 0; i < PARTITION_COUNT; ++i) { + serving_replica rep; + rep.compact_status = manual_compaction_status::IDLE; + std::vector reps; + reps.emplace_back(rep); + reps.emplace_back(rep); + reps.emplace_back(rep); + app->helpers->contexts[i].serving = reps; + } + } + + query_app_manual_compact_response query_manual_compaction(int32_t mock_progress) + { + manual_compaction_status::type status = manual_compaction_status::IDLE; + if (mock_progress == 0) { + status = manual_compaction_status::QUEUING; + } else if (mock_progress == 100) { + status = manual_compaction_status::FINISHED; + } + auto app = find_app(APP_NAME); + app->helpers->reset_manual_compact_status(); + for (auto &cc : app->helpers->contexts) { + for (auto &r : cc.serving) { + r.compact_status = status; + } + } + if (mock_progress == 50) { + for (auto i = 0; i < PARTITION_COUNT / 2; i++) { + auto &cc = app->helpers->contexts[i]; + for (auto &r : cc.serving) { + r.compact_status = manual_compaction_status::FINISHED; + } + } + } + auto request = dsn::make_unique(); + request->app_name = APP_NAME; + + query_manual_compact_rpc rpc(std::move(request), RPC_CM_QUERY_MANUAL_COMPACT_STATUS); + _ss->on_query_manual_compact_status(rpc); + wait_all(); + return rpc.response(); + } + +public: + std::string APP_NAME = "manual_compaction_test"; + int32_t PARTITION_COUNT = 4; +}; + +TEST_F(meta_app_compaction_test, test_query_compaction) +{ + struct test_case + { + int32_t mock_progress; + error_code expected_err; + } tests[] = {{-1, ERR_INVALID_STATE}, {0, ERR_OK}, {50, ERR_OK}, {100, ERR_OK}}; + + for (auto test : tests) { + auto resp = query_manual_compaction(test.mock_progress); + ASSERT_EQ(resp.err, test.expected_err); + if (resp.err == ERR_OK) { + ASSERT_EQ(resp.progress, test.mock_progress); + } + } +} + +} // namespace replication +} // namespace dsn diff --git a/src/meta/test/meta_test_base.cpp b/src/meta/test/meta_test_base.cpp index f4e627b17e..87ade28f33 100644 --- a/src/meta/test/meta_test_base.cpp +++ b/src/meta/test/meta_test_base.cpp @@ -17,6 +17,8 @@ #include "meta_test_base.h" +#include + #include "meta/server_load_balancer.h" #include "meta/meta_server_failure_detector.h" #include "meta/meta_split_service.h" @@ -84,6 +86,76 @@ void meta_test_base::initialize_node_state() { _ss->initialize_node_state(); } void meta_test_base::wait_all() { _ms->tracker()->wait_outstanding_tasks(); } +void meta_test_base::set_min_live_node_count_for_unfreeze(uint64_t node_count) +{ + _ms->_meta_opts.min_live_node_count_for_unfreeze = node_count; +} + +void meta_test_base::set_node_live_percentage_threshold_for_update(uint64_t percentage_threshold) +{ + _ms->_node_live_percentage_threshold_for_update = percentage_threshold; +} + +std::vector meta_test_base::get_alive_nodes() const +{ + std::vector nodes; + + zauto_read_lock l(_ss->_lock); + + for (const auto &node : _ss->_nodes) { + if (node.second.alive()) { + nodes.push_back(node.first); + } + } + + return nodes; +} + +std::vector meta_test_base::ensure_enough_alive_nodes(int min_node_count) +{ + if (min_node_count < 1) { + return std::vector(); + } + + std::vector nodes(get_alive_nodes()); + if (!nodes.empty()) { + auto node_count = static_cast(nodes.size()); + dassert_f(node_count >= min_node_count, + "there should be at least {} alive nodes, now we just have {} alive nodes", + min_node_count, + node_count); + + dinfo_f("already exists {} alive nodes: ", nodes.size()); + for (const auto &node : nodes) { + dinfo_f(" {}", node.to_string()); + } + + // ensure that _ms->_alive_set is identical with _ss->_nodes + _ms->set_node_state(nodes, true); + return nodes; + } + + nodes = generate_node_list(min_node_count); + _ms->set_node_state(nodes, true); + + while (true) { + { + std::vector alive_nodes(get_alive_nodes()); + if (static_cast(alive_nodes.size()) >= min_node_count) { + break; + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + + dinfo_f("created {} alive nodes: ", nodes.size()); + for (const auto &node : nodes) { + dinfo_f(" {}", node.to_string()); + } + return nodes; +} + void meta_test_base::create_app(const std::string &name, uint32_t partition_count) { configuration_create_app_request req; @@ -96,6 +168,9 @@ void meta_test_base::create_app(const std::string &name, uint32_t partition_coun req.options.is_stateful = true; req.options.envs["value_version"] = "1"; + set_min_live_node_count_for_unfreeze(2); + ensure_enough_alive_nodes(3); + auto result = fake_create_app(_ss.get(), req); fake_wait_rpc(result, resp); ASSERT_EQ(resp.err, ERR_OK) << resp.err.to_string() << " " << name; diff --git a/src/meta/test/meta_test_base.h b/src/meta/test/meta_test_base.h index 1c19298820..c84d064ce2 100644 --- a/src/meta/test/meta_test_base.h +++ b/src/meta/test/meta_test_base.h @@ -45,6 +45,12 @@ class meta_test_base : public testing::Test void wait_all(); + void set_min_live_node_count_for_unfreeze(uint64_t node_count); + + void set_node_live_percentage_threshold_for_update(uint64_t percentage_threshold); + + std::vector ensure_enough_alive_nodes(int min_node_count); + // create an app for test with specified name and specified partition count void create_app(const std::string &name, uint32_t partition_count); @@ -70,6 +76,9 @@ class meta_test_base : public testing::Test std::shared_ptr _ss; std::unique_ptr _ms; std::string _app_root; + +private: + std::vector get_alive_nodes() const; }; } // namespace replication diff --git a/src/replica/replica_backup.cpp b/src/replica/replica_backup.cpp index b9ba9860ea..2268fe5936 100644 --- a/src/replica/replica_backup.cpp +++ b/src/replica/replica_backup.cpp @@ -219,7 +219,7 @@ void replica::send_backup_request_to_secondary(const backup_request &request) { for (const auto &target_address : _primary_states.membership.secondaries) { // primary will send backup_request to secondary periodically - // so, we shouldn't handler the response + // so, we shouldn't handle the response rpc::call_one_way_typed(target_address, RPC_COLD_BACKUP, request, get_gpid().thread_hash()); } } From 0d13302e5bee0aa726659e928bd3404f2c89c452 Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Thu, 16 Dec 2021 16:18:19 +0800 Subject: [PATCH 02/10] modify comments --- src/meta/backup_engine.cpp | 3 ++- src/meta/meta_backup_service.h | 2 +- src/meta/test/meta_backup_test.cpp | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index f57f2be574..74818c53ff 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -253,7 +253,8 @@ void backup_engine::on_backup_reply(error_code err, _backup_status[partition] = backup_status::FAILED; return; } - // default function is retry + + // when response.err == ERR_BUSY or other states, meta polling to send request ddebug_f("backup_id({}): receive backup response for partition {} from server {}, rpc error " "{}, response error {}, retry to send backup request.", _cur_backup.backup_id, diff --git a/src/meta/meta_backup_service.h b/src/meta/meta_backup_service.h index c3f9553712..7a38b1bb9d 100644 --- a/src/meta/meta_backup_service.h +++ b/src/meta/meta_backup_service.h @@ -381,7 +381,7 @@ class backup_service std::map> _policy_states; // policy_name -> policy_context - // _backup_states store all states of one-time backup in the cluster, not persistence to ZK + // _backup_states stores all states of one-time backup in the cluster, not persistence to ZK std::vector> _backup_states; // the root of policy metas, stored on remote_storage(zookeeper) diff --git a/src/meta/test/meta_backup_test.cpp b/src/meta/test/meta_backup_test.cpp index 761787cea4..d13832b05b 100644 --- a/src/meta/test/meta_backup_test.cpp +++ b/src/meta/test/meta_backup_test.cpp @@ -349,5 +349,6 @@ TEST_F(backup_engine_test, test_write_backup_info_failed) fail::teardown(); } + } // namespace replication } // namespace dsn From 4a50a289a26ad9cd1293b398633c91d5b59af8ce Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Thu, 16 Dec 2021 16:56:07 +0800 Subject: [PATCH 03/10] refactor function --- src/meta/backup_engine.cpp | 100 ++++++++++++++++++++----------------- 1 file changed, 53 insertions(+), 47 deletions(-) diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index 74818c53ff..abecdec941 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -199,24 +199,6 @@ void backup_engine::on_backup_reply(error_code err, gpid pid, const rpc_address &primary) { - // we should check err before checking response.pid - if (err != ERR_OK) { - dwarn_f("backup_id({}): send backup request to server {} failed, rpc error: {}, retry to " - "send backup request.", - _cur_backup.backup_id, - primary.to_string(), - err.to_string()); - tasking::enqueue(LPC_DEFAULT_CALLBACK, - &_tracker, - [this, pid]() { backup_app_partition(pid); }, - 0, - std::chrono::seconds(1)); - return; - }; - - dcheck_eq(response.pid, pid); - dcheck_eq(response.backup_id, _cur_backup.backup_id); - { zauto_lock l(_lock); // if backup of some partition failed, we would not handle response from other partitions. @@ -230,44 +212,68 @@ void backup_engine::on_backup_reply(error_code err, // if backup failed, receive ERR_LOCAL_APP_FAILURE; // backup not completed in other cases. // see replica::on_cold_backup() for details. - int32_t partition = pid.get_partition_index(); - if (response.err == ERR_OK && response.progress == cold_backup_constant::PROGRESS_FINISHED) { + if (err != ERR_OK || response.err != ERR_OK) { + if (response.err == ERR_LOCAL_APP_FAILURE) { + dcheck_eq(response.pid, pid); + dcheck_eq(response.backup_id, _cur_backup.backup_id); + + derror_f("backup_id({}): backup for partition {} failed, error message: {}, " + "response.err: {}", + _cur_backup.backup_id, + pid.to_string(), + err.to_string(), + response.err.to_string()); + zauto_lock l(_lock); + // if one partition fail, the whole backup plan fail. + _is_backup_failed = true; + _backup_status[pid.get_partition_index()] = backup_status::FAILED; + return; + } else { + // backup not completed, retry sending requests + dwarn_f("backup_id({}): send backup request to server {} failed, rpc error: {}, " + "response.err: {} , retry to " + "send backup request.", + _cur_backup.backup_id, + primary.to_string(), + err.to_string(), + response.err.to_string()); + tasking::enqueue(LPC_DEFAULT_CALLBACK, + &_tracker, + [this, pid]() { backup_app_partition(pid); }, + 0, + std::chrono::seconds(1)); + } + return; + }; + + if (response.progress == cold_backup_constant::PROGRESS_FINISHED) { + dcheck_eq(response.pid, pid); + dcheck_eq(response.backup_id, _cur_backup.backup_id); ddebug_f("backup_id({}): backup for partition {} completed.", _cur_backup.backup_id, pid.to_string()); { zauto_lock l(_lock); - _backup_status[partition] = backup_status::COMPLETED; + _backup_status[pid.get_partition_index()] = backup_status::COMPLETED; } complete_current_backup(); - return; - } + } else { + // backup is not finished, meta polling to send request + ddebug_f( + "backup_id({}): receive backup response for partition {} from server {}, rpc error " + "{}, response error {}, retry to send backup request.", + _cur_backup.backup_id, + pid.to_string(), + primary.to_string(), + err.to_string(), + response.err.to_string()); - if (response.err == ERR_LOCAL_APP_FAILURE) { - derror_f("backup_id({}): backup for partition {} failed, error message: {}", - _cur_backup.backup_id, - pid.to_string(), - response.err.to_string()); - zauto_lock l(_lock); - _is_backup_failed = true; - _backup_status[partition] = backup_status::FAILED; - return; + tasking::enqueue(LPC_DEFAULT_CALLBACK, + &_tracker, + [this, pid]() { backup_app_partition(pid); }, + 0, + std::chrono::seconds(1)); } - - // when response.err == ERR_BUSY or other states, meta polling to send request - ddebug_f("backup_id({}): receive backup response for partition {} from server {}, rpc error " - "{}, response error {}, retry to send backup request.", - _cur_backup.backup_id, - pid.to_string(), - primary.to_string(), - err.to_string(), - response.err.to_string()); - - tasking::enqueue(LPC_DEFAULT_CALLBACK, - &_tracker, - [this, pid]() { backup_app_partition(pid); }, - 0, - std::chrono::seconds(1)); } void backup_engine::write_backup_info() From 98059cdbfa85c401c3ceaf34305d6d3e8c29b870 Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Mon, 20 Dec 2021 11:41:41 +0800 Subject: [PATCH 04/10] add progress in log --- src/meta/backup_engine.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index abecdec941..6dd61a7b67 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -261,12 +261,13 @@ void backup_engine::on_backup_reply(error_code err, // backup is not finished, meta polling to send request ddebug_f( "backup_id({}): receive backup response for partition {} from server {}, rpc error " - "{}, response error {}, retry to send backup request.", + "{}, response error {}, now progress {}, retry to send backup request.", _cur_backup.backup_id, pid.to_string(), primary.to_string(), err.to_string(), - response.err.to_string()); + response.err.to_string(), + response.progress); tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, From 5a696cba1bdf7fb603a2d53c1d3a774ce672dd72 Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Tue, 21 Dec 2021 11:53:54 +0800 Subject: [PATCH 05/10] refactor --- src/meta/backup_engine.cpp | 123 ++++++++++++++++++++++--------------- src/meta/backup_engine.h | 3 + 2 files changed, 76 insertions(+), 50 deletions(-) diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index 6dd61a7b67..618f0b9fe6 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -194,9 +194,38 @@ void backup_engine::backup_app_partition(const gpid &pid) _backup_status[pid.get_partition_index()] = backup_status::ALIVE; } -void backup_engine::on_backup_reply(error_code err, +inline void backup_engine::handle_replica_backup_failed(const error_code err, + const backup_response &response, + const gpid pid) +{ + dcheck_eq(response.pid, pid); + dcheck_eq(response.backup_id, _cur_backup.backup_id); + + derror_f("backup_id({}): backup for partition {} failed, error message: {}, " + "response.err: {}", + _cur_backup.backup_id, + pid.to_string(), + err.to_string(), + response.err.to_string()); + zauto_lock l(_lock); + // if one partition fail, the whole backup plan fail. + _is_backup_failed = true; + _backup_status[pid.get_partition_index()] = backup_status::FAILED; + return; +} + +inline void backup_engine::retry_backup(const dsn::gpid pid) +{ + tasking::enqueue(LPC_DEFAULT_CALLBACK, + &_tracker, + [this, pid]() { backup_app_partition(pid); }, + 0, + std::chrono::seconds(1)); +} + +void backup_engine::on_backup_reply(const error_code err, const backup_response &response, - gpid pid, + const gpid pid, const rpc_address &primary) { { @@ -212,40 +241,37 @@ void backup_engine::on_backup_reply(error_code err, // if backup failed, receive ERR_LOCAL_APP_FAILURE; // backup not completed in other cases. // see replica::on_cold_backup() for details. - if (err != ERR_OK || response.err != ERR_OK) { - if (response.err == ERR_LOCAL_APP_FAILURE) { - dcheck_eq(response.pid, pid); - dcheck_eq(response.backup_id, _cur_backup.backup_id); - - derror_f("backup_id({}): backup for partition {} failed, error message: {}, " - "response.err: {}", - _cur_backup.backup_id, - pid.to_string(), - err.to_string(), - response.err.to_string()); - zauto_lock l(_lock); - // if one partition fail, the whole backup plan fail. - _is_backup_failed = true; - _backup_status[pid.get_partition_index()] = backup_status::FAILED; - return; - } else { - // backup not completed, retry sending requests - dwarn_f("backup_id({}): send backup request to server {} failed, rpc error: {}, " - "response.err: {} , retry to " - "send backup request.", - _cur_backup.backup_id, - primary.to_string(), - err.to_string(), - response.err.to_string()); - tasking::enqueue(LPC_DEFAULT_CALLBACK, - &_tracker, - [this, pid]() { backup_app_partition(pid); }, - 0, - std::chrono::seconds(1)); - } + if (err != ERR_OK) { + dwarn_f("backup_id({}): send backup request to server {} failed, rpc error: {}, " + "response.err: {} , retry to " + "send backup request.", + _cur_backup.backup_id, + primary.to_string(), + err.to_string(), + response.err.to_string()); + + retry_backup(pid); return; }; + if (response.err == ERR_LOCAL_APP_FAILURE) { + handle_replica_backup_failed(err, response, pid); + return; + } + + if (response.err != ERR_OK) { + dwarn_f("backup_id({}): replica {} backup failed, rpc error: {}, " + "response.err: {} , retry to " + "send backup request.", + _cur_backup.backup_id, + primary.to_string(), + err.to_string(), + response.err.to_string()); + + retry_backup(pid); + return; + } + if (response.progress == cold_backup_constant::PROGRESS_FINISHED) { dcheck_eq(response.pid, pid); dcheck_eq(response.backup_id, _cur_backup.backup_id); @@ -257,24 +283,21 @@ void backup_engine::on_backup_reply(error_code err, _backup_status[pid.get_partition_index()] = backup_status::COMPLETED; } complete_current_backup(); - } else { - // backup is not finished, meta polling to send request - ddebug_f( - "backup_id({}): receive backup response for partition {} from server {}, rpc error " - "{}, response error {}, now progress {}, retry to send backup request.", - _cur_backup.backup_id, - pid.to_string(), - primary.to_string(), - err.to_string(), - response.err.to_string(), - response.progress); - - tasking::enqueue(LPC_DEFAULT_CALLBACK, - &_tracker, - [this, pid]() { backup_app_partition(pid); }, - 0, - std::chrono::seconds(1)); + return; } + + // backup is not finished, meta polling to send request + ddebug_f("backup_id({}): receive backup response for partition {} from server {}, rpc error " + "{}, response error {}, now progress {}, retry to send backup request.", + _cur_backup.backup_id, + pid.to_string(), + primary.to_string(), + err.to_string(), + response.err.to_string(), + response.progress); + + retry_backup(pid); + return; } void backup_engine::write_backup_info() diff --git a/src/meta/backup_engine.h b/src/meta/backup_engine.h index 55f1065da3..0d82b5eec4 100644 --- a/src/meta/backup_engine.h +++ b/src/meta/backup_engine.h @@ -84,6 +84,9 @@ class backup_engine const rpc_address &primary); void write_backup_info(); void complete_current_backup(); + void + handle_replica_backup_failed(const error_code err, const backup_response &response,const gpid pid); + void retry_backup(const dsn::gpid pid); const std::string get_policy_name() const { From c05b9ad13cd4de8ba2202f785844f7d08ef4c22d Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Tue, 21 Dec 2021 12:16:37 +0800 Subject: [PATCH 06/10] format --- src/meta/backup_engine.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/meta/backup_engine.h b/src/meta/backup_engine.h index 0d82b5eec4..c006f9d627 100644 --- a/src/meta/backup_engine.h +++ b/src/meta/backup_engine.h @@ -84,8 +84,9 @@ class backup_engine const rpc_address &primary); void write_backup_info(); void complete_current_backup(); - void - handle_replica_backup_failed(const error_code err, const backup_response &response,const gpid pid); + void handle_replica_backup_failed(const error_code err, + const backup_response &response, + const gpid pid); void retry_backup(const dsn::gpid pid); const std::string get_policy_name() const From 21040ec43002d38c2761c65d881652fbd6922753 Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Tue, 21 Dec 2021 17:56:26 +0800 Subject: [PATCH 07/10] modify by CR --- src/meta/backup_engine.cpp | 39 +++++++++++++++----------------------- src/meta/backup_engine.h | 4 +--- 2 files changed, 16 insertions(+), 27 deletions(-) diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index 618f0b9fe6..39ead05967 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -194,18 +194,15 @@ void backup_engine::backup_app_partition(const gpid &pid) _backup_status[pid.get_partition_index()] = backup_status::ALIVE; } -inline void backup_engine::handle_replica_backup_failed(const error_code err, - const backup_response &response, +inline void backup_engine::handle_replica_backup_failed(const backup_response &response, const gpid pid) { dcheck_eq(response.pid, pid); dcheck_eq(response.backup_id, _cur_backup.backup_id); - derror_f("backup_id({}): backup for partition {} failed, error message: {}, " - "response.err: {}", + derror_f("backup_id({}): backup for partition {} failed, response.err: {}", _cur_backup.backup_id, pid.to_string(), - err.to_string(), response.err.to_string()); zauto_lock l(_lock); // if one partition fail, the whole backup plan fail. @@ -242,31 +239,27 @@ void backup_engine::on_backup_reply(const error_code err, // backup not completed in other cases. // see replica::on_cold_backup() for details. if (err != ERR_OK) { - dwarn_f("backup_id({}): send backup request to server {} failed, rpc error: {}, " - "response.err: {} , retry to " - "send backup request.", - _cur_backup.backup_id, - primary.to_string(), - err.to_string(), - response.err.to_string()); + derror_f("backup_id({}): send backup request to server {} failed, rpc error: {}, retry to " + "send backup request.", + _cur_backup.backup_id, + primary.to_string(), + err.to_string()); retry_backup(pid); return; }; if (response.err == ERR_LOCAL_APP_FAILURE) { - handle_replica_backup_failed(err, response, pid); + handle_replica_backup_failed(response, pid); return; } if (response.err != ERR_OK) { - dwarn_f("backup_id({}): replica {} backup failed, rpc error: {}, " - "response.err: {} , retry to " - "send backup request.", - _cur_backup.backup_id, - primary.to_string(), - err.to_string(), - response.err.to_string()); + derror_f("backup_id({}): replica {} backup failed, response.err: {} , retry to send backup " + "request.", + _cur_backup.backup_id, + primary.to_string(), + response.err.to_string()); retry_backup(pid); return; @@ -287,13 +280,11 @@ void backup_engine::on_backup_reply(const error_code err, } // backup is not finished, meta polling to send request - ddebug_f("backup_id({}): receive backup response for partition {} from server {}, rpc error " - "{}, response error {}, now progress {}, retry to send backup request.", + ddebug_f("backup_id({}): receive backup response for partition {} from server {}, now " + "progress {}, retry to send backup request.", _cur_backup.backup_id, pid.to_string(), primary.to_string(), - err.to_string(), - response.err.to_string(), response.progress); retry_backup(pid); diff --git a/src/meta/backup_engine.h b/src/meta/backup_engine.h index c006f9d627..75a84b7935 100644 --- a/src/meta/backup_engine.h +++ b/src/meta/backup_engine.h @@ -84,9 +84,7 @@ class backup_engine const rpc_address &primary); void write_backup_info(); void complete_current_backup(); - void handle_replica_backup_failed(const error_code err, - const backup_response &response, - const gpid pid); + void handle_replica_backup_failed(const backup_response &response, const gpid pid); void retry_backup(const dsn::gpid pid); const std::string get_policy_name() const From cc8cd67ae9c551599d8d6b71662459ba83e19d4b Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Thu, 23 Dec 2021 12:00:04 +0800 Subject: [PATCH 08/10] modify by CR --- src/meta/backup_engine.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index 39ead05967..ee250157be 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -208,7 +208,6 @@ inline void backup_engine::handle_replica_backup_failed(const backup_response &r // if one partition fail, the whole backup plan fail. _is_backup_failed = true; _backup_status[pid.get_partition_index()] = backup_status::FAILED; - return; } inline void backup_engine::retry_backup(const dsn::gpid pid) @@ -288,7 +287,6 @@ void backup_engine::on_backup_reply(const error_code err, response.progress); retry_backup(pid); - return; } void backup_engine::write_backup_info() From 6a1c506123b611cbda0ed6ed46367c3408718b1d Mon Sep 17 00:00:00 2001 From: tang yanzhao Date: Tue, 25 Jan 2022 18:28:06 +0800 Subject: [PATCH 09/10] update --- src/meta/backup_engine.cpp | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index ee250157be..3ce85a1128 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -237,32 +237,23 @@ void backup_engine::on_backup_reply(const error_code err, // if backup failed, receive ERR_LOCAL_APP_FAILURE; // backup not completed in other cases. // see replica::on_cold_backup() for details. - if (err != ERR_OK) { - derror_f("backup_id({}): send backup request to server {} failed, rpc error: {}, retry to " - "send backup request.", - _cur_backup.backup_id, - primary.to_string(), - err.to_string()); - retry_backup(pid); - return; - }; + auto rep_error = err == ERR_OK ? response.err : err; - if (response.err == ERR_LOCAL_APP_FAILURE) { + if (rep_error == ERR_LOCAL_APP_FAILURE) { handle_replica_backup_failed(response, pid); return; } - if (response.err != ERR_OK) { - derror_f("backup_id({}): replica {} backup failed, response.err: {} , retry to send backup " - "request.", + if (rep_error != ERR_OK) { + derror_f("backup_id({}): backup request to server {} failed, rpc error: {}, retry to " + "send backup request.", _cur_backup.backup_id, primary.to_string(), - response.err.to_string()); - + rep_error.to_string()); retry_backup(pid); return; - } + }; if (response.progress == cold_backup_constant::PROGRESS_FINISHED) { dcheck_eq(response.pid, pid); From 45c43c997c379476315af1a6e045de4503265349 Mon Sep 17 00:00:00 2001 From: Smilencer <527646889@qq.com> Date: Wed, 26 Jan 2022 16:01:06 +0800 Subject: [PATCH 10/10] typo --- src/meta/backup_engine.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp index 291c48e039..47af195257 100644 --- a/src/meta/backup_engine.cpp +++ b/src/meta/backup_engine.cpp @@ -246,7 +246,7 @@ void backup_engine::on_backup_reply(const error_code err, } if (rep_error != ERR_OK) { - derror_f("backup_id({}): backup request to server {} failed, rpc error: {}, retry to " + derror_f("backup_id({}): backup request to server {} failed, error: {}, retry to " "send backup request.", _cur_backup.backup_id, primary.to_string(),