Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): child replica learn parent prepare list and checkpoint #309

Merged
merged 7 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MAKE_EVENT_CODE(LPC_CHECKPOINT_REPLICA, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CATCHUP_WITH_PRIVATE_LOGS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_DISK_STAT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_BACKGROUND_COLD_BACKUP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_PARTITION_SPLIT_ASYNC_LEARN, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_LOW, TASK_PRIORITY_LOW)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_COMMON, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_HIGH, TASK_PRIORITY_HIGH)
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/prepare_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class prepare_list : public mutation_cache, private replica_base
decree last_committed_decree() const { return _last_committed_decree; }
void reset(decree init_decree);
void truncate(decree init_decree);
void set_committer(mutation_committer committer) { _committer = committer; }

//
// for two-phase commit
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ void replica::close()
dassert(_secondary_states.is_cleaned(), "secondary context is not cleared");
dassert(_potential_secondary_states.is_cleaned(),
"potential secondary context is not cleared");
dassert(_split_states.is_cleaned(), "partition split context is not cleared");
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
}

// for partition_status::PS_ERROR, context cleanup is done here as they may block
Expand Down
30 changes: 25 additions & 5 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,29 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

void parent_prepare_states(const std::string &dir);

void child_copy_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist);
// child copy parent prepare list and call child_learn_states
void child_copy_prepare_list(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist);

// child learn states(including checkpoint, private logs, in-memory mutations)
void child_learn_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree);

error_code child_replay_private_log(std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree);

error_code child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree);

// child catch up parent states while executing async learn task
void child_catch_up_states();

// return true if parent status is valid
bool parent_check_states();
Expand All @@ -351,6 +369,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void parent_cleanup_split_context();
// child suicide when partition split failed
void child_handle_split_error(const std::string &error_msg);
// child handle error while async learn parent states
void child_handle_async_learn_error();

private:
friend class ::dsn::replication::replication_checker;
Expand Down
5 changes: 5 additions & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1290,9 +1290,14 @@ void cold_backup_context::file_upload_complete(const std::string &filename)

bool partition_split_context::cleanup(bool force)
{
CLEANUP_TASK(async_learn_task, force)

parent_gpid.set_app_id(0);
is_prepare_list_copied = false;
return true;
}

bool partition_split_context::is_cleaned() const { return async_learn_task == nullptr; }

} // namespace replication
} // namespace dsn
8 changes: 6 additions & 2 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -526,12 +526,16 @@ typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
class partition_split_context
{
public:
partition_split_context() {}
// TODO(heyuchen): force will be used in further pull request
partition_split_context() : is_prepare_list_copied(false) {}
bool cleanup(bool force);
bool is_cleaned() const;

public:
gpid parent_gpid;
bool is_prepare_list_copied;

// child replica async learn parent states
dsn::task_ptr async_learn_task;
};

//---------------inline impl----------------------------------------------------------------
Expand Down
172 changes: 164 additions & 8 deletions src/dist/replication/lib/replica_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/fail_point.h>

Expand Down Expand Up @@ -93,6 +94,7 @@ void replica::child_init_replica(gpid parent_gpid,

// init split states
_split_states.parent_gpid = parent_gpid;
_split_states.is_prepare_list_copied = false;

ddebug_replica("init ballot is {}, parent gpid is ({})", init_ballot, parent_gpid);

Expand Down Expand Up @@ -188,7 +190,7 @@ void replica::parent_prepare_states(const std::string &dir) // on parent partiti
last_committed_decree());

_stub->split_replica_exec(_child_gpid,
std::bind(&replica::child_copy_states,
std::bind(&replica::child_copy_prepare_list,
std::placeholders::_1,
parent_states,
mutation_list,
Expand All @@ -200,14 +202,158 @@ void replica::parent_prepare_states(const std::string &dir) // on parent partiti
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_copy_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist) // on child partition
void replica::child_copy_prepare_list(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_copy_states", [](dsn::string_view) {});
// TODO(heyuchen): impelment function in further pull request
FAIL_POINT_INJECT_F("replica_child_copy_prepare_list", [](dsn::string_view) {});

if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status, status is {}", enum_to_string(status()));
_stub->split_replica_error_handler(
_split_states.parent_gpid,
std::bind(&replica::parent_cleanup_split_context, std::placeholders::_1));
child_handle_split_error("wrong child status when execute child_copy_prepare_list");
return;
}

// learning parent states is time-consuming, should execute in THREAD_POOL_REPLICATION_LONG
decree last_committed_decree = plist->last_committed_decree();
_split_states.async_learn_task = tasking::enqueue(LPC_PARTITION_SPLIT_ASYNC_LEARN,
tracker(),
std::bind(&replica::child_learn_states,
this,
lstate,
mutation_list,
plog_files,
total_file_size,
last_committed_decree));

ddebug_replica("start to copy parent prepare list, last_committed_decree={}, prepare list min "
"decree={}, max decree={}",
last_committed_decree,
plist->min_decree(),
plist->max_decree());

// copy parent prepare list
plist->set_committer(std::bind(&replica::execute_mutation, this, std::placeholders::_1));
delete _prepare_list;
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
_prepare_list = new prepare_list(this, *plist);
for (decree d = last_committed_decree + 1; d <= _prepare_list->max_decree(); ++d) {
mutation_ptr mu = _prepare_list->get_mutation_by_decree(d);
dassert_replica(mu != nullptr, "can not find mutation, dercee={}", d);
mu->data.header.pid = get_gpid();
_stub->_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr);
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
_private_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr);
// set mutation has been logged in private log
if (!mu->is_logged()) {
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
mu->set_logged();
}
}
_split_states.is_prepare_list_copied = true;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
void replica::child_learn_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_learn_states", [](dsn::string_view) {});

