diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index 2a5aa166af..ed22d6896b 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -102,6 +102,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_DUPLICATION_SYNC, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_UPDATE_APP_ENV, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_DDD_DIAGNOSE, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_CM_APP_PARTITION_SPLIT, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_CM_REGISTER_CHILD_REPLICA, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL #define CURRENT_THREAD_POOL THREAD_POOL_META_STATE diff --git a/include/dsn/dist/replication/replication_enums.h b/include/dsn/dist/replication/replication_enums.h index 6f67e78f28..7a83090ff1 100644 --- a/include/dsn/dist/replication/replication_enums.h +++ b/include/dsn/dist/replication/replication_enums.h @@ -58,6 +58,7 @@ ENUM_REG(replication::config_type::CT_REMOVE) ENUM_REG(replication::config_type::CT_ADD_SECONDARY_FOR_LB) ENUM_REG(replication::config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT) ENUM_REG(replication::config_type::CT_DROP_PARTITION) +ENUM_REG(replication::config_type::CT_REGISTER_CHILD) ENUM_END2(replication::config_type::type, config_type) ENUM_BEGIN2(replication::node_status::type, node_status, replication::node_status::NS_INVALID) diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 940debd47f..a853c18629 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -91,7 +91,8 @@ struct config_type CT_REMOVE = 7, CT_ADD_SECONDARY_FOR_LB = 8, CT_PRIMARY_FORCE_UPDATE_BALLOT = 9, - CT_DROP_PARTITION = 10 + CT_DROP_PARTITION = 10, + CT_REGISTER_CHILD = 11 }; }; @@ -352,6 +353,10 @@ class notify_catch_up_request; class notify_cacth_up_response; +class register_child_request; + +class 
register_child_response; + typedef struct _mutation_header__isset { _mutation_header__isset() @@ -6043,6 +6048,140 @@ inline std::ostream &operator<<(std::ostream &out, const notify_cacth_up_respons obj.printTo(out); return out; } + +typedef struct _register_child_request__isset +{ + _register_child_request__isset() + : app(false), parent_config(false), child_config(false), primary_address(false) + { + } + bool app : 1; + bool parent_config : 1; + bool child_config : 1; + bool primary_address : 1; +} _register_child_request__isset; + +class register_child_request +{ +public: + register_child_request(const register_child_request &); + register_child_request(register_child_request &&); + register_child_request &operator=(const register_child_request &); + register_child_request &operator=(register_child_request &&); + register_child_request() {} + + virtual ~register_child_request() throw(); + ::dsn::app_info app; + ::dsn::partition_configuration parent_config; + ::dsn::partition_configuration child_config; + ::dsn::rpc_address primary_address; + + _register_child_request__isset __isset; + + void __set_app(const ::dsn::app_info &val); + + void __set_parent_config(const ::dsn::partition_configuration &val); + + void __set_child_config(const ::dsn::partition_configuration &val); + + void __set_primary_address(const ::dsn::rpc_address &val); + + bool operator==(const register_child_request &rhs) const + { + if (!(app == rhs.app)) + return false; + if (!(parent_config == rhs.parent_config)) + return false; + if (!(child_config == rhs.child_config)) + return false; + if (!(primary_address == rhs.primary_address)) + return false; + return true; + } + bool operator!=(const register_child_request &rhs) const { return !(*this == rhs); } + + bool operator<(const register_child_request &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; 
+}; + +void swap(register_child_request &a, register_child_request &b); + +inline std::ostream &operator<<(std::ostream &out, const register_child_request &obj) +{ + obj.printTo(out); + return out; +} + +typedef struct _register_child_response__isset +{ + _register_child_response__isset() + : err(false), app(false), parent_config(false), child_config(false) + { + } + bool err : 1; + bool app : 1; + bool parent_config : 1; + bool child_config : 1; +} _register_child_response__isset; + +class register_child_response +{ +public: + register_child_response(const register_child_response &); + register_child_response(register_child_response &&); + register_child_response &operator=(const register_child_response &); + register_child_response &operator=(register_child_response &&); + register_child_response() {} + + virtual ~register_child_response() throw(); + ::dsn::error_code err; + ::dsn::app_info app; + ::dsn::partition_configuration parent_config; + ::dsn::partition_configuration child_config; + + _register_child_response__isset __isset; + + void __set_err(const ::dsn::error_code &val); + + void __set_app(const ::dsn::app_info &val); + + void __set_parent_config(const ::dsn::partition_configuration &val); + + void __set_child_config(const ::dsn::partition_configuration &val); + + bool operator==(const register_child_response &rhs) const + { + if (!(err == rhs.err)) + return false; + if (!(app == rhs.app)) + return false; + if (!(parent_config == rhs.parent_config)) + return false; + if (!(child_config == rhs.child_config)) + return false; + return true; + } + bool operator!=(const register_child_response &rhs) const { return !(*this == rhs); } + + bool operator<(const register_child_response &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(register_child_response &a, register_child_response &b); + +inline 
std::ostream &operator<<(std::ostream &out, const register_child_response &obj) +{ + obj.printTo(out); + return out; +} } } // namespace diff --git a/include/dsn/utility/error_code.h b/include/dsn/utility/error_code.h index c26fdfd9bd..1c14813ad4 100644 --- a/include/dsn/utility/error_code.h +++ b/include/dsn/utility/error_code.h @@ -118,5 +118,6 @@ DEFINE_ERR_CODE(ERR_IGNORE_BAD_DATA) DEFINE_ERR_CODE(ERR_APP_DROPPED) DEFINE_ERR_CODE(ERR_MOCK_INTERNAL) DEFINE_ERR_CODE(ERR_ZOOKEEPER_OPERATION) +DEFINE_ERR_CODE(ERR_CHILD_REGISTERED) } // namespace dsn diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 4b379be83d..162301af5f 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -123,6 +123,8 @@ class replication_options void sanity_check(); }; +typedef rpc_holder register_child_rpc; + extern const char *partition_status_to_string(partition_status::type status); class cold_backup_constant diff --git a/src/dist/replication/common/replication_types.cpp b/src/dist/replication/common/replication_types.cpp index 2959618858..3c9dc7248b 100644 --- a/src/dist/replication/common/replication_types.cpp +++ b/src/dist/replication/common/replication_types.cpp @@ -75,7 +75,8 @@ int _kconfig_typeValues[] = {config_type::CT_INVALID, config_type::CT_REMOVE, config_type::CT_ADD_SECONDARY_FOR_LB, config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT, - config_type::CT_DROP_PARTITION}; + config_type::CT_DROP_PARTITION, + config_type::CT_REGISTER_CHILD}; const char *_kconfig_typeNames[] = {"CT_INVALID", "CT_ASSIGN_PRIMARY", "CT_UPGRADE_TO_PRIMARY", @@ -86,9 +87,10 @@ const char *_kconfig_typeNames[] = {"CT_INVALID", "CT_REMOVE", "CT_ADD_SECONDARY_FOR_LB", "CT_PRIMARY_FORCE_UPDATE_BALLOT", - "CT_DROP_PARTITION"}; + "CT_DROP_PARTITION", + "CT_REGISTER_CHILD"}; const std::map _config_type_VALUES_TO_NAMES( - ::apache::thrift::TEnumIterator(11, _kconfig_typeValues, 
_kconfig_typeNames), + ::apache::thrift::TEnumIterator(12, _kconfig_typeValues, _kconfig_typeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); int _knode_statusValues[] = { @@ -14253,5 +14255,336 @@ void notify_cacth_up_response::printTo(std::ostream &out) const out << "err=" << to_string(err); out << ")"; } + +register_child_request::~register_child_request() throw() {} + +void register_child_request::__set_app(const ::dsn::app_info &val) { this->app = val; } + +void register_child_request::__set_parent_config(const ::dsn::partition_configuration &val) +{ + this->parent_config = val; +} + +void register_child_request::__set_child_config(const ::dsn::partition_configuration &val) +{ + this->child_config = val; +} + +void register_child_request::__set_primary_address(const ::dsn::rpc_address &val) +{ + this->primary_address = val; +} + +uint32_t register_child_request::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->app.read(iprot); + this->__isset.app = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->parent_config.read(iprot); + this->__isset.parent_config = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->child_config.read(iprot); + this->__isset.child_config = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == 
::apache::thrift::protocol::T_STRUCT) { + xfer += this->primary_address.read(iprot); + this->__isset.primary_address = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t register_child_request::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("register_child_request"); + + xfer += oprot->writeFieldBegin("app", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->app.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("parent_config", ::apache::thrift::protocol::T_STRUCT, 2); + xfer += this->parent_config.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("child_config", ::apache::thrift::protocol::T_STRUCT, 3); + xfer += this->child_config.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("primary_address", ::apache::thrift::protocol::T_STRUCT, 4); + xfer += this->primary_address.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(register_child_request &a, register_child_request &b) +{ + using ::std::swap; + swap(a.app, b.app); + swap(a.parent_config, b.parent_config); + swap(a.child_config, b.child_config); + swap(a.primary_address, b.primary_address); + swap(a.__isset, b.__isset); +} + +register_child_request::register_child_request(const register_child_request &other628) +{ + app = other628.app; + parent_config = other628.parent_config; + child_config = other628.child_config; + primary_address = other628.primary_address; + __isset = other628.__isset; +} +register_child_request::register_child_request(register_child_request &&other629) +{ + app = 
std::move(other629.app); + parent_config = std::move(other629.parent_config); + child_config = std::move(other629.child_config); + primary_address = std::move(other629.primary_address); + __isset = std::move(other629.__isset); +} +register_child_request ®ister_child_request::operator=(const register_child_request &other630) +{ + app = other630.app; + parent_config = other630.parent_config; + child_config = other630.child_config; + primary_address = other630.primary_address; + __isset = other630.__isset; + return *this; +} +register_child_request ®ister_child_request::operator=(register_child_request &&other631) +{ + app = std::move(other631.app); + parent_config = std::move(other631.parent_config); + child_config = std::move(other631.child_config); + primary_address = std::move(other631.primary_address); + __isset = std::move(other631.__isset); + return *this; +} +void register_child_request::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "register_child_request("; + out << "app=" << to_string(app); + out << ", " + << "parent_config=" << to_string(parent_config); + out << ", " + << "child_config=" << to_string(child_config); + out << ", " + << "primary_address=" << to_string(primary_address); + out << ")"; +} + +register_child_response::~register_child_response() throw() {} + +void register_child_response::__set_err(const ::dsn::error_code &val) { this->err = val; } + +void register_child_response::__set_app(const ::dsn::app_info &val) { this->app = val; } + +void register_child_response::__set_parent_config(const ::dsn::partition_configuration &val) +{ + this->parent_config = val; +} + +void register_child_response::__set_child_config(const ::dsn::partition_configuration &val) +{ + this->child_config = val; +} + +uint32_t register_child_response::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + 
::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->err.read(iprot); + this->__isset.err = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->app.read(iprot); + this->__isset.app = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->parent_config.read(iprot); + this->__isset.parent_config = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->child_config.read(iprot); + this->__isset.child_config = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t register_child_response::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("register_child_response"); + + xfer += oprot->writeFieldBegin("err", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->err.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("app", ::apache::thrift::protocol::T_STRUCT, 2); + xfer += this->app.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("parent_config", ::apache::thrift::protocol::T_STRUCT, 3); + xfer += this->parent_config.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("child_config", 
::apache::thrift::protocol::T_STRUCT, 4); + xfer += this->child_config.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(register_child_response &a, register_child_response &b) +{ + using ::std::swap; + swap(a.err, b.err); + swap(a.app, b.app); + swap(a.parent_config, b.parent_config); + swap(a.child_config, b.child_config); + swap(a.__isset, b.__isset); +} + +register_child_response::register_child_response(const register_child_response &other632) +{ + err = other632.err; + app = other632.app; + parent_config = other632.parent_config; + child_config = other632.child_config; + __isset = other632.__isset; +} +register_child_response::register_child_response(register_child_response &&other633) +{ + err = std::move(other633.err); + app = std::move(other633.app); + parent_config = std::move(other633.parent_config); + child_config = std::move(other633.child_config); + __isset = std::move(other633.__isset); +} +register_child_response ®ister_child_response::operator=(const register_child_response &other634) +{ + err = other634.err; + app = other634.app; + parent_config = other634.parent_config; + child_config = other634.child_config; + __isset = other634.__isset; + return *this; +} +register_child_response ®ister_child_response::operator=(register_child_response &&other635) +{ + err = std::move(other635.err); + app = std::move(other635.app); + parent_config = std::move(other635.parent_config); + child_config = std::move(other635.child_config); + __isset = std::move(other635.__isset); + return *this; +} +void register_child_response::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "register_child_response("; + out << "err=" << to_string(err); + out << ", " + << "app=" << to_string(app); + out << ", " + << "parent_config=" << to_string(parent_config); + out << ", " + << "child_config=" << to_string(child_config); + out << ")"; +} } } // 
namespace diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 266b001d44..25c52d4976 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -401,6 +401,17 @@ class replica : public serverlet, public ref_counter, public replica_ba // sync_point is the first decree after parent send write request to child synchronously void parent_check_sync_point_commit(decree sync_point); + // primary parent register children on meta_server + void register_child_on_meta(ballot b); + void on_register_child_on_meta_reply(dsn::error_code ec, + const register_child_request &request, + const register_child_response &response); + // primary sends register request to meta_server + void parent_send_register_request(const register_child_request &request); + + // child partition has been registered on meta_server, could be active + void child_partition_active(const partition_configuration &config); + // return true if parent status is valid bool parent_check_states(); diff --git a/src/dist/replication/lib/replica_config.cpp b/src/dist/replication/lib/replica_config.cpp index 607eccd5d9..94350d878a 100644 --- a/src/dist/replication/lib/replica_config.cpp +++ b/src/dist/replication/lib/replica_config.cpp @@ -40,6 +40,7 @@ #include "replica_stub.h" #include #include +#include #include #include @@ -361,6 +362,13 @@ void replica::update_configuration_on_meta_server(config_type::type type, ::dsn::rpc_address node, partition_configuration &newConfig) { + // type should never be `CT_REGISTER_CHILD` + // if this happens, it means serious mistake happened during partition split + // assert here to stop split and avoid splitting wrong + if (type == config_type::CT_REGISTER_CHILD) { + dassert_replica(false, "invalid config_type, type = {}", enum_to_string(type)); + } + newConfig.last_committed_decree = last_committed_decree(); if (type == config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT) { @@ -702,6 +710,16 @@ bool 
replica::update_local_configuration(const replica_configuration &config, } } break; + case partition_status::PS_PARTITION_SPLIT: + if (config.status == partition_status::PS_INACTIVE) { + dwarn_replica("status change from {} @ {} to {} @ {} is not allowed", + enum_to_string(old_status), + old_ballot, + enum_to_string(config.status), + config.ballot); + return false; + } + break; default: break; } @@ -842,6 +860,28 @@ bool replica::update_local_configuration(const replica_configuration &config, dassert(false, "invalid execution path"); } break; + case partition_status::PS_PARTITION_SPLIT: + switch (config.status) { + case partition_status::PS_PRIMARY: + _split_states.cleanup(true); + init_group_check(); + replay_prepare_list(); + break; + case partition_status::PS_SECONDARY: + _split_states.cleanup(true); + break; + case partition_status::PS_POTENTIAL_SECONDARY: + dassert(false, "invalid execution path"); + break; + case partition_status::PS_INACTIVE: + break; + case partition_status::PS_ERROR: + _split_states.cleanup(false); + break; + default: + dassert(false, "invalid execution path"); + } + break; case partition_status::PS_INACTIVE: if (config.status != partition_status::PS_PRIMARY || !_inactive_is_transient) { // except for case 1, we need stop uploading backup checkpoint diff --git a/src/dist/replication/lib/replica_context.cpp b/src/dist/replication/lib/replica_context.cpp index 79f27d5a9c..3c96e443ca 100644 --- a/src/dist/replication/lib/replica_context.cpp +++ b/src/dist/replication/lib/replica_context.cpp @@ -91,6 +91,9 @@ void primary_context::cleanup(bool clean_pending_mutations) // clean up checkpoint CLEANUP_TASK_ALWAYS(checkpoint_task) + // clean up register child task + CLEANUP_TASK_ALWAYS(register_child_task) + membership.ballot = 0; caught_up_children.clear(); @@ -101,7 +104,8 @@ void primary_context::cleanup(bool clean_pending_mutations) bool primary_context::is_cleaned() { return nullptr == group_check_task && nullptr == reconfiguration_task && - 
nullptr == checkpoint_task && group_check_pending_replies.empty();
+           nullptr == checkpoint_task && group_check_pending_replies.empty() &&
+           nullptr == register_child_task;
 }
 
 void primary_context::do_cleanup_pending_mutations(bool clean_pending_mutations)
diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h
index efa8f766f6..86e527c467 100644
--- a/src/dist/replication/lib/replica_context.h
+++ b/src/dist/replication/lib/replica_context.h
@@ -118,6 +118,10 @@ class primary_context
     // if app_id of `_child_gpid` is greater than zero, it means replica is during partition split,
     // otherwise, not during partition split
     bool sync_send_write_request{false};
+
+    // Used for partition split
+    // primary parent register child on meta_server task
+    dsn::task_ptr register_child_task;
 };
 
 class secondary_context
diff --git a/src/dist/replication/lib/replica_split.cpp b/src/dist/replication/lib/replica_split.cpp
index 6824b85df3..fcc8e99e50 100644
--- a/src/dist/replication/lib/replica_split.cpp
+++ b/src/dist/replication/lib/replica_split.cpp
@@ -606,6 +606,165 @@ void replica::parent_check_sync_point_commit(decree sync_point) // on primary pa
     }
 }
 
+// ThreadPool: THREAD_POOL_REPLICATION
+// Builds the child configuration from the primary's membership (ballot + 1, partition index
+// offset by partition_count) and sends it to meta_server. While a reconfiguration task is
+// pending it re-enqueues itself after one second; otherwise it turns PS_INACTIVE first.
+void replica::register_child_on_meta(ballot b) // on primary parent
+{
+    if (status() != partition_status::PS_PRIMARY) {
+        dwarn_replica("failed to register child, status = {}", enum_to_string(status()));
+        return;
+    }
+
+    if (_primary_states.reconfiguration_task != nullptr) {
+        dwarn_replica("under reconfiguration, delay and retry to register child");
+        _primary_states.register_child_task =
+            tasking::enqueue(LPC_PARTITION_SPLIT,
+                             tracker(),
+                             std::bind(&replica::register_child_on_meta, this, b),
+                             get_gpid().thread_hash(),
+                             std::chrono::seconds(1));
+        return;
+    }
+
+    partition_configuration child_config = _primary_states.membership;
+    child_config.ballot++;
+    child_config.last_committed_decree = 0;
+    child_config.last_drops.clear();
+    child_config.pid.set_partition_index(_app_info.partition_count +
+                                         get_gpid().get_partition_index());
+
+    register_child_request request;
+    request.app = _app_info;
+    request.child_config = child_config;
+    request.parent_config = _primary_states.membership;
+    request.primary_address = _stub->_primary_address;
+
+    // reject client request
+    update_local_configuration_with_no_ballot_change(partition_status::PS_INACTIVE);
+    set_inactive_state_transient(true);
+    _partition_version = -1;
+
+    parent_send_register_request(request);
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION
+// Sends `request` to meta_server as RPC_CM_REGISTER_CHILD_REPLICA and stores the call task in
+// `_primary_states.register_child_task`; the replica must already be PS_INACTIVE.
+void replica::parent_send_register_request(
+    const register_child_request &request) // on primary parent
+{
+    FAIL_POINT_INJECT_F("replica_parent_send_register_request", [](dsn::string_view) {});
+
+    dcheck_eq_replica(status(), partition_status::PS_INACTIVE);
+    ddebug_replica(
+        "send register child({}) request to meta_server, current ballot = {}, child ballot = {}",
+        request.child_config.pid,
+        request.parent_config.ballot,
+        request.child_config.ballot);
+
+    rpc_address meta_address(_stub->_failure_detector->get_servers());
+    std::unique_ptr req = make_unique(request);
+    register_child_rpc rpc(std::move(req), RPC_CM_REGISTER_CHILD_REPLICA);
+    _primary_states.register_child_task =
+        rpc.call(meta_address,
+                 tracker(),
+                 [this, rpc](error_code ec) mutable {
+                     on_register_child_on_meta_reply(ec, rpc.request(), rpc.response());
+                 },
+                 // run the reply on this replica's own thread hash, the same hash the
+                 // retry tasks in this file are enqueued with
+                 get_gpid().thread_hash());
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION
+// Handles meta_server's reply: resends the request after one second on errors other than
+// ERR_INVALID_VERSION / ERR_CHILD_REGISTERED, activates the child on ERR_OK, and finally
+// clears the register task and `_child_gpid` and adopts a parent config with ballot >= local.
+void replica::on_register_child_on_meta_reply(
+    dsn::error_code ec,
+    const register_child_request &request,
+    const register_child_response &response) // on primary parent
+{
+    FAIL_POINT_INJECT_F("replica_on_register_child_on_meta_reply", [](dsn::string_view) {});
+
+    _checker.only_one_thread_access();
+
+    // primary parent is under reconfiguration, whose status should be PS_INACTIVE
+    if (partition_status::PS_INACTIVE != status() || !_stub->is_connected()) {
+        dwarn_replica("status wrong or stub is not connected, status = {}",
+                      enum_to_string(status()));
+        _primary_states.register_child_task = nullptr;
+        // TODO(heyuchen): TBD - clear other split tasks in primary context
+        return;
+    }
+
+    dsn::error_code err = ec == ERR_OK ? response.err : ec;
+    if (err != ERR_OK) {
+        dwarn_replica(
+            "register child({}) failed, error = {}, request child ballot = {}, local ballot = {}",
+            request.child_config.pid,
+            err.to_string(),
+            request.child_config.ballot,
+            get_ballot());
+
+        // register request is outdated
+        if (err == ERR_INVALID_VERSION) {
+            return;
+        }
+
+        // we need not resend register request if child has been registered
+        if (err != ERR_CHILD_REGISTERED) {
+            _primary_states.register_child_task =
+                tasking::enqueue(LPC_DELAY_UPDATE_CONFIG,
+                                 tracker(),
+                                 std::bind(&replica::parent_send_register_request, this, request),
+                                 get_gpid().thread_hash(),
+                                 std::chrono::seconds(1));
+            return;
+        }
+    }
+
+    if (err == ERR_OK) {
+        ddebug_replica("register child({}) succeed, response parent ballot = {}, local ballot = "
+                       "{}, local status = {}",
+                       response.child_config.pid,
+                       response.parent_config.ballot,
+                       get_ballot(),
+                       enum_to_string(status()));
+
+        dcheck_eq_replica(_app_info.partition_count * 2, response.app.partition_count);
+        _stub->split_replica_exec(LPC_PARTITION_SPLIT,
+                                  response.child_config.pid,
+                                  std::bind(&replica::child_partition_active,
+                                            std::placeholders::_1,
+                                            response.child_config));
+
+        // TODO(heyuchen): TBD - update parent group partition_count
+    }
+
+    // parent register child succeed or child partition has already been registered
+    // in both situations, we should reset register child task and child_gpid
+    _primary_states.register_child_task = nullptr;
+    _child_gpid.set_app_id(0);
+    if (response.parent_config.ballot >= get_ballot()) {
+        ddebug_replica("response ballot = {}, local ballot = {}, should update configuration",
+                       response.parent_config.ballot,
+                       get_ballot());
+        update_configuration(response.parent_config);
+    }
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION
+// Scheduled from the ERR_OK reply path; applies `config` through update_configuration().
+void replica::child_partition_active(const partition_configuration &config) // on child
+{
+    ddebug_replica("child partition become active");
+    _primary_states.last_prepare_decree_on_new_primary = _prepare_list->max_decree();
+    update_configuration(config);
+}
+
 // ThreadPool: THREAD_POOL_REPLICATION
 void replica::parent_cleanup_split_context() // on parent partition
 {
diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp
index a8c29b95a7..684c69bb91 100644
--- a/src/dist/replication/meta_server/meta_service.cpp
+++ b/src/dist/replication/meta_server/meta_service.cpp
@@ -373,6 +373,9 @@ void meta_service::register_rpc_handlers()
         RPC_CM_DDD_DIAGNOSE, "ddd_diagnose", &meta_service::ddd_diagnose);
     register_rpc_handler_with_rpc_holder(
         RPC_CM_APP_PARTITION_SPLIT, "app_partition_split", &meta_service::on_app_partition_split);
+    register_rpc_handler_with_rpc_holder(RPC_CM_REGISTER_CHILD_REPLICA,
+                                         "register_child_on_meta",
+                                         &meta_service::on_register_child_on_meta);
 }
 
 int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address)
@@ -923,5 +926,15 @@ void meta_service::on_app_partition_split(app_partition_split_rpc rpc)
                      server_state::sStateHash);
 }
 
