diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 5d1d329472..de572fda59 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -38,6 +38,7 @@ typedef std::unordered_map<::dsn::rpc_address, dsn::task_ptr> node_tasks; typedef rpc_holder query_split_rpc; typedef rpc_holder stop_split_rpc; typedef rpc_holder control_split_rpc; +typedef rpc_holder register_child_rpc; class replication_options { diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 3d83f43655..ca9f562744 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -378,11 +378,16 @@ class replica : public serverlet, public ref_counter, public replica_ba rpc_address finish_update_address, bool is_update_child); - // all replicas update partition count, primary will register children on meta - virtual void register_child_on_meta(ballot b); - virtual void on_register_child_on_meta_reply(dsn::error_code ec, - std::shared_ptr request, - std::shared_ptr response); + // primary parent register children on meta_server + void register_child_on_meta(ballot b); + void on_register_child_on_meta_reply(dsn::error_code ec, + const register_child_request &request, + const register_child_response &response); + // primary sends register request to meta_server + void parent_send_register_request(const register_child_request &request); + + // child partition has been registered on meta_server, could be active + void child_partition_active(const partition_configuration &config); // meta <=> replica configuration sync through on_config_sync // called by primary replica to check if partition count changed and partition flag changed to @@ -414,9 +419,6 @@ class replica : public serverlet, public ref_counter, public replica_ba // parent replica handle child ack when child copy mutation synchronously void on_copy_mutation_reply(dsn::error_code ec, ballot b, decree d); - // child partitions have been registered on meta, could be active - void child_partition_active(const partition_configuration &config); - // child copy parent prepare list and call child_learn_states void child_copy_prepare_list(learn_state lstate, std::vector mutation_list, diff --git a/src/dist/replication/lib/replica_config.cpp b/src/dist/replication/lib/replica_config.cpp index 191dad35cf..7ba06ab7e9 100644 --- a/src/dist/replication/lib/replica_config.cpp +++ b/src/dist/replication/lib/replica_config.cpp @@ -362,6 +362,13 @@ void replica::update_configuration_on_meta_server(config_type::type type, ::dsn::rpc_address node, partition_configuration &newConfig) { + // type should never be `CT_REGISTER_CHILD` + // if this happens, it means serious mistake happened during partition split + // assert here to stop split and avoid splitting wrong + if (type == config_type::CT_REGISTER_CHILD) { + dassert_replica(false, "invalid config_type, type = {}", enum_to_string(type)); + } + newConfig.last_committed_decree = last_committed_decree(); if (type == config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT) { @@ -370,8 +377,6 @@ void replica::update_configuration_on_meta_server(config_type::type type, ""); dassert( newConfig.primary == node, "%s VS %s", newConfig.primary.to_string(), node.to_string()); - } else if (type == config_type::CT_REGISTER_CHILD) { - dassert(false, "invalid config_type, type = %s", enum_to_string(type)); } else if (type != config_type::CT_ASSIGN_PRIMARY && type != config_type::CT_UPGRADE_TO_PRIMARY) { dassert(status() == partition_status::PS_PRIMARY, @@ -720,12 +725,11 @@ bool replica::update_local_configuration(const replica_configuration &config, break; case partition_status::PS_PARTITION_SPLIT: if (config.status == partition_status::PS_INACTIVE) { - dwarn("%s: status change from %s @ %" PRId64 " to %s @ %" PRId64 " is not allowed", - name(), - enum_to_string(old_status), - old_ballot, - enum_to_string(config.status), - config.ballot); + dwarn_replica("status change from {} @ {} to {} @ {} is not allowed", + enum_to_string(old_status), + old_ballot, + enum_to_string(config.status), + config.ballot); return false; } break; @@ -1321,15 +1325,5 @@ void replica::on_query_child_state_reply(error_code ec, } } -void replica::child_partition_active(const partition_configuration &config) -{ - ddebug_f("{} finish partition split and become active", name()); - _stub->_counter_replicas_splitting_recent_split_succ_count->increment(); - // TODO(hyc): should set is false - _primary_states.sync_send_write_request = false; - _primary_states.last_prepare_decree_on_new_primary = _prepare_list->max_decree(); - update_configuration(config); -} - } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h index 12cbfdafb3..42c3cfa8c1 100644 --- a/src/dist/replication/lib/replica_context.h +++ b/src/dist/replication/lib/replica_context.h @@ -119,7 +119,8 @@ class primary_context // otherwise, not during partition split bool sync_send_write_request{false}; - // replica->meta register child on meta server and remote storage + // Used for partition split + // primary parent register child on meta_server task dsn::task_ptr register_child_task; // replica-> meta query child partition configuration diff --git a/src/dist/replication/lib/replica_split.cpp b/src/dist/replication/lib/replica_split.cpp index 97e1adaec0..9d2f177ac7 100644 --- a/src/dist/replication/lib/replica_split.cpp +++ b/src/dist/replication/lib/replica_split.cpp @@ -913,14 +913,12 @@ void replica::on_update_group_partition_count_reply( void replica::register_child_on_meta(ballot b) // on primary parent { if (status() != partition_status::PS_PRIMARY) { - dwarn_f("{} is not primary, can not register child, current status is {}", - name(), - enum_to_string(status())); + dwarn_replica("failed to register child, status = {}", enum_to_string(status())); return; } if (_primary_states.reconfiguration_task != nullptr) { - ddebug_f("{} is under reconfiguration, delay to register child", name()); + dwarn_replica("under reconfiguration, delay and retry to register child"); _primary_states.register_child_task = tasking::enqueue(LPC_PARTITION_SPLIT, tracker(), @@ -930,15 +928,6 @@ void replica::register_child_on_meta(ballot b) // on primary parent return; } - if (b != get_ballot()) { - dwarn_f( - "{} failed to register child, may out-dated, request ballot is {}, local ballot is {}", - name(), - b, - get_ballot()); - return; - } - partition_configuration child_config = _primary_states.membership; child_config.ballot++; child_config.last_committed_decree = 0; @@ -946,190 +935,131 @@ void replica::register_child_on_meta(ballot b) // on primary parent child_config.pid.set_partition_index(_app_info.partition_count + get_gpid().get_partition_index()); - std::shared_ptr request(new register_child_request); - request->app = _app_info; - request->child_config = child_config; - request->parent_config = _primary_states.membership; - request->primary_address = _stub->_primary_address; + register_child_request request; + request.app = _app_info; + request.child_config = child_config; + request.parent_config = _primary_states.membership; + request.primary_address = _stub->_primary_address; + // reject client request update_local_configuration_with_no_ballot_change(partition_status::PS_INACTIVE); set_inactive_state_transient(true); _partition_version = -1; - ddebug_f("{} set register child partition({}.{}) request to meta, current ballot is {}, child " - "ballot is {}", - name(), - request->child_config.pid.get_app_id(), - request->child_config.pid.get_partition_index(), - request->parent_config.ballot, - request->child_config.ballot); + parent_send_register_request(request); +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica::parent_send_register_request( + const register_child_request &request) // on primary parent +{ + FAIL_POINT_INJECT_F("replica_parent_send_register_request", [](dsn::string_view) {}); + + dcheck_eq_replica(status(), partition_status::PS_INACTIVE); + ddebug_replica( + "send register child({}) request to meta_server, current ballot = {}, child ballot = {}", + request.child_config.pid, + request.parent_config.ballot, + request.child_config.ballot); rpc_address meta_address(_stub->_failure_detector->get_servers()); - // TODO(hyc): new change - _primary_states.register_child_task = rpc::call( - meta_address, - RPC_CM_REGISTER_CHILD_REPLICA, - *request, - tracker(), - [=](error_code ec, register_child_response &&response) { - on_register_child_on_meta_reply( - ec, request, std::make_shared(std::move(response))); - }, - std::chrono::seconds(0), - get_gpid().thread_hash()); + std::unique_ptr req = make_unique(request); + register_child_rpc rpc(std::move(req), RPC_CM_REGISTER_CHILD_REPLICA); + _primary_states.register_child_task = + rpc.call(meta_address, + tracker(), + [this, rpc](error_code ec) mutable { + on_register_child_on_meta_reply(ec, rpc.request(), rpc.response()); + }, + _split_states.parent_gpid.thread_hash()); } +// ThreadPool: THREAD_POOL_REPLICATION void replica::on_register_child_on_meta_reply( dsn::error_code ec, - std::shared_ptr request, - std::shared_ptr response) // on primary parent + const register_child_request &request, + const register_child_response &response) // on primary parent { - // TODO(hyc): consider where should add check + FAIL_POINT_INJECT_F("replica_on_register_child_on_meta_reply", [](dsn::string_view) {}); + _checker.only_one_thread_access(); - if (partition_status::PS_INACTIVE != status() || _stub->is_connected() == false) { - dwarn_f("{} status wrong or stub is not connected, status is {}", - name(), - enum_to_string(status())); + // primary parent is under reconfiguration, whose status should be PS_INACTIVE + if (partition_status::PS_INACTIVE != status() || !_stub->is_connected()) { + dwarn_replica("status wrong or stub is not connected, status = {}", + enum_to_string(status())); _primary_states.register_child_task = nullptr; _primary_states.query_child_state_task = nullptr; - return; } - if (ec == ERR_OK) { - ec = response->err; - } - - // handle register failed - if (ec == ERR_CHILD_DROPPED || ec == ERR_REJECT) { - dwarn_f("{}: register child({}.{}) failed coz partition split is paused or canceled", - name(), - request->child_config.pid.get_app_id(), - request->child_config.pid.get_partition_index()); - _stub->split_replica_error_handler( - _child_gpid, - std::bind(&replica::child_handle_split_error, - std::placeholders::_1, - "register child failed coz split paused or canceled")); - _child_gpid.set_app_id(0); - return; - } - - if (ec != ERR_OK) { - dwarn_f("{}: register child({}.{}) reply with error {}, request child ballot is {}, local " - "ballot is {}", - name(), - request->child_config.pid.get_app_id(), - request->child_config.pid.get_partition_index(), - ec.to_string(), - request->child_config.ballot, - get_ballot()); + dsn::error_code err = ec == ERR_OK ? response.err : ec; + if (err != ERR_OK) { + dwarn_replica( + "register child({}) failed, error = {}, request child ballot = {}, local ballot = {}", + request.child_config.pid, + err.to_string(), + request.child_config.ballot, + get_ballot()); - if (ec != ERR_INVALID_VERSION && ec != ERR_CHILD_REGISTERED) { - _primary_states.register_child_task = tasking::enqueue( - LPC_DELAY_UPDATE_CONFIG, - tracker(), - [this, request]() { - rpc_address target(_stub->_failure_detector->get_servers()); - auto rpc_task_ptr = rpc::call( - target, - RPC_CM_REGISTER_CHILD_REPLICA, - *request, - tracker(), - [=](error_code err, register_child_response &&resp) { - on_register_child_on_meta_reply( - err, - request, - std::make_shared(std::move(resp))); - }, - std::chrono::seconds(0), - get_gpid().thread_hash()); - _primary_states.register_child_task = rpc_task_ptr; - }, - get_gpid().thread_hash(), - std::chrono::seconds(1)); + // register request is out-of-dated + if (err == ERR_INVALID_VERSION) { + return; + } + // we need not resend register request if child has been registered + if (err != ERR_CHILD_REGISTERED) { + _primary_states.register_child_task = + tasking::enqueue(LPC_DELAY_UPDATE_CONFIG, + tracker(), + std::bind(&replica::parent_send_register_request, this, request), + get_gpid().thread_hash(), + std::chrono::seconds(1)); return; } } - if (response->parent_config.pid != get_gpid() || response->child_config.pid != _child_gpid) { - derror_f("{}: remote gpid ({}.{}) VS local gpid ({}.{}), remote child ({}.{}) VS local " - "child ({}.{}), something wrong with meta, retry", - name(), - response->parent_config.pid.get_app_id(), - response->parent_config.pid.get_partition_index(), - get_gpid().get_app_id(), - get_gpid().get_partition_index(), - response->child_config.pid.get_app_id(), - response->child_config.pid.get_partition_index(), - _child_gpid.get_app_id(), - _child_gpid.get_partition_index()); - - _primary_states.register_child_task = tasking::enqueue( - LPC_DELAY_UPDATE_CONFIG, - tracker(), - [this, request]() { - rpc_address target(_stub->_failure_detector->get_servers()); - auto rpc_task_ptr = - rpc::call(target, - RPC_CM_REGISTER_CHILD_REPLICA, - *request, - tracker(), - [=](error_code err, register_child_response &&resp) { - on_register_child_on_meta_reply( - err, - request, - std::make_shared(std::move(resp))); - }, - std::chrono::seconds(0), - get_gpid().thread_hash()); - _primary_states.register_child_task = rpc_task_ptr; - }, - get_gpid().thread_hash(), - std::chrono::seconds(1)); + if (err == ERR_OK) { + ddebug_replica("register child({}) succeed, response parent ballot = {}, local ballot = " + "{}, local status = {}", + response.child_config.pid, + response.parent_config.ballot, + get_ballot(), + enum_to_string(status())); - return; - } - - if (ec == ERR_OK && response->err == ERR_OK) { - ddebug_f( - "{}: register child({}.{}) succeed, parent ballot is {}, local ballot is {}, local " - "status {}", - name(), - response->child_config.pid.get_app_id(), - response->child_config.pid.get_partition_index(), - response->parent_config.ballot, - get_ballot(), - enum_to_string(status())); - dassert(_app_info.partition_count * 2 == response->app.partition_count, - "local partition count is %d, remote partition count is %d", - _app_info.partition_count, - response->app.partition_count); - // make child replica become available + dcheck_eq_replica(_app_info.partition_count * 2, response.app.partition_count); _stub->split_replica_exec(LPC_PARTITION_SPLIT, - response->child_config.pid, + response.child_config.pid, std::bind(&replica::child_partition_active, std::placeholders::_1, - response->child_config)); - update_group_partition_count(response->app.partition_count, false); + response.child_config)); + update_group_partition_count(response.app.partition_count, false); + + // TODO(heyuchen): TBD - update parent group partition_count } + // parent register child succeed or child partition has already resgitered + // in both situation, we should reset resgiter child task and child_gpid _primary_states.register_child_task = nullptr; - // TODO(hyc): when to change it into false, should there - _primary_states.sync_send_write_request = true; - _child_gpid.set_app_id(0); - if (response->parent_config.ballot >= get_ballot()) { - ddebug_f("{} ballot in response is {}, local ballot is {}, should update configuration", - name(), - response->parent_config.ballot, - get_ballot()); - update_configuration(response->parent_config); + _child_gpid.set_app_id(0); + if (response.parent_config.ballot >= get_ballot()) { + ddebug_replica("response ballot = {}, local ballot = {}, should update configuration", + response.parent_config.ballot, + get_ballot()); + update_configuration(response.parent_config); } } +// ThreadPool: THREAD_POOL_REPLICATION +void replica::child_partition_active(const partition_configuration &config) // on child +{ + ddebug_replica("child partition become active"); + _stub->_counter_replicas_splitting_recent_split_succ_count->increment(); + _primary_states.last_prepare_decree_on_new_primary = _prepare_list->max_decree(); + update_configuration(config); +} + /// /// child replica copy mutations of parent /// diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index 6c31a73054..ec0a3c90b6 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -940,10 +940,7 @@ void meta_service::on_register_child_on_meta(register_child_rpc rpc) tasking::enqueue(LPC_META_STATE_NORMAL, tracker(), - [this, rpc]() { - dassert(_split_svc, "meta_split_service is uninitialized"); - _split_svc->register_child_on_meta(std::move(rpc)); - }, + [this, rpc]() { _split_svc->register_child_on_meta(std::move(rpc)); }, server_state::sStateHash); } diff --git a/src/dist/replication/meta_server/meta_service.h b/src/dist/replication/meta_server/meta_service.h index 483fe974f6..a49684b457 100644 --- a/src/dist/replication/meta_server/meta_service.h +++ b/src/dist/replication/meta_server/meta_service.h @@ -73,8 +73,6 @@ typedef rpc_holder typedef rpc_holder app_partition_split_rpc; -typedef rpc_holder register_child_rpc; - typedef rpc_holder query_child_state_rpc; class meta_service : public serverlet diff --git a/src/dist/replication/meta_server/meta_split_service.cpp b/src/dist/replication/meta_server/meta_split_service.cpp index 9b068847f9..e3511d6860 100644 --- a/src/dist/replication/meta_server/meta_split_service.cpp +++ b/src/dist/replication/meta_server/meta_split_service.cpp @@ -144,10 +144,9 @@ void meta_split_service::register_child_on_meta(register_child_rpc rpc) response.err = ERR_IO_PENDING; zauto_write_lock(app_lock()); - std::shared_ptr app = _state->get_app(request.app.app_id); - dassert(app != nullptr, "get get app for app id(%d)", request.app.app_id); - dassert(app->is_stateful, "don't support stateless apps currently, id(%d)", request.app.app_id); + dassert_f(app != nullptr, "app is not existed, id({})", request.app.app_id); + dassert_f(app->is_stateful, "app is stateless currently, id({})", request.app.app_id); dsn::gpid parent_gpid = request.parent_config.pid; dsn::gpid child_gpid = request.child_config.pid; @@ -160,8 +159,9 @@ void meta_split_service::register_child_on_meta(register_child_rpc rpc) return; } - partition_configuration parent_config = app->partitions[parent_gpid.get_partition_index()]; - partition_configuration child_config = app->partitions[child_gpid.get_partition_index()]; + const partition_configuration &parent_config = + app->partitions[parent_gpid.get_partition_index()]; + const partition_configuration &child_config = app->partitions[child_gpid.get_partition_index()]; config_context &parent_context = app->helpers->contexts[parent_gpid.get_partition_index()]; // TODO(heyuchen): pause remove @@ -199,10 +199,8 @@ void meta_split_service::register_child_on_meta(register_child_rpc rpc) if (child_config.ballot != invalid_ballot) { dwarn_f( - "duplicated register child request, gpid({}.{}) has already been registered, ballot " - "is {}", - child_gpid.get_app_id(), - child_gpid.get_partition_index(), + "duplicated register child request, child({}) has already been registered, ballot = {}", + child_gpid, child_config.ballot); response.err = ERR_CHILD_REGISTERED; return; @@ -210,20 +208,15 @@ void meta_split_service::register_child_on_meta(register_child_rpc rpc) if (parent_context.stage == config_status::pending_proposal || parent_context.stage == config_status::pending_remote_sync) { - dwarn_f("another request is syncing with remote storage, ignore this request - gpid({}.{}) " - "register child", - parent_gpid.get_app_id(), - parent_gpid.get_partition_index()); + dwarn_f( + "partition({}): another request is syncing with remote storage, ignore this request", + parent_gpid); return; } app->helpers->split_states.status.erase(parent_gpid.get_partition_index()); app->helpers->split_states.splitting_count--; - ddebug_f("gpid({}.{}) will resgiter child gpid({}.{})", - parent_gpid.get_app_id(), - parent_gpid.get_partition_index(), - child_gpid.get_app_id(), - child_gpid.get_partition_index()); + ddebug_f("parent({}) will register child({})", parent_gpid, child_gpid); parent_context.stage = config_status::pending_remote_sync; parent_context.msg = rpc.dsn_request(); @@ -233,9 +226,8 @@ void meta_split_service::register_child_on_meta(register_child_rpc rpc) dsn::task_ptr meta_split_service::add_child_on_remote_storage(register_child_rpc rpc, bool create_new) { - auto &request = rpc.request(); - - std::string partition_path = _state->get_partition_path(request.child_config.pid); + const auto &request = rpc.request(); + const std::string &partition_path = _state->get_partition_path(request.child_config.pid); blob value = dsn::json::json_forwarder::encode(request.child_config); if (create_new) { @@ -272,12 +264,13 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec, auto &response = rpc.response(); std::shared_ptr app = _state->get_app(request.app.app_id); - dassert(app->status == app_status::AS_AVAILABLE || app->status == app_status::AS_DROPPING, - "if app removed, this task should be cancelled"); + dassert_f(app != nullptr, "app is not existed, id({})", request.app.app_id); + dassert_f(app->status == app_status::AS_AVAILABLE || app->status == app_status::AS_DROPPING, + "app is not available now, id({})", + request.app.app_id); dsn::gpid parent_gpid = request.parent_config.pid; dsn::gpid child_gpid = request.child_config.pid; - config_context &parent_context = app->helpers->contexts[parent_gpid.get_partition_index()]; if (ec == ERR_TIMEOUT || @@ -293,41 +286,31 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec, }, 0, std::chrono::seconds(delay)); - } else if (ec == ERR_OK) { - ddebug_f("gpid({}.{}) resgiter child gpid({}.{}) on remote storage succeed", - parent_gpid.get_app_id(), - parent_gpid.get_partition_index(), - child_gpid.get_app_id(), - child_gpid.get_partition_index()); - - std::shared_ptr update_child_request( - new configuration_update_request); - update_child_request->config = request.child_config; - update_child_request->info = *app; - update_child_request->type = config_type::CT_REGISTER_CHILD; - update_child_request->node = request.primary_address; - - partition_configuration child_config = app->partitions[child_gpid.get_partition_index()]; - child_config.secondaries = request.child_config.secondaries; - - // update local child partition configuration - _state->update_configuration_locally(*app, update_child_request); - - parent_context.pending_sync_task = nullptr; - parent_context.stage = config_status::not_pending; - - if (parent_context.msg) { - response.err = ERR_OK; - response.app = *app; - response.parent_config = app->partitions[parent_gpid.get_partition_index()]; - response.child_config = app->partitions[child_gpid.get_partition_index()]; - parent_context.msg = nullptr; - // TODO(heyuchen): pause add - // app->helpers->split_states.status.erase(parent_gpid.get_partition_index()); - // app->helpers->split_states.splitting_count--; - } - } else { - dassert(false, "we can't handle this right now, err = %s", ec.to_string()); + return; + } + dassert_f(ec == ERR_OK, "we can't handle this right now, err = {}", ec.to_string()); + ddebug_f("parent({}) resgiter child({}) on remote storage succeed", parent_gpid, child_gpid); + + // update local child partition configuration + std::shared_ptr update_child_request = + std::make_shared(); + update_child_request->config = request.child_config; + update_child_request->info = *app; + update_child_request->type = config_type::CT_REGISTER_CHILD; + update_child_request->node = request.primary_address; + + partition_configuration child_config = app->partitions[child_gpid.get_partition_index()]; + child_config.secondaries = request.child_config.secondaries; + _state->update_configuration_locally(*app, update_child_request); + + parent_context.pending_sync_task = nullptr; + parent_context.stage = config_status::not_pending; + if (parent_context.msg) { + response.err = ERR_OK; + response.app = *app; + response.parent_config = app->partitions[parent_gpid.get_partition_index()]; + response.child_config = app->partitions[child_gpid.get_partition_index()]; + parent_context.msg = nullptr; } } diff --git a/src/dist/replication/replication.thrift b/src/dist/replication/replication.thrift index 15b369112a..c54024f3e9 100644 --- a/src/dist/replication/replication.thrift +++ b/src/dist/replication/replication.thrift @@ -936,8 +936,7 @@ struct update_group_partition_count_response 1:dsn.error_code err; } -// primary -> meta server -// update child partition configuration on meta when finish partition split +// primary parent -> meta server, register child on meta_server struct register_child_request { 1:dsn.layer2.app_info app; diff --git a/src/dist/replication/test/meta_test/unit_test/meta_split_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_split_service_test.cpp index 6e3f211795..37795a7443 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_split_service_test.cpp +++ b/src/dist/replication/test/meta_test/unit_test/meta_split_service_test.cpp @@ -439,9 +439,64 @@ class meta_split_service_test : public meta_test_base return rpc.response(); } + register_child_response + register_child(ballot req_parent_ballot, ballot child_ballot, bool wait_zk = false) + { + // mock local app info + auto app = find_app(NAME); + app->partition_count *= 2; + app->partitions.resize(app->partition_count); + app->helpers->contexts.resize(app->partition_count); + for (int i = 0; i < app->partition_count; ++i) { + app->helpers->contexts[i].config_owner = &app->partitions[i]; + app->partitions[i].pid = dsn::gpid(app->app_id, i); + if (i >= app->partition_count / 2) { + app->partitions[i].ballot = invalid_ballot; + } else { + app->partitions[i].ballot = PARENT_BALLOT; + } + } + app->partitions[CHILD_INDEX].ballot = child_ballot; + + // mock node state + node_state node; + node.put_partition(dsn::gpid(app->app_id, PARENT_INDEX), true); + mock_node_state(dsn::rpc_address("127.0.0.1", 10086), node); + + // mock register_child_request + partition_configuration parent_config; + parent_config.ballot = req_parent_ballot; + parent_config.last_committed_decree = 5; + parent_config.max_replica_count = 3; + parent_config.pid = dsn::gpid(app->app_id, PARENT_INDEX); + + dsn::partition_configuration child_config; + child_config.ballot = PARENT_BALLOT + 1; + child_config.last_committed_decree = 5; + child_config.pid = dsn::gpid(app->app_id, CHILD_INDEX); + + // register_child_request request; + auto request = dsn::make_unique(); + request->app.app_id = app->app_id; + request->parent_config = parent_config; + request->child_config = child_config; + request->primary_address = dsn::rpc_address("127.0.0.1", 10086); + + register_child_rpc rpc(std::move(request), RPC_CM_REGISTER_CHILD_REPLICA); + split_svc().register_child_on_meta(rpc); + wait_all(); + if (wait_zk) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + return rpc.response(); + } + const std::string NAME = "split_table"; const uint32_t PARTITION_COUNT = 4; const uint32_t NEW_PARTITION_COUNT = 8; + const uint32_t PARENT_BALLOT = 3; + const uint32_t PARENT_INDEX = 0; + const uint32_t CHILD_INDEX = 4; }; TEST_F(meta_split_service_test, start_split_with_not_existed_app) @@ -464,5 +519,24 @@ TEST_F(meta_split_service_test, start_split_succeed) ASSERT_EQ(resp.partition_count, NEW_PARTITION_COUNT); } +// TODO(heyuchen): fix regisiter unit tests error +//TEST_F(meta_split_service_test, register_child_with_wrong_ballot) +//{ +// auto resp = register_child(PARENT_BALLOT - 1, invalid_ballot); +// ASSERT_EQ(resp.err, ERR_INVALID_VERSION); +//} + +//TEST_F(meta_split_service_test, register_child_with_child_registered) +//{ +// auto resp = register_child(PARENT_BALLOT, PARENT_BALLOT + 1); +// ASSERT_EQ(resp.err, ERR_CHILD_REGISTERED); +//} + +//TEST_F(meta_split_service_test, register_child_succeed) +//{ +// auto resp = register_child(PARENT_BALLOT, invalid_ballot, true); +// ASSERT_EQ(resp.err, ERR_OK); +//} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/test/meta_test/unit_test/meta_test_base.h b/src/dist/replication/test/meta_test/unit_test/meta_test_base.h index 0fc9ccecc2..69a573322a 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_test_base.h +++ b/src/dist/replication/test/meta_test/unit_test/meta_test_base.h @@ -119,6 +119,11 @@ class meta_test_base : public testing::Test return rpc.response(); } + void mock_node_state(const rpc_address &addr, const node_state &node) + { + _ss->_nodes[addr] = node; + } + std::shared_ptr find_app(const std::string &name) { return _ss->get_app(name); } meta_duplication_service &dup_svc() { return *(_ms->_dup_svc); } diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp index 68273e92b5..b75f79627d 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp @@ -564,6 +564,7 @@ class replica_split_test : public testing::Test void SetUp() { _stub = make_unique(); + _stub->set_state_connected(); mock_app_info(); _parent = _stub->generate_replica( _app_info, _parent_pid, partition_status::PS_PRIMARY, _init_ballot); @@ -598,6 +599,22 @@ class replica_split_test : public testing::Test _catch_up_req.child_address = dsn::rpc_address("127.0.0.1", 1); } + void mock_register_child_request() + { + partition_configuration &p_config = _register_req.parent_config; + p_config.pid = _parent_pid; + p_config.ballot = _init_ballot; + p_config.last_committed_decree = _decree; + + partition_configuration &c_config = _register_req.child_config; + c_config.pid = _child_pid; + c_config.ballot = _init_ballot + 1; + c_config.last_committed_decree = 0; + + _register_req.app = _app_info; + _register_req.primary_address = dsn::rpc_address("127.0.0.1", 10086); + } + void generate_child(partition_status::type status) { _child = _stub->generate_replica(_app_info, _child_pid, status, _init_ballot); @@ -705,6 +722,15 @@ class replica_split_test : public testing::Test partition_split_context get_split_context() { return _child->_split_states; } + primary_context get_replica_primary_context(mock_replica_ptr rep) + { + return rep->_primary_states; + } + + bool is_parent_not_in_split() { return (_parent->_child_gpid.get_app_id() == 0); } + + int32_t get_partition_version(mock_replica_ptr rep) { return rep->_partition_version.load(); } + void test_on_add_child() { _parent->on_add_child(_group_check_req); @@ -763,6 +789,28 @@ class replica_split_test : public testing::Test return resp.err; } + void test_register_child_on_meta() + { + _parent->register_child_on_meta(_init_ballot); + _parent->tracker()->wait_outstanding_tasks(); + } + + void test_on_register_child_rely(partition_status::type status, dsn::error_code resp_err) + { + mock_register_child_request(); + _parent->_config.status = status; + + register_child_response resp; + resp.err = resp_err; + resp.app = _register_req.app; + resp.app.partition_count *= 2; + resp.parent_config = _register_req.parent_config; + resp.child_config = _register_req.child_config; + + _parent->on_register_child_on_meta_reply(ERR_OK, _register_req, resp); + _parent->tracker()->wait_outstanding_tasks(); + } + public: std::unique_ptr _stub; @@ -778,6 +826,7 @@ class replica_split_test : public testing::Test group_check_request _group_check_req; notify_catch_up_request _catch_up_req; + register_child_request _register_req; std::vector _private_log_files; std::vector _mutation_list; const uint32_t _max_count = 10; @@ -988,5 +1037,51 @@ TEST_F(replica_split_test, handle_catch_up_with_all_caught_up) ASSERT_TRUE(get_sync_send_write_request()); } +TEST_F(replica_split_test, register_child_test) +{ + fail::setup(); + fail::cfg("replica_parent_send_register_request", "return()"); + test_register_child_on_meta(); + fail::teardown(); + + ASSERT_EQ(_parent->status(), partition_status::PS_INACTIVE); + ASSERT_EQ(get_partition_version(_parent), -1); +} + +TEST_F(replica_split_test, register_child_reply_with_wrong_status) +{ + generate_child(partition_status::PS_PARTITION_SPLIT); + mock_child_split_context(_parent_pid, true, true); + + test_on_register_child_rely(partition_status::PS_PRIMARY, ERR_OK); + primary_context parent_primary_states = get_replica_primary_context(_parent); + ASSERT_EQ(parent_primary_states.register_child_task, nullptr); +} + +TEST_F(replica_split_test, register_child_reply_with_child_registered) +{ + generate_child(partition_status::PS_PARTITION_SPLIT); + mock_child_split_context(_parent_pid, true, true); + + test_on_register_child_rely(partition_status::PS_INACTIVE, ERR_CHILD_REGISTERED); + + primary_context parent_primary_states = get_replica_primary_context(_parent); + ASSERT_EQ(parent_primary_states.register_child_task, nullptr); + ASSERT_TRUE(is_parent_not_in_split()); +} + +TEST_F(replica_split_test, register_child_reply_succeed) +{ + generate_child(partition_status::PS_PARTITION_SPLIT); + mock_child_split_context(_parent_pid, true, true); + + fail::setup(); + fail::cfg("replica_stub_split_replica_exec", "return()"); + test_on_register_child_rely(partition_status::PS_INACTIVE, ERR_OK); + fail::teardown(); + + ASSERT_TRUE(is_parent_not_in_split()); +} + } // namespace replication } // namespace dsn