if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status, status is {}", enum_to_string(status()));
child_handle_async_learn_error();
return;
}

ddebug_replica("start to learn states asynchronously, prepare_list last_committed_decree={}, "
"checkpoint decree range=({},{}], private log files count={}, in-memory "
"mutation count={}",
last_committed_decree,
lstate.from_decree_excluded,
lstate.to_decree_included,
plog_files.size(),
mutation_list.size());

// apply parent checkpoint
error_code err;
auto cleanup = defer([this, &err]() {
if (err != ERR_OK) {
child_handle_async_learn_error();
}
});

err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate);
if (err != ERR_OK) {
derror_replica("failed to apply checkpoint, error={}", err);
return;
}

// replay parent private log
err = child_replay_private_log(plog_files, total_file_size, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to replay private log, error={}", err);
return;
}

// learn parent in-memory mutations
err = child_learn_mutations(mutation_list, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to learn mutations, error={}", err);
return;
}

// generate a checkpoint synchronously
err = _app->sync_checkpoint();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里 sync_checkpoint 是什么原因?必须要 checkpoint 吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里child已经异步learn完成,包括checkpoint, prepare_list, private_log和in-memory mutation,这时打一个checkpoint相当于标识异步learn完成,而且方便接下来的catch up

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

”方便接下来的catch up“ 我没有理解主要是哪些方面方便了?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在异步learn完成之后,child还需要catch up在它进行异步learn过程中parent的states,这时打一个checkpoint,能把之前异步learn的states durable,更新一下last_durable_decree,主要是这个作用

if (err != ERR_OK) {
derror_replica("failed to generate checkpoint synchrounously, error={}", err);
return;
}

err = _app->update_init_info_ballot_and_decree(this);
if (err != ERR_OK) {
derror_replica("update_init_info_ballot_and_decree failed, error={}", err);
return;
}

ddebug_replica("learn parent states asynchronously succeed");

tasking::enqueue(LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica::child_catch_up_states, this),
get_gpid().thread_hash());
_split_states.async_learn_task = nullptr;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_replay_private_log(std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_replay_private_log",
[](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_learn_mutations", [](dsn::string_view) { return ERR_OK; });
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
// TODO(heyuchen): TBD
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_catch_up_states() // on child partition
{
FAIL_POINT_INJECT_F("replica_child_catch_up_states", [](dsn::string_view) {});
// TODO(heyuchen): TBD
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand All @@ -228,5 +374,15 @@ void replica::child_handle_split_error(const std::string &error_msg) // on child
}
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
void replica::child_handle_async_learn_error() // on child partition
{
_stub->split_replica_error_handler(
_split_states.parent_gpid,
std::bind(&replica::parent_cleanup_split_context, std::placeholders::_1));
child_handle_split_error("meet error when execute child_learn_states");
_split_states.async_learn_task = nullptr;
}

} // namespace replication
} // namespace dsn
5 changes: 3 additions & 2 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class mock_replication_app_base : public replication_app_base

error_code start(int, char **) override { return ERR_NOT_IMPLEMENTED; }
error_code stop(bool) override { return ERR_NOT_IMPLEMENTED; }
error_code sync_checkpoint() override { return ERR_NOT_IMPLEMENTED; }
error_code sync_checkpoint() override { return ERR_OK; }
error_code async_checkpoint(bool) override { return ERR_NOT_IMPLEMENTED; }
error_code prepare_get_checkpoint(blob &) override { return ERR_NOT_IMPLEMENTED; }
error_code get_checkpoint(int64_t, const blob &, learn_state &) override
Expand All @@ -53,7 +53,7 @@ class mock_replication_app_base : public replication_app_base
}
error_code storage_apply_checkpoint(chkpt_apply_mode, const learn_state &) override
{
return ERR_NOT_IMPLEMENTED;
return ERR_OK;
}
error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree) override
Expand Down Expand Up @@ -119,6 +119,7 @@ class mock_replica : public replica
void set_child_gpid(gpid pid) { _child_gpid = pid; }
void set_init_child_ballot(ballot b) { _child_init_ballot = b; }
void set_last_committed_decree(decree d) { _prepare_list->reset(d); }
prepare_list *get_plist() { return _prepare_list; }
};
typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;

Expand Down
Loading