+void meta_service::on_register_child_on_meta(register_child_rpc rpc)
+{
+    RPC_CHECK_STATUS(rpc.dsn_request(), rpc.response());
+
+    tasking::enqueue(LPC_META_STATE_NORMAL,
+                     tracker(),
+                     [this, rpc]() { _split_svc->register_child_on_meta(std::move(rpc)); },
+                     server_state::sStateHash);
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/dist/replication/meta_server/meta_service.h b/src/dist/replication/meta_server/meta_service.h
index 4a1fdb65a5..bc6b3b073e 100644
--- a/src/dist/replication/meta_server/meta_service.h
+++ b/src/dist/replication/meta_server/meta_service.h
@@ -183,6 +183,7 @@ class meta_service : public serverlet
 
     // 
split
     void on_app_partition_split(app_partition_split_rpc rpc);
+    void on_register_child_on_meta(register_child_rpc rpc);
 
     // common routines
     // ret:
diff --git a/src/dist/replication/meta_server/meta_split_service.cpp b/src/dist/replication/meta_server/meta_split_service.cpp
index c3cbdab9de..c304514ee3 100644
--- a/src/dist/replication/meta_server/meta_split_service.cpp
+++ b/src/dist/replication/meta_server/meta_split_service.cpp
@@ -131,5 +131,159 @@ void meta_split_service::do_app_partition_split(std::shared_ptr app,
     _meta_svc->get_meta_storage()->set_data(
         _state->get_app_path(*app), std::move(value), on_write_storage_complete);
 }
+
+// Handles a register-child request from a primary parent: rejects it with
+// ERR_INVALID_VERSION when the request ballot is older than meta's, with
+// ERR_CHILD_REGISTERED when the child already has a valid ballot, and otherwise
+// starts writing the child configuration to remote storage.
+void meta_split_service::register_child_on_meta(register_child_rpc rpc)
+{
+    const auto &request = rpc.request();
+    auto &response = rpc.response();
+    response.err = ERR_IO_PENDING;
+
+    // named guard, so the app write lock is held until this function returns
+    zauto_write_lock l(app_lock());
+    std::shared_ptr app = _state->get_app(request.app.app_id);
+    dassert_f(app != nullptr, "app is not existed, id({})", request.app.app_id);
+    dassert_f(app->is_stateful, "app is stateless currently, id({})", request.app.app_id);
+
+    dsn::gpid parent_gpid = request.parent_config.pid;
+    dsn::gpid child_gpid = request.child_config.pid;
+    const partition_configuration &parent_config =
+        app->partitions[parent_gpid.get_partition_index()];
+    const partition_configuration &child_config = app->partitions[child_gpid.get_partition_index()];
+    config_context &parent_context = app->helpers->contexts[parent_gpid.get_partition_index()];
+
+    if (request.parent_config.ballot < parent_config.ballot) {
+        dwarn_f("partition({}) register child failed, request is out-dated, request ballot = {}, "
+                "meta ballot = {}",
+                parent_gpid,
+                request.parent_config.ballot,
+                parent_config.ballot);
+        response.err = ERR_INVALID_VERSION;
+        return;
+    }
+
+    if (child_config.ballot != invalid_ballot) {
+        dwarn_f(
+            "duplicated register child request, child({}) has already been registered, ballot = {}",
+            child_gpid,
+            child_config.ballot);
+        response.err = ERR_CHILD_REGISTERED;
+        return;
+    }
+
+    if (parent_context.stage == config_status::pending_proposal ||
+        parent_context.stage == config_status::pending_remote_sync) {
+        dwarn_f("another request is syncing with remote storage, ignore this request");
+        return;
+    }
+
+    ddebug_f("parent({}) will register child({})", parent_gpid, child_gpid);
+    parent_context.stage = config_status::pending_remote_sync;
+    parent_context.msg = rpc.dsn_request();
+    parent_context.pending_sync_task = add_child_on_remote_storage(rpc, true);
+}
+
+// Writes the JSON-encoded child configuration to the child's partition path,
+// creating the node when `create_new` is true and overwriting it otherwise.
+dsn::task_ptr meta_split_service::add_child_on_remote_storage(register_child_rpc rpc,
+                                                              bool create_new)
+{
+    const auto &request = rpc.request();
+    const std::string &partition_path = _state->get_partition_path(request.child_config.pid);
+    blob value = dsn::json::json_forwarder::encode(request.child_config);
+    if (create_new) {
+        return _meta_svc->get_remote_storage()->create_node(
+            partition_path,
+            LPC_META_STATE_HIGH,
+            std::bind(&meta_split_service::on_add_child_on_remote_storage_reply,
+                      this,
+                      std::placeholders::_1,
+                      rpc,
+                      create_new),
+            value);
+    } else {
+        return _meta_svc->get_remote_storage()->set_data(
+            partition_path,
+            value,
+            LPC_META_STATE_HIGH,
+            std::bind(&meta_split_service::on_add_child_on_remote_storage_reply,
+                      this,
+                      std::placeholders::_1,
+                      rpc,
+                      create_new),
+            _meta_svc->tracker());
+    }
+}
+
+// Remote-storage callback: retries on ERR_TIMEOUT (after one second) or on
+// ERR_NODE_ALREADY_EXIST for a create (immediately, as set_data); on ERR_OK applies the
+// child configuration locally and fills the rpc response.
+void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec,
+                                                              register_child_rpc rpc,
+                                                              bool create_new)
+{
+    // named guard, so the app write lock is held until this function returns
+    zauto_write_lock l(app_lock());
+
+    const auto &request = rpc.request();
+    auto &response = rpc.response();
+
+    std::shared_ptr app = _state->get_app(request.app.app_id);
+    dassert_f(app != nullptr, "app is not existed, id({})", request.app.app_id);
+    dassert_f(app->status == app_status::AS_AVAILABLE || app->status == app_status::AS_DROPPING,
+              "app is not available now, id({})",
+              request.app.app_id);
+
+    dsn::gpid parent_gpid = request.parent_config.pid;
+    dsn::gpid child_gpid = request.child_config.pid;
+    config_context &parent_context = app->helpers->contexts[parent_gpid.get_partition_index()];
+
+    if (ec == ERR_TIMEOUT ||
+        (ec == ERR_NODE_ALREADY_EXIST && create_new)) { // retry register child on remote storage
+        bool retry_create_new = (ec == ERR_TIMEOUT) ? create_new : false;
+        int delay = (ec == ERR_TIMEOUT) ? 1 : 0;
+        parent_context.pending_sync_task =
+            tasking::enqueue(LPC_META_STATE_HIGH,
+                             nullptr,
+                             [this, app, parent_gpid, rpc, retry_create_new]() mutable {
+                                 // store the retry task in the live context under the app
+                                 // lock; capturing parent_context by value only updated a copy
+                                 zauto_write_lock l(app_lock());
+                                 app->helpers->contexts[parent_gpid.get_partition_index()]
+                                     .pending_sync_task =
+                                     add_child_on_remote_storage(rpc, retry_create_new);
+                             },
+                             0,
+                             std::chrono::seconds(delay));
+        return;
+    }
+    dassert_f(ec == ERR_OK, "we can't handle this right now, err = {}", ec.to_string());
+
+    ddebug_f("parent({}) resgiter child({}) on remote storage succeed", parent_gpid, child_gpid);
+
+    // update local child partition configuration
+    std::shared_ptr update_child_request =
+        std::make_shared();
+    update_child_request->config = request.child_config;
+    update_child_request->info = *app;
+    update_child_request->type = config_type::CT_REGISTER_CHILD;
+    update_child_request->node = request.primary_address;
+
+    _state->update_configuration_locally(*app, update_child_request);
+
+    parent_context.pending_sync_task = nullptr;
+    parent_context.stage = config_status::not_pending;
+    if (parent_context.msg) {
+        response.err = ERR_OK;
+        response.app = *app;
+        response.parent_config = app->partitions[parent_gpid.get_partition_index()];
+        response.child_config = app->partitions[child_gpid.get_partition_index()];
+        parent_context.msg = nullptr;
+    }
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/dist/replication/meta_server/meta_split_service.h b/src/dist/replication/meta_server/meta_split_service.h
index 46e20d106e..f9789e99df 100644
--- 
a/src/dist/replication/meta_server/meta_split_service.h
+++ b/src/dist/replication/meta_server/meta_split_service.h
@@ -36,9 +36,17 @@ class meta_split_service
     // client -> meta to start split
     void app_partition_split(app_partition_split_rpc rpc);
 
+    // primary parent -> meta_server to register child; errors are reported in rpc.response().err
+    void register_child_on_meta(register_child_rpc rpc);
+
 private:
     void do_app_partition_split(std::shared_ptr app, app_partition_split_rpc rpc);
 
+    // meta -> remote storage to update child replica config; the second function is its callback
+    dsn::task_ptr add_child_on_remote_storage(register_child_rpc rpc, bool create_new);
+    void
+    on_add_child_on_remote_storage_reply(error_code ec, register_child_rpc rpc, bool create_new);
+
 private:
     meta_service *_meta_svc;
     server_state *_state;
diff --git a/src/dist/replication/meta_server/server_state.cpp b/src/dist/replication/meta_server/server_state.cpp
index 1786464b4b..8afe25447f 100644
--- a/src/dist/replication/meta_server/server_state.cpp
+++ b/src/dist/replication/meta_server/server_state.cpp
@@ -1388,7 +1388,7 @@ void server_state::update_configuration_locally(
     health_status new_health_status = partition_health_status(new_cfg, min_2pc_count);
 
     if (app.is_stateful) {
-        dassert(old_cfg.ballot + 1 == new_cfg.ballot,
+        dassert(old_cfg.ballot == invalid_ballot || old_cfg.ballot + 1 == new_cfg.ballot,
                 "invalid configuration update request, old ballot %" PRId64
                 ", new ballot %" PRId64 "",
                 old_cfg.ballot,
@@ -1438,6 +1438,14 @@ void server_state::update_configuration_locally(
         case config_type::CT_ADD_SECONDARY_FOR_LB:
             dassert(false, "invalid execution work flow");
             break;
+        case config_type::CT_REGISTER_CHILD: { // record the newly registered child partition on its nodes
+            ns->put_partition(gpid, true); // presumably `true` marks `ns` as the primary of gpid (confirm)
+            for (auto &secondary : config_request->config.secondaries) {
+                auto secondary_node = get_node_state(_nodes, secondary, false);
+                secondary_node->put_partition(gpid, false); // NOTE(review): secondary_node is not null-checked; confirm the lookup cannot fail
+            }
+            break;
+        }
         default:
             dassert(false, "");
             break;
diff --git a/src/dist/replication/replication.thrift b/src/dist/replication/replication.thrift
index
85363ba277..d2fc2c50f4 100644
--- a/src/dist/replication/replication.thrift
+++ b/src/dist/replication/replication.thrift
@@ -195,7 +195,8 @@ enum config_type
     CT_REMOVE,
     CT_ADD_SECONDARY_FOR_LB,
     CT_PRIMARY_FORCE_UPDATE_BALLOT,
-    CT_DROP_PARTITION
+    CT_DROP_PARTITION,
+    CT_REGISTER_CHILD
 }
 
 enum node_status
@@ -852,6 +853,27 @@ struct notify_cacth_up_response
     1:dsn.error_code err;
 }
 
+// primary parent -> meta server, register child on meta_server
+struct register_child_request
+{
+    1:dsn.layer2.app_info app; // app.app_id selects the app on meta
+    2:dsn.layer2.partition_configuration parent_config; // its ballot is checked against meta's parent ballot
+    3:dsn.layer2.partition_configuration child_config; // written to remote storage as the child's config
+    4:dsn.rpc_address primary_address; // used as the node of the local configuration update
+}
+
+struct register_child_response
+{
+    // Possible errors:
+    // - ERR_INVALID_VERSION: request is out-dated
+    // - ERR_CHILD_REGISTERED: child has been registered
+    // - ERR_IO_PENDING: meta is executing another remote sync task
+    1:dsn.error_code err;
+    2:dsn.layer2.app_info app; // fields 2-4 are filled from meta's state when registration succeeds
+    3:dsn.layer2.partition_configuration parent_config;
+    4:dsn.layer2.partition_configuration child_config;
+}
+
 /*
 service replica_s
 {
diff --git a/src/dist/replication/test/meta_test/unit_test/meta_split_service_test.cpp b/src/dist/replication/test/meta_test/unit_test/meta_split_service_test.cpp
index 4f477b5f8f..9156755983 100644
--- a/src/dist/replication/test/meta_test/unit_test/meta_split_service_test.cpp
+++ b/src/dist/replication/test/meta_test/unit_test/meta_split_service_test.cpp
@@ -56,9 +56,64 @@ class meta_split_service_test : public meta_test_base
         return rpc.response();
     }
 
+    register_child_response
+    register_child(ballot req_parent_ballot, ballot child_ballot, bool wait_zk = false)
+    {
+        // mock local app info: double the partition count, the new upper half gets invalid_ballot
+        auto app = find_app(NAME);
+        app->partition_count *= 2;
+        app->partitions.resize(app->partition_count);
+        app->helpers->contexts.resize(app->partition_count);
+        for (int i = 0; i < app->partition_count; ++i) {
+            app->helpers->contexts[i].config_owner = &app->partitions[i];
+            app->partitions[i].pid = dsn::gpid(app->app_id, i);
+            if (i >= app->partition_count / 2) {
+                app->partitions[i].ballot = invalid_ballot;
+            } else {
+                app->partitions[i].ballot = PARENT_BALLOT;
+            }
+        }
+        app->partitions[CHILD_INDEX].ballot = child_ballot; // lets a test start with the child already registered
+
+        // mock node state for 127.0.0.1:10086, the primary_address used in the request below
+        node_state node;
+        node.put_partition(dsn::gpid(app->app_id, PARENT_INDEX), true);
+        mock_node_state(dsn::rpc_address("127.0.0.1", 10086), node);
+
+        // mock register_child_request
+        partition_configuration parent_config;
+        parent_config.ballot = req_parent_ballot;
+        parent_config.last_committed_decree = 5;
+        parent_config.max_replica_count = 3;
+        parent_config.pid = dsn::gpid(app->app_id, PARENT_INDEX);
+
+        dsn::partition_configuration child_config;
+        child_config.ballot = PARENT_BALLOT + 1;
+        child_config.last_committed_decree = 5;
+        child_config.pid = dsn::gpid(app->app_id, CHILD_INDEX);
+
+        // register_child_request request;
+        auto request = dsn::make_unique(); // NOTE(review): template argument looks lost in extraction (presumably register_child_request); confirm
+        request->app.app_id = app->app_id;
+        request->parent_config = parent_config;
+        request->child_config = child_config;
+        request->primary_address = dsn::rpc_address("127.0.0.1", 10086);
+
+        register_child_rpc rpc(std::move(request), RPC_CM_REGISTER_CHILD_REPLICA);
+        split_svc().register_child_on_meta(rpc);
+        wait_all();
+        if (wait_zk) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(100)); // presumably gives the remote-storage reply time to run (confirm)
+        }
+        return rpc.response();
+    }
+
     const std::string NAME = "split_table";
     const uint32_t PARTITION_COUNT = 4;
     const uint32_t NEW_PARTITION_COUNT = 8;
+    const uint32_t PARENT_BALLOT = 3; // ballot given to every parent (lower-half) partition by register_child
+    const uint32_t PARENT_INDEX = 0;
+    const uint32_t CHILD_INDEX = 4; // PARENT_INDEX + PARTITION_COUNT
 };
 
 TEST_F(meta_split_service_test, start_split_with_not_existed_app)
@@ -81,5 +136,23 @@ TEST_F(meta_split_service_test, start_split_succeed)
     ASSERT_EQ(resp.partition_count, NEW_PARTITION_COUNT);
 }
 
+TEST_F(meta_split_service_test, register_child_with_wrong_ballot)
+{
+    auto resp = register_child(PARENT_BALLOT - 1, invalid_ballot); // request ballot below meta's parent ballot
+    ASSERT_EQ(resp.err, ERR_INVALID_VERSION);
+}
+
+TEST_F(meta_split_service_test, register_child_with_child_registered)
+{
+    auto resp = register_child(PARENT_BALLOT, PARENT_BALLOT + 1); // child already has a valid ballot on meta
+    ASSERT_EQ(resp.err, ERR_CHILD_REGISTERED);
+}
+
+TEST_F(meta_split_service_test, register_child_succeed)
+{
+    auto resp = register_child(PARENT_BALLOT, invalid_ballot, true); // wait_zk = true: sleep before reading the response
+    ASSERT_EQ(resp.err, ERR_OK);
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/dist/replication/test/meta_test/unit_test/meta_test_base.h b/src/dist/replication/test/meta_test/unit_test/meta_test_base.h
index 0fc9ccecc2..69a573322a 100644
--- a/src/dist/replication/test/meta_test/unit_test/meta_test_base.h
+++ b/src/dist/replication/test/meta_test/unit_test/meta_test_base.h
@@ -119,6 +119,11 @@ class meta_test_base : public testing::Test
         return rpc.response();
     }
 
+    void mock_node_state(const rpc_address &addr, const node_state &node)
+    {
+        _ss->_nodes[addr] = node; // test hook: stores `node` under `addr` in the server state's _nodes
+    }
+
     std::shared_ptr find_app(const std::string &name) { return _ss->get_app(name); }
 
     meta_duplication_service &dup_svc() { return *(_ms->_dup_svc); }
diff --git a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp
index 7bb137a917..081c963834 100644
--- a/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp
+++ b/src/dist/replication/test/replica_test/unit_test/replica_split_test.cpp
@@ -16,6 +16,7 @@ class replica_split_test : public testing::Test
     void SetUp()
     {
         _stub = make_unique();
+        _stub->set_state_connected(); // NOTE(review): presumably makes the stub behave as connected to meta; confirm
         mock_app_info();
         _parent = _stub->generate_replica(
             _app_info, _parent_pid, partition_status::PS_PRIMARY, _init_ballot);
@@ -50,6 +51,22 @@ class replica_split_test : public testing::Test
         _catch_up_req.child_address = dsn::rpc_address("127.0.0.1", 1);
     }
 
+    void mock_register_child_request()
+    {
+        partition_configuration &p_config = _register_req.parent_config;
+        p_config.pid = _parent_pid;
+        p_config.ballot = _init_ballot;
+        p_config.last_committed_decree = _decree;
+
+        partition_configuration &c_config = _register_req.child_config;
+        c_config.pid = _child_pid;
+        c_config.ballot = _init_ballot + 1; // child ballot is one above the parent's
+        c_config.last_committed_decree = 0;
+
+        _register_req.app = _app_info;
+        _register_req.primary_address = dsn::rpc_address("127.0.0.1", 10086);
+    }
+
     void generate_child(partition_status::type status)
     {
         _child = _stub->generate_replica(_app_info, _child_pid, status, _init_ballot);
@@ -157,6 +174,15 @@ class replica_split_test : public testing::Test
 
     partition_split_context get_split_context() { return _child->_split_states; }
 
+    primary_context get_replica_primary_context(mock_replica_ptr rep)
+    {
+        return rep->_primary_states; // returned by value: callers inspect a copy
+    }
+
+    bool is_parent_not_in_split() { return (_parent->_child_gpid.get_app_id() == 0); }
+
+    int32_t get_partition_version(mock_replica_ptr rep) { return rep->_partition_version.load(); }
+
     void test_on_add_child()
     {
         _parent->on_add_child(_group_check_req);
@@ -215,6 +241,28 @@ class replica_split_test : public testing::Test
         return resp.err;
     }
 
+    void test_register_child_on_meta()
+    {
+        _parent->register_child_on_meta(_init_ballot);
+        _parent->tracker()->wait_outstanding_tasks(); // block until the parent's tracked tasks finish
+    }
+
+    void test_on_register_child_rely(partition_status::type status, dsn::error_code resp_err)
+    {
+        mock_register_child_request();
+        _parent->_config.status = status; // force the parent's status for the scenario under test
+
+        register_child_response resp;
+        resp.err = resp_err;
+        resp.app = _register_req.app;
+        resp.app.partition_count *= 2; // the reply reports the doubled partition count
+        resp.parent_config = _register_req.parent_config;
+        resp.child_config = _register_req.child_config;
+
+        _parent->on_register_child_on_meta_reply(ERR_OK, _register_req, resp); // first argument is always ERR_OK; resp.err carries resp_err
+        _parent->tracker()->wait_outstanding_tasks();
+    }
+
 public:
     std::unique_ptr _stub;
 
@@ -230,6 +278,7 @@ class replica_split_test : public testing::Test
 
     group_check_request _group_check_req;
     notify_catch_up_request _catch_up_req;
+    register_child_request _register_req; // filled by mock_register_child_request()
     std::vector _private_log_files;
     std::vector _mutation_list;
     const uint32_t _max_count = 10;
@@ -440,5 +489,51
@@ TEST_F(replica_split_test, handle_catch_up_with_all_caught_up)
     ASSERT_TRUE(get_sync_send_write_request());
 }
 
+TEST_F(replica_split_test, register_child_test)
+{
+    fail::setup();
+    fail::cfg("replica_parent_send_register_request", "return()"); // NOTE(review): presumably keeps the real request from being sent; confirm
+    test_register_child_on_meta();
+    fail::teardown();
+
+    ASSERT_EQ(_parent->status(), partition_status::PS_INACTIVE); // parent is expected to be inactive after starting registration
+    ASSERT_EQ(get_partition_version(_parent), -1);
+}
+
+TEST_F(replica_split_test, register_child_reply_with_wrong_status)
+{
+    generate_child(partition_status::PS_PARTITION_SPLIT);
+    mock_child_split_context(_parent_pid, true, true);
+
+    test_on_register_child_rely(partition_status::PS_PRIMARY, ERR_OK); // reply is handled while the parent is PS_PRIMARY
+    primary_context parent_primary_states = get_replica_primary_context(_parent);
+    ASSERT_EQ(parent_primary_states.register_child_task, nullptr);
+}
+
+TEST_F(replica_split_test, register_child_reply_with_child_registered)
+{
+    generate_child(partition_status::PS_PARTITION_SPLIT);
+    mock_child_split_context(_parent_pid, true, true);
+
+    test_on_register_child_rely(partition_status::PS_INACTIVE, ERR_CHILD_REGISTERED); // meta reports the child as already registered
+
+    primary_context parent_primary_states = get_replica_primary_context(_parent);
+    ASSERT_EQ(parent_primary_states.register_child_task, nullptr);
+    ASSERT_TRUE(is_parent_not_in_split());
+}
+
+TEST_F(replica_split_test, register_child_reply_succeed)
+{
+    generate_child(partition_status::PS_PARTITION_SPLIT);
+    mock_child_split_context(_parent_pid, true, true);
+
+    fail::setup();
+    fail::cfg("replica_stub_split_replica_exec", "return()"); // NOTE(review): presumably stubs out the stub-side split execution; confirm
+    test_on_register_child_rely(partition_status::PS_INACTIVE, ERR_OK);
+    fail::teardown();
+
+    ASSERT_TRUE(is_parent_not_in_split()); // parent's _child_gpid has app id 0 after a successful reply
+}
+
 } // namespace replication
 } // namespace dsn