From 919f518b7403c4a6b3a80ebdcabaefe582426c08 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 26 Nov 2020 14:42:00 +0800 Subject: [PATCH 1/2] feat(split): child copy mutation asynchronously --- src/replica/replica_2pc.cpp | 9 +++ src/replica/split/replica_split_manager.cpp | 82 +++++++++++++++++++++ src/replica/split/replica_split_manager.h | 18 +++++ 3 files changed, 109 insertions(+) diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 694e001e66..b4c3eb4b04 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -29,6 +29,7 @@ #include "mutation_log.h" #include "replica_stub.h" #include "bulk_load/replica_bulk_loader.h" +#include "split/replica_split_manager.h" #include "runtime/security/access_controller.h" #include #include @@ -238,6 +239,10 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c } mu->set_left_potential_secondary_ack_count(count); + if (_split_mgr->is_splitting()) { + _split_mgr->copy_mutation(mu); + } + if (mu->is_logged()) { do_possible_commit_on_primary(mu); } else { @@ -480,6 +485,10 @@ void replica::on_prepare(dsn::message_ex *request) return; } + if (_split_mgr->is_splitting()) { + _split_mgr->copy_mutation(mu); + } + dassert(mu->log_task() == nullptr, ""); mu->log_task() = _stub->_log->append(mu, LPC_WRITE_REPLICATION_LOG, diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index 40ff8acbcc..bfdf07e285 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -1160,5 +1160,87 @@ void replica_split_manager::trigger_secondary_parent_split( // TODO(heyuchen): add other split_status check, response will be used in future } +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::copy_mutation(mutation_ptr &mu) // on parent partition +{ + dassert_replica(_child_gpid.get_app_id() > 0, "child_gpid({}) is invalid", _child_gpid); + + // TODO(hyc): if copy mutation synchronously, add flags + + mutation_ptr new_mu = mutation::copy_no_reply(mu); + error_code ec = _stub->split_replica_exec( + LPC_PARTITION_SPLIT, + _child_gpid, + std::bind(&replica_split_manager::on_copy_mutation, std::placeholders::_1, new_mu)); + if (ec != ERR_OK) { + parent_cleanup_split_context(); + } +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::on_copy_mutation(mutation_ptr &mu) // on child partition +{ + if (status() != partition_status::PS_PARTITION_SPLIT) { + derror_replica( + "wrong status({}), ignore this mutation({})", enum_to_string(status()), mu->name()); + _stub->split_replica_error_handler( + _replica->_split_states.parent_gpid, [mu](replica_split_manager *split_mgr) { + split_mgr->parent_cleanup_split_context(); + split_mgr->on_copy_mutation_reply( + ERR_OK, mu->data.header.ballot, mu->data.header.decree); + }); + return; + } + + if (!_replica->_split_states.is_prepare_list_copied) { + return; + } + + if (mu->data.header.ballot > get_ballot()) { + derror_replica("ballot changed, mutation ballot({}) vs local ballot({}), ignore copy this " + "mutation({})", + mu->data.header.ballot, + get_ballot(), + mu->name()); + _stub->split_replica_error_handler( + _replica->_split_states.parent_gpid, [mu](replica_split_manager *split_mgr) { + split_mgr->parent_cleanup_split_context(); + split_mgr->on_copy_mutation_reply( + ERR_OK, mu->data.header.ballot, mu->data.header.decree); + }); + child_handle_split_error("on_copy_mutation failed because ballot changed"); + return; + } + + mu->data.header.pid = get_gpid(); + _replica->_prepare_list->prepare(mu, partition_status::PS_SECONDARY); + if (!mu->is_sync_to_child()) { // child copy mutation asynchronously + if (!mu->is_logged()) { + mu->set_logged(); + } + mu->log_task() = _stub->_log->append( + mu, LPC_WRITE_REPLICATION_LOG, tracker(), nullptr, get_gpid().thread_hash()); + _replica->_private_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr); + } else { + // TODO(heyuchen): child copy mutation synchronously + } +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::ack_parent(error_code ec, mutation_ptr &mu) // on child partition +{ + // TODO(heyuchen): when child copy mutation synchronously, child replica send ack to its parent + // TBD +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica_split_manager::on_copy_mutation_reply(error_code ec, + ballot b, + decree d) // on parent partition +{ + // TODO(heyuchen): when child copy mutation synchronously, parent replica handle child ack + // TBD +} + } // namespace replication } // namespace dsn diff --git a/src/replica/split/replica_split_manager.h b/src/replica/split/replica_split_manager.h index 0e7119f94d..2228bf201e 100644 --- a/src/replica/split/replica_split_manager.h +++ b/src/replica/split/replica_split_manager.h @@ -33,6 +33,11 @@ class replica_split_manager : replica_base int32_t get_partition_version() const { return _partition_version.load(); } gpid get_child_gpid() const { return _child_gpid; } void set_child_gpid(gpid pid) { _child_gpid = pid; } + bool is_splitting() const + { + return _child_gpid.get_app_id() > 0 && _child_init_ballot > 0 && + _split_status == split_status::SPLITTING; + } private: // parent partition start split @@ -135,6 +140,19 @@ class replica_split_manager : replica_base void trigger_secondary_parent_split(const group_check_request &request, /*out*/ group_check_response &response); + // parent copy mutations to child during partition split + void copy_mutation(mutation_ptr &mu); + + // child add mutation into prepare list and private log + // after child copy prepare list, before child replica become active + void on_copy_mutation(mutation_ptr &mu); + + // when child copy mutation synchronously, child replica send ack to its parent + void ack_parent(dsn::error_code ec, mutation_ptr &mu); + + // when child copy mutation synchronously, parent replica handle child ack + void on_copy_mutation_reply(dsn::error_code ec, ballot b, decree d); + // // helper functions // From 10da77b395b13a0d962086b300b20a3eccf7c64b Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 2 Dec 2020 09:54:17 +0800 Subject: [PATCH 2/2] update by cr --- src/replica/split/replica_split_manager.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index 39d382dc97..4678258286 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -1204,6 +1204,8 @@ void replica_split_manager::on_copy_mutation(mutation_ptr &mu) // on child parti return; } + // It is possible for child has not copied parent prepare list, because parent and child may + // execute in different thread. In this case, child should ignore this mutation. if (!_replica->_split_states.is_prepare_list_copied) { return; } @@ -1232,7 +1234,8 @@ void replica_split_manager::on_copy_mutation(mutation_ptr &mu) // on child parti } mu->log_task() = _stub->_log->append( mu, LPC_WRITE_REPLICATION_LOG, tracker(), nullptr, get_gpid().thread_hash()); - _replica->_private_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr); + _replica->_private_log->append( + mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr, get_gpid().thread_hash()); } else { // TODO(heyuchen): child copy mutation synchronously }