// Presence flags for notify_catch_up_request, one bit per field.
// Every flag starts out false; notify_catch_up_request::read() raises a flag
// when it decodes the matching field from the wire.
typedef struct _notify_catch_up_request__isset
{
    bool parent_gpid : 1;
    bool child_gpid : 1;
    bool child_ballot : 1;
    bool child_address : 1;

    _notify_catch_up_request__isset()
    {
        parent_gpid = false;
        child_gpid = false;
        child_ballot = false;
        child_address = false;
    }
} _notify_catch_up_request__isset;
// Presence flag for the single field of notify_cacth_up_response.
// Starts out false; notify_cacth_up_response::read() raises it once `err`
// has been decoded.
typedef struct _notify_cacth_up_response__isset
{
    bool err : 1;

    _notify_cacth_up_response__isset()
    {
        err = false;
    }
} _notify_cacth_up_response__isset;
__set_err(const ::dsn::error_code &val); + + bool operator==(const notify_cacth_up_response &rhs) const + { + if (!(err == rhs.err)) + return false; + return true; + } + bool operator!=(const notify_cacth_up_response &rhs) const { return !(*this == rhs); } + + bool operator<(const notify_cacth_up_response &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(notify_cacth_up_response &a, notify_cacth_up_response &b); + +inline std::ostream &operator<<(std::ostream &out, const notify_cacth_up_response &obj) +{ + obj.printTo(out); + return out; +} } } // namespace diff --git a/src/dist/replication/common/replication_types.cpp b/src/dist/replication/common/replication_types.cpp index f8b4047470..9b2886c3a3 100644 --- a/src/dist/replication/common/replication_types.cpp +++ b/src/dist/replication/common/replication_types.cpp @@ -13309,5 +13309,262 @@ void app_partition_split_response::printTo(std::ostream &out) const << "partition_count=" << to_string(partition_count); out << ")"; } + +notify_catch_up_request::~notify_catch_up_request() throw() {} + +void notify_catch_up_request::__set_parent_gpid(const ::dsn::gpid &val) { this->parent_gpid = val; } + +void notify_catch_up_request::__set_child_gpid(const ::dsn::gpid &val) { this->child_gpid = val; } + +void notify_catch_up_request::__set_child_ballot(const int64_t val) { this->child_ballot = val; } + +void notify_catch_up_request::__set_child_address(const ::dsn::rpc_address &val) +{ + this->child_address = val; +} + +uint32_t notify_catch_up_request::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using 
::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->parent_gpid.read(iprot); + this->__isset.parent_gpid = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->child_gpid.read(iprot); + this->__isset.child_gpid = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->child_ballot); + this->__isset.child_ballot = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->child_address.read(iprot); + this->__isset.child_address = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t notify_catch_up_request::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("notify_catch_up_request"); + + xfer += oprot->writeFieldBegin("parent_gpid", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->parent_gpid.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("child_gpid", ::apache::thrift::protocol::T_STRUCT, 2); + xfer += this->child_gpid.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("child_ballot", ::apache::thrift::protocol::T_I64, 3); + xfer += oprot->writeI64(this->child_ballot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("child_address", ::apache::thrift::protocol::T_STRUCT, 4); + xfer += 
this->child_address.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(notify_catch_up_request &a, notify_catch_up_request &b) +{ + using ::std::swap; + swap(a.parent_gpid, b.parent_gpid); + swap(a.child_gpid, b.child_gpid); + swap(a.child_ballot, b.child_ballot); + swap(a.child_address, b.child_address); + swap(a.__isset, b.__isset); +} + +notify_catch_up_request::notify_catch_up_request(const notify_catch_up_request &other578) +{ + parent_gpid = other578.parent_gpid; + child_gpid = other578.child_gpid; + child_ballot = other578.child_ballot; + child_address = other578.child_address; + __isset = other578.__isset; +} +notify_catch_up_request::notify_catch_up_request(notify_catch_up_request &&other579) +{ + parent_gpid = std::move(other579.parent_gpid); + child_gpid = std::move(other579.child_gpid); + child_ballot = std::move(other579.child_ballot); + child_address = std::move(other579.child_address); + __isset = std::move(other579.__isset); +} +notify_catch_up_request ¬ify_catch_up_request::operator=(const notify_catch_up_request &other580) +{ + parent_gpid = other580.parent_gpid; + child_gpid = other580.child_gpid; + child_ballot = other580.child_ballot; + child_address = other580.child_address; + __isset = other580.__isset; + return *this; +} +notify_catch_up_request ¬ify_catch_up_request::operator=(notify_catch_up_request &&other581) +{ + parent_gpid = std::move(other581.parent_gpid); + child_gpid = std::move(other581.child_gpid); + child_ballot = std::move(other581.child_ballot); + child_address = std::move(other581.child_address); + __isset = std::move(other581.__isset); + return *this; +} +void notify_catch_up_request::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "notify_catch_up_request("; + out << "parent_gpid=" << to_string(parent_gpid); + out << ", " + << "child_gpid=" << to_string(child_gpid); + out << ", " + << 
"child_ballot=" << to_string(child_ballot); + out << ", " + << "child_address=" << to_string(child_address); + out << ")"; +} + +notify_cacth_up_response::~notify_cacth_up_response() throw() {} + +void notify_cacth_up_response::__set_err(const ::dsn::error_code &val) { this->err = val; } + +uint32_t notify_cacth_up_response::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->err.read(iprot); + this->__isset.err = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t notify_cacth_up_response::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("notify_cacth_up_response"); + + xfer += oprot->writeFieldBegin("err", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->err.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(notify_cacth_up_response &a, notify_cacth_up_response &b) +{ + using ::std::swap; + swap(a.err, b.err); + swap(a.__isset, b.__isset); +} + +notify_cacth_up_response::notify_cacth_up_response(const notify_cacth_up_response &other582) +{ + err = other582.err; + __isset = other582.__isset; +} 
+notify_cacth_up_response::notify_cacth_up_response(notify_cacth_up_response &&other583) +{ + err = std::move(other583.err); + __isset = std::move(other583.__isset); +} +notify_cacth_up_response ¬ify_cacth_up_response:: +operator=(const notify_cacth_up_response &other584) +{ + err = other584.err; + __isset = other584.__isset; + return *this; +} +notify_cacth_up_response ¬ify_cacth_up_response::operator=(notify_cacth_up_response &&other585) +{ + err = std::move(other585.err); + __isset = std::move(other585.__isset); + return *this; +} +void notify_cacth_up_response::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "notify_cacth_up_response("; + out << "err=" << to_string(err); + out << ")"; +} } } // namespace diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 1c38df9b86..90822bea8a 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -389,6 +389,14 @@ class replica : public serverlet, public ref_counter, public replica_ba // child send notification to primary parent when it finish async learn void child_notify_catch_up(); + // primary parent handle child catch_up request + void parent_handle_child_catch_up(const notify_catch_up_request &request, + notify_cacth_up_response &response); + + // primary parent check if sync_point has been committed + // sync_point is the first decree after parent send write request to child synchronously + void parent_check_sync_point_commit(decree sync_point); + // return true if parent status is valid bool parent_check_states(); diff --git a/src/dist/replication/lib/replica_context.cpp b/src/dist/replication/lib/replica_context.cpp index e014115e8e..79f27d5a9c 100644 --- a/src/dist/replication/lib/replica_context.cpp +++ b/src/dist/replication/lib/replica_context.cpp @@ -92,6 +92,10 @@ void primary_context::cleanup(bool clean_pending_mutations) CLEANUP_TASK_ALWAYS(checkpoint_task) membership.ballot = 0; + + 
caught_up_children.clear(); + + sync_send_write_request = false; } bool primary_context::is_cleaned() diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h index ae5eee773e..2367d88b60 100644 --- a/src/dist/replication/lib/replica_context.h +++ b/src/dist/replication/lib/replica_context.h @@ -102,6 +102,22 @@ class primary_context dsn::task_ptr checkpoint_task; uint64_t last_prepare_ts_ms; + + // Used for partition split + // child addresses who has been caught up with its parent + std::unordered_set caught_up_children; + + // Used for partition split + // whether parent's write request should be sent to child synchronously + // if {sync_send_write_request} = true + // - parent should recevie prepare ack from child synchronously during 2pc + // if {sync_send_write_request} = false and replica is during partition split + // - parent should copy mutations to child asynchronously, child is during async-learn + // whether a replica is during partition split is determined by a variety named `_child_gpid` of + // replica class + // if app_id of `_child_gpid` is greater than zero, it means replica is during partition split, + // otherwise, not during partition split + bool sync_send_write_request{false}; }; class secondary_context diff --git a/src/dist/replication/lib/replica_split.cpp b/src/dist/replication/lib/replica_split.cpp index dc12ba4df6..3f0c45d114 100644 --- a/src/dist/replication/lib/replica_split.cpp +++ b/src/dist/replication/lib/replica_split.cpp @@ -14,6 +14,8 @@ namespace dsn { namespace replication { +typedef rpc_holder notify_catch_up_rpc; + // ThreadPool: THREAD_POOL_REPLICATION void replica::on_add_child(const group_check_request &request) // on parent partition { @@ -474,7 +476,134 @@ void replica::child_catch_up_states() // on child partition void replica::child_notify_catch_up() // on child partition { FAIL_POINT_INJECT_F("replica_child_notify_catch_up", [](dsn::string_view) {}); - // TODO(heyuchen): 
TBD + + std::unique_ptr request = make_unique(); + request->parent_gpid = _split_states.parent_gpid; + request->child_gpid = get_gpid(); + request->child_ballot = get_ballot(); + request->child_address = _stub->_primary_address; + + ddebug_replica("send notification to primary: {}@{}, ballot={}", + _split_states.parent_gpid, + _config.primary.to_string(), + get_ballot()); + + notify_catch_up_rpc rpc(std::move(request), RPC_SPLIT_NOTIFY_CATCH_UP); + rpc.call(_config.primary, + tracker(), + [this, rpc](error_code ec) mutable { + auto response = rpc.response(); + if (ec == ERR_TIMEOUT) { + dwarn_replica("notify primary catch up timeout, please wait and retry"); + tasking::enqueue(LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica::child_notify_catch_up, this), + get_gpid().thread_hash(), + std::chrono::seconds(1)); + return; + } + if (ec != ERR_OK || response.err != ERR_OK) { + error_code err = (ec == ERR_OK) ? response.err : ec; + dwarn_replica("failed to notify primary catch up, error={}", err.to_string()); + _stub->split_replica_error_handler( + _split_states.parent_gpid, + std::bind(&replica::parent_cleanup_split_context, std::placeholders::_1)); + child_handle_split_error("notify_primary_split_catch_up"); + return; + } + ddebug_replica("notify primary catch up succeed"); + }, + _split_states.parent_gpid.thread_hash()); +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica::parent_handle_child_catch_up(const notify_catch_up_request &request, + notify_cacth_up_response &response) // on primary parent +{ + if (status() != partition_status::PS_PRIMARY) { + derror_replica("status is {}", enum_to_string(status())); + response.err = ERR_INVALID_STATE; + return; + } + + if (request.child_ballot != get_ballot()) { + derror_replica("receive out-date request, request ballot = {}, local ballot = {}", + request.child_ballot, + get_ballot()); + response.err = ERR_INVALID_STATE; + return; + } + + if (request.child_gpid != _child_gpid) { + derror_replica( + "receive 
wrong child request, request child_gpid = {}, local child_gpid = {}", + request.child_gpid, + _child_gpid); + response.err = ERR_INVALID_STATE; + return; + } + + response.err = ERR_OK; + ddebug_replica("receive catch_up request from {}@{}, current ballot={}", + request.child_gpid, + request.child_address.to_string(), + request.child_ballot); + + _primary_states.caught_up_children.insert(request.child_address); + // _primary_states.statuses is a map structure: rpc address -> partition_status + // it stores replica's rpc address and partition_status of this replica group + for (auto &iter : _primary_states.statuses) { + if (_primary_states.caught_up_children.find(iter.first) == + _primary_states.caught_up_children.end()) { + // there are child partitions not caught up its parent + return; + } + } + + ddebug_replica("all child partitions catch up"); + _primary_states.caught_up_children.clear(); + _primary_states.sync_send_write_request = true; + + // sync_point is the first decree after parent send write request to child synchronously + // when sync_point commit, parent consider child has all data it should have during async-learn + decree sync_point = _prepare_list->max_decree() + 1; + if (!_options->empty_write_disabled) { + // empty wirte here to commit sync_point + mutation_ptr mu = new_mutation(invalid_decree); + mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr); + init_prepare(mu, false); + dassert_replica(sync_point == mu->data.header.decree, + "sync_point should be equal to mutation's decree, {} vs {}", + sync_point, + mu->data.header.decree); + }; + + // check if sync_point has been committed + tasking::enqueue(LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica::parent_check_sync_point_commit, this, sync_point), + get_gpid().thread_hash(), + std::chrono::seconds(1)); +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica::parent_check_sync_point_commit(decree sync_point) // on primary parent +{ + 
FAIL_POINT_INJECT_F("replica_parent_check_sync_point_commit", [](dsn::string_view) {}); + ddebug_replica("sync_point = {}, app last_committed_decree = {}", + sync_point, + _app->last_committed_decree()); + if (_app->last_committed_decree() >= sync_point) { + // TODO(heyuchen): TBD + // update child replica group partition_count + } else { + dwarn_replica("sync_point has not been committed, please wait and retry"); + tasking::enqueue(LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica::parent_check_sync_point_commit, this, sync_point), + get_gpid().thread_hash(), + std::chrono::seconds(1)); + } } // ThreadPool: THREAD_POOL_REPLICATION diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index aa2c16b0bf..ab1e3a9c4d 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -1972,6 +1972,9 @@ void replica_stub::open_service() register_rpc_handler(RPC_QUERY_APP_INFO, "query_app_info", &replica_stub::on_query_app_info); register_rpc_handler(RPC_COLD_BACKUP, "ColdBackup", &replica_stub::on_cold_backup); + register_rpc_handler(RPC_SPLIT_NOTIFY_CATCH_UP, + "child_notify_catch_up", + &replica_stub::on_notify_primary_split_catch_up); _kill_partition_command = ::dsn::command_manager::instance().register_app_command( {"kill_partition"}, @@ -2512,5 +2515,17 @@ void replica_stub::split_replica_error_handler(gpid pid, local_execution handler } } +// ThreadPool: THREAD_POOL_REPLICATION +void replica_stub::on_notify_primary_split_catch_up(const notify_catch_up_request &request, + notify_cacth_up_response &response) +{ + replica_ptr replica = get_replica(request.parent_gpid); + if (replica != nullptr) { + replica->parent_handle_child_catch_up(request, response); + } else { + response.err = ERR_OBJECT_NOT_FOUND; + } +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index e2a4cc7063..4669a34ce3 
// child to primary parent, notifying that itself has caught up with parent
struct notify_catch_up_request
{
    // gpid of the parent partition; replica_stub uses it to look up the replica
    // that should handle the request
    1:dsn.gpid parent_gpid;
    // gpid of the sending child; the parent rejects the request unless it equals
    // the child gpid the parent has recorded
    2:dsn.gpid child_gpid;
    // ballot of the sending child; the parent rejects the request unless it equals
    // the parent's own ballot
    3:i64 child_ballot;
    // address of the sending child; the parent records it in its set of
    // caught-up children
    4:dsn.rpc_address child_address;
}

// primary parent to child, reply to notify_catch_up_request
// NOTE(review): the struct name misspells "catch" as "cacth"; the generated C++
// type and its callers use the same spelling, so renaming it means changing
// them together.
struct notify_cacth_up_response
{
    // Possible errors:
    // - ERR_OBJECT_NOT_FOUND: replica can not be found
    // - ERR_INVALID_STATE: replica is not primary or ballot not match or child_gpid not match
    1:dsn.error_code err;
}
_group_check_req.child_gpid = _child_pid; + _group_check_req.config.ballot = _init_ballot; + _group_check_req.config.status = partition_status::PS_PRIMARY; + } + + void mock_notify_catch_up_request() + { + _parent->set_child_gpid(_child_pid); + _catch_up_req.child_gpid = _child_pid; + _catch_up_req.parent_gpid = _parent_pid; + _catch_up_req.child_ballot = _init_ballot; + _catch_up_req.child_address = dsn::rpc_address("127.0.0.1", 1); } void generate_child(partition_status::type status) @@ -106,6 +115,23 @@ class replica_split_test : public testing::Test } } + void mock_parent_primary_context(bool will_all_caught_up) + { + _parent->_primary_states.statuses[dsn::rpc_address("127.0.0.1", 1)] = + partition_status::PS_PRIMARY; + _parent->_primary_states.statuses[dsn::rpc_address("127.0.0.1", 2)] = + partition_status::PS_SECONDARY; + _parent->_primary_states.statuses[dsn::rpc_address("127.0.0.1", 3)] = + partition_status::PS_SECONDARY; + _parent->_primary_states.caught_up_children.insert(dsn::rpc_address("127.0.0.1", 2)); + if (will_all_caught_up) { + _parent->_primary_states.caught_up_children.insert(dsn::rpc_address("127.0.0.1", 3)); + } + _parent->_primary_states.sync_send_write_request = false; + } + + bool get_sync_send_write_request() { return _parent->_primary_states.sync_send_write_request; } + void mock_child_async_learn_states(mock_replica_ptr plist_rep, bool add_to_plog, decree min_decree) { @@ -133,7 +159,7 @@ class replica_split_test : public testing::Test void test_on_add_child() { - _parent->on_add_child(_req); + _parent->on_add_child(_group_check_req); _parent->tracker()->wait_outstanding_tasks(); } @@ -187,6 +213,14 @@ class replica_split_test : public testing::Test _child->tracker()->wait_outstanding_tasks(); } + dsn::error_code test_parent_handle_child_catch_up() + { + notify_cacth_up_response resp; + _parent->parent_handle_child_catch_up(_catch_up_req, resp); + _parent->tracker()->wait_outstanding_tasks(); + return resp.err; + } + public: 
std::unique_ptr _stub; @@ -200,7 +234,8 @@ class replica_split_test : public testing::Test ballot _init_ballot = 3; decree _decree = 5; - group_check_request _req; + group_check_request _group_check_req; + notify_catch_up_request _catch_up_req; std::vector _private_log_files; std::vector _mutation_list; const uint32_t _max_count = 10; @@ -212,7 +247,7 @@ class replica_split_test : public testing::Test TEST_F(replica_split_test, add_child_wrong_ballot) { ballot wrong_ballot = 5; - _req.config.ballot = wrong_ballot; + _group_check_req.config.ballot = wrong_ballot; test_on_add_child(); ASSERT_EQ(_stub->get_replica(_child_pid), nullptr); } @@ -380,5 +415,46 @@ TEST_F(replica_split_test, catch_up_succeed_with_learn_in_memory_mutations) cleanup_child_split_context(); } +TEST_F(replica_split_test, handle_catch_up_with_ballot_wrong) +{ + mock_notify_catch_up_request(); + _catch_up_req.child_ballot = 1; + + fail::setup(); + fail::cfg("replica_parent_check_sync_point_commit", "return()"); + dsn::error_code err = test_parent_handle_child_catch_up(); + fail::teardown(); + + ASSERT_EQ(err, ERR_INVALID_STATE); +} + +TEST_F(replica_split_test, handle_catch_up_with_not_all_caught_up) +{ + mock_parent_primary_context(false); + mock_notify_catch_up_request(); + + fail::setup(); + fail::cfg("replica_parent_check_sync_point_commit", "return()"); + dsn::error_code err = test_parent_handle_child_catch_up(); + fail::teardown(); + + ASSERT_EQ(err, ERR_OK); + ASSERT_FALSE(get_sync_send_write_request()); +} + +TEST_F(replica_split_test, handle_catch_up_with_all_caught_up) +{ + mock_parent_primary_context(true); + mock_notify_catch_up_request(); + + fail::setup(); + fail::cfg("replica_parent_check_sync_point_commit", "return()"); + dsn::error_code err = test_parent_handle_child_catch_up(); + fail::teardown(); + + ASSERT_EQ(err, ERR_OK); + ASSERT_TRUE(get_sync_send_write_request()); +} + } // namespace replication } // namespace dsn