Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): child copy mutation asynchronously #676

Merged
merged 3 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "bulk_load/replica_bulk_loader.h"
#include "split/replica_split_manager.h"
#include "runtime/security/access_controller.h"
#include <dsn/utils/latency_tracer.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -238,6 +239,10 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
}
mu->set_left_potential_secondary_ack_count(count);

if (_split_mgr->is_splitting()) {
_split_mgr->copy_mutation(mu);
}

if (mu->is_logged()) {
do_possible_commit_on_primary(mu);
} else {
Expand Down Expand Up @@ -480,6 +485,10 @@ void replica::on_prepare(dsn::message_ex *request)
return;
}

if (_split_mgr->is_splitting()) {
_split_mgr->copy_mutation(mu);
}

dassert(mu->log_task() == nullptr, "");
mu->log_task() = _stub->_log->append(mu,
LPC_WRITE_REPLICATION_LOG,
Expand Down
85 changes: 85 additions & 0 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1172,5 +1172,90 @@ void replica_split_manager::trigger_secondary_parent_split(
// TODO(heyuchen): add other split_status check, response will be used in future
}

// ThreadPool: THREAD_POOL_REPLICATION
//
// Parent-partition side of mutation copying during partition split.
// Clones `mu` with mutation::copy_no_reply() and asks the stub to run
// on_copy_mutation() for the child replica identified by `_child_gpid`.
// If the stub reports anything other than ERR_OK, the parent's split context
// is cleaned up.
//
// \param mu  the mutation being prepared on the parent; it is only read here,
//            the child receives the clone.
void replica_split_manager::copy_mutation(mutation_ptr &mu) // on parent partition
{
    dassert_replica(_child_gpid.get_app_id() > 0, "child_gpid({}) is invalid", _child_gpid);

    // TODO(hyc): if copy mutation synchronously, add flags

    mutation_ptr child_mu = mutation::copy_no_reply(mu);

    // The closure owns its own reference to the clone; `mutable` lets it be passed as a
    // non-const lvalue, which on_copy_mutation(mutation_ptr &) requires.
    const error_code exec_result = _stub->split_replica_exec(
        LPC_PARTITION_SPLIT, _child_gpid, [child_mu](replica_split_manager *child_mgr) mutable {
            child_mgr->on_copy_mutation(child_mu);
        });
    if (exec_result == ERR_OK) {
        return;
    }

    // Handing the mutation to the child failed: abandon the split on the parent side.
    parent_cleanup_split_context();
}

// ThreadPool: THREAD_POOL_REPLICATION
//
// Child-partition side of mutation copying during partition split.
//
// Outcomes, in the order they are checked:
//   1. status is not PS_PARTITION_SPLIT  -> log, notify the parent, return.
//   2. parent prepare list not copied yet -> silently ignore the mutation.
//   3. mutation ballot > local ballot     -> log, notify the parent, run
//                                            child_handle_split_error(), return.
//   4. otherwise                          -> re-target the mutation to this replica,
//                                            add it to the prepare list and, for the
//                                            asynchronous path, append it to the logs.
//
// \param mu  the mutation copied from the parent; its header pid is overwritten
//            with this replica's gpid on the success path.
void replica_split_manager::on_copy_mutation(mutation_ptr &mu) // on child partition
{
    // Shared by both failure paths: ask the stub to run, for the parent gpid, a handler
    // that cleans up the parent's split context and then calls on_copy_mutation_reply().
    // The inner closure copies `mu`, exactly when the helper is invoked.
    const auto notify_parent_of_failure = [this, &mu]() {
        _stub->split_replica_error_handler(
            _replica->_split_states.parent_gpid, [mu](replica_split_manager *parent_mgr) {
                parent_mgr->parent_cleanup_split_context();
                parent_mgr->on_copy_mutation_reply(
                    ERR_OK, mu->data.header.ballot, mu->data.header.decree);
            });
    };

    if (status() != partition_status::PS_PARTITION_SPLIT) {
        derror_replica(
            "wrong status({}), ignore this mutation({})", enum_to_string(status()), mu->name());
        notify_parent_of_failure();
        return;
    }

    // Parent and child may execute in different threads, so the child may not have copied
    // the parent's prepare list yet. In that case the mutation is simply dropped.
    if (!_replica->_split_states.is_prepare_list_copied) {
        return;
    }

    if (mu->data.header.ballot > get_ballot()) {
        derror_replica("ballot changed, mutation ballot({}) vs local ballot({}), ignore copy this "
                       "mutation({})",
                       mu->data.header.ballot,
                       get_ballot(),
                       mu->name());
        notify_parent_of_failure();
        child_handle_split_error("on_copy_mutation failed because ballot changed");
        return;
    }

    // Accept the mutation: it now belongs to this (child) partition.
    mu->data.header.pid = get_gpid();
    _replica->_prepare_list->prepare(mu, partition_status::PS_SECONDARY);

    if (mu->is_sync_to_child()) {
        // TODO(heyuchen): child copy mutation synchronously
        return;
    }

    // Asynchronous copy: mark the mutation as logged if it is not already, then append it
    // to the stub's log and to this replica's private log.
    if (!mu->is_logged()) {
        mu->set_logged();
    }
    mu->log_task() = _stub->_log->append(
        mu, LPC_WRITE_REPLICATION_LOG, tracker(), nullptr, get_gpid().thread_hash());
    _replica->_private_log->append(
        mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr, get_gpid().thread_hash());
}

// ThreadPool: THREAD_POOL_REPLICATION
//
// Placeholder for the synchronous copy path. The body is intentionally empty for now,
// so `ec` and `mu` are currently unused and calling this function has no effect.
void replica_split_manager::ack_parent(error_code ec, mutation_ptr &mu) // on child partition
{
// TODO(heyuchen): when the child copies mutations synchronously, the child replica sends an
// ack to its parent
// TBD
}

// ThreadPool: THREAD_POOL_REPLICATION
//
// Placeholder for the synchronous copy path. The body is intentionally empty for now,
// so `ec`, `b` and `d` are currently unused. on_copy_mutation() already calls this from
// its error handlers (always with ERR_OK), which is therefore a no-op at present.
void replica_split_manager::on_copy_mutation_reply(error_code ec,
ballot b,
decree d) // on parent partition
{
// TODO(heyuchen): when the child copies mutations synchronously, the parent replica handles
// the child's ack
// TBD
}

} // namespace replication
} // namespace dsn
13 changes: 13 additions & 0 deletions src/replica/split/replica_split_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ class replica_split_manager : replica_base
void trigger_secondary_parent_split(const group_check_request &request,
/*out*/ group_check_response &response);

// Parent side: copy a mutation to the child during partition split.
// Clones `mu` (mutation::copy_no_reply) and schedules on_copy_mutation() for the child
// replica through the stub; if scheduling fails, the parent's split context is cleaned up.
// Called from replica::init_prepare() and replica::on_prepare() while is_splitting() is true.
void copy_mutation(mutation_ptr &mu);

// Child side: add the copied mutation to the prepare list and append it to the logs.
// Only takes effect after the child has copied the parent's prepare list and before the
// child replica becomes active: the mutation is ignored if the prepare list has not been
// copied yet, and the parent is notified if the child's status is not PS_PARTITION_SPLIT
// or if the mutation's ballot is greater than the child's ballot.
void on_copy_mutation(mutation_ptr &mu);

// When the child copies mutations synchronously, the child replica sends an ack to its
// parent. Not implemented yet: the definition is currently an empty placeholder.
void ack_parent(dsn::error_code ec, mutation_ptr &mu);

// When the child copies mutations synchronously, the parent replica handles the child's
// ack here. Not implemented yet: the definition is currently an empty placeholder, although
// on_copy_mutation() already calls it from its error handlers.
void on_copy_mutation_reply(dsn::error_code ec, ballot b, decree d);

//
// helper functions
//
Expand Down