Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
fix: implement reset_from to fix plog loss when duplicating with lear…
Browse files Browse the repository at this point in the history
…ning (#845)
  • Loading branch information
foreverneverer authored Jun 22, 2021
1 parent 60769c4 commit 76577b2
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 70 deletions.
60 changes: 60 additions & 0 deletions src/replica/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <dsn/utils/latency_tracer.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/crc.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/fail_point.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/tool-api/async_calls.h>
Expand Down Expand Up @@ -916,6 +917,65 @@ int64_t mutation_log::total_size_no_lock() const
return _log_files.size() > 0 ? _global_end_offset - _global_start_offset : 0;
}

error_code mutation_log::reset_from(const std::string &dir,
replay_callback replay_error_callback,
io_failure_callback write_error_callback)
{
error_code err = ERR_FILE_OPERATION_FAILED;

// close for flushing current log and be ready to open new log files after reset
close();

// make sure logs in `dir` (such as /learn) are valid.
error_s es = log_utils::check_log_files_continuity(dir);
if (!es.is_ok()) {
derror_f("the log of source dir {} is invalid:{}, will remove it.", dir, es);
if (!utils::filesystem::remove_path(dir)) {
derror_f("remove {} failed", dir);
return err;
}
return es.code();
}

std::string temp_dir = _dir + '.' + std::to_string(dsn_now_ns());
if (!utils::filesystem::rename_path(_dir, temp_dir)) {
derror_f("rename {} to {} failed", _dir, temp_dir);
return err;
}
ddebug_f("moved current log dir {} to tmp_dir {}", _dir, temp_dir);
// define `defer` for rollback temp_dir when failed or remove temp_dir when success
auto temp_dir_resolve = dsn::defer([this, err, temp_dir]() {
if (err != ERR_OK) {
if (!utils::filesystem::rename_path(temp_dir, _dir)) {
// rollback failed means old log files are not be recovered, it may be lost if only
// derror, dassert for manual resolve it
dassert_f("rollback {} to {} failed", temp_dir, _dir);
}
} else {
if (!dsn::utils::filesystem::remove_path(temp_dir)) {
// temp dir allow delete failed, it's only garbage
derror_f("remove temp dir {} failed", temp_dir);
}
}
});

// move source dir to target dir
if (!utils::filesystem::rename_path(dir, _dir)) {
derror_f("rename {} to {} failed", dir, _dir);
return err;
}
ddebug_f("move {} to {} as our new log directory", dir, _dir);

// - make sure logs in moved dir(such as /plog) are valid and can be opened successfully.
// - re-open new log files for loading the new log file and register the files into replica,
// please make sure the old log files has been closed
err = open(replay_error_callback, write_error_callback);
if (err != ERR_OK) {
derror_f("the logs of moved dir {} are invalid and open failed:{}", _dir, err);
}
return err;
}

void mutation_log::set_valid_start_offset_on_open(gpid gpid, int64_t valid_start_offset)
{
zauto_lock l(_lock);
Expand Down
11 changes: 4 additions & 7 deletions src/replica/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,10 @@ class mutation_log : public ref_counter
return replay_block(log, callback, start_offset, end_offset);
}

// Resets private-log with log files under `dir`.
// The original plog will be removed after this call.
// NOTE: private-log should be opened before this method called.
virtual error_code reset_from(const std::string &dir, io_failure_callback)
{
return ERR_NOT_IMPLEMENTED;
}
// Resets mutation log with log files under `dir`.
// The original log will be removed after this call.
// NOTE: log should be opened before this method called. now it only be used private log
error_code reset_from(const std::string &dir, replay_callback, io_failure_callback);

//
// maintain max_decree & valid_start_offset
Expand Down
138 changes: 76 additions & 62 deletions src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,24 +568,25 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response
return;
}

ddebug("%s: on_learn_reply[%016" PRIx64 "]: learnee = %s, learn_duration = %" PRIu64
" ms, response_err = %s, remote_committed_decree = %" PRId64 ", "
"prepare_start_decree = %" PRId64 ", learn_type = %s, learned_buffer_size = %u, "
"learned_file_count = %u, to_decree_included = %" PRId64
", learn_start_decree = %" PRId64 ", current_learning_status = %s",
name(),
req.signature,
resp.config.primary.to_string(),
_potential_secondary_states.duration_ms(),
resp.err.to_string(),
resp.last_committed_decree,
resp.prepare_start_decree,
enum_to_string(resp.type),
resp.state.meta.length(),
static_cast<uint32_t>(resp.state.files.size()),
resp.state.to_decree_included,
resp.state.learn_start_decree,
enum_to_string(_potential_secondary_states.learning_status));
ddebug_replica(
"on_learn_reply_start[{}]: learnee = {}, learn_duration ={} ms, response_err = "
"{}, remote_committed_decree = {}, prepare_start_decree = {}, learn_type = {} "
"learned_buffer_size = {}, learned_file_count = {},to_decree_included = "
"{}, learn_start_decree = {}, last_commit_decree = {}, current_learning_status = "
"{} ",
req.signature,
resp.config.primary.to_string(),
_potential_secondary_states.duration_ms(),
resp.err.to_string(),
resp.last_committed_decree,
resp.prepare_start_decree,
enum_to_string(resp.type),
resp.state.meta.length(),
static_cast<uint32_t>(resp.state.files.size()),
resp.state.to_decree_included,
resp.state.learn_start_decree,
_app->last_committed_decree(),
enum_to_string(_potential_secondary_states.learning_status));

_potential_secondary_states.learning_copy_buffer_size += resp.state.meta.length();
_stub->_counter_replicas_learning_recent_copy_buffer_size->add(resp.state.meta.length());
Expand Down Expand Up @@ -1507,7 +1508,14 @@ void replica::on_add_learner(const group_check_request &request)
error_code replica::apply_learned_state_from_private_log(learn_state &state)
{
bool duplicating = is_duplicating();

// if no dunplicate, learn_start_decree=last_commit decree, step_back means whether
// `learn_start_decree`should be stepped back to include all the
// unconfirmed when duplicating in this round of learn. default is false
bool step_back = false;

// in this case, this means `learn_start_decree` must have been stepped back to include all the
// unconfirmed(learn_start_decree=last_confirmed_decree) when duplicating in this round of
// learn.
// confirmed gced committed
// | | |
// learner's plog: ============[-----log------]
Expand All @@ -1518,28 +1526,33 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
// ==> learn_start_decree |
// learner's plog | committed
// after applied: [---------------log----------------]

if (duplicating && state.__isset.learn_start_decree &&
state.learn_start_decree < _app->last_committed_decree() + 1) {
// it means this round of learn must have been stepped back
// to include all the unconfirmed.

// move the `learn/` dir to working dir (`plog/`).
error_code err = _private_log->reset_from(_app->learn_dir(), [this](error_code err) {
tasking::enqueue(LPC_REPLICATION_ERROR,
&_tracker,
[this, err]() { handle_local_failure(err); },
get_gpid().thread_hash());
});
ddebug_replica("learn_start_decree({}) < _app->last_committed_decree() + 1({}), learn "
"must stepped back to include all the unconfirmed ",
state.learn_start_decree,
_app->last_committed_decree() + 1);

// move the `learn/` dir to working dir (`plog/`) to replace current log files to replay
error_code err = _private_log->reset_from(
_app->learn_dir(),
[](int log_length, mutation_ptr &mu) { return true; },
[this](error_code err) {
tasking::enqueue(LPC_REPLICATION_ERROR,
&_tracker,
[this, err]() { handle_local_failure(err); },
get_gpid().thread_hash());
});
if (err != ERR_OK) {
derror_replica("failed to reset this private log with logs in learn/ dir: {}", err);
return err;
}

// only the uncommitted logs will be replayed and applied into storage.
// only select uncommitted logs to be replayed and applied into storage.
learn_state tmp_state;
_private_log->get_learn_state(get_gpid(), _app->last_committed_decree() + 1, tmp_state);
state.files = tmp_state.files;
step_back = true;
}

int64_t offset;
Expand All @@ -1549,13 +1562,15 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
prepare_list plist(this,
_app->last_committed_decree(),
_options->max_mutation_count_in_prepare_list,
[this, duplicating](mutation_ptr &mu) {
[this, duplicating, step_back](mutation_ptr &mu) {
if (mu->data.header.decree == _app->last_committed_decree() + 1) {
// TODO: assign the returned error_code to err and check it
_app->apply_mutation(mu);

// appends logs-in-cache into plog to ensure them can be duplicated.
if (duplicating) {
// if current case is step back, it means the logs has been reserved
// through `reset_form` above
if (duplicating && !step_back) {
_private_log->append(
mu, LPC_WRITE_REPLICATION_LOG_COMMON, &_tracker, nullptr);
}
Expand Down Expand Up @@ -1598,18 +1613,19 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
_potential_secondary_states.first_learn_start_decree = state.learn_start_decree;
}

ddebug("%s: apply_learned_state_from_private_log[%016" PRIx64 "]: learnee = %s, "
"learn_duration = %" PRIu64 " ms, apply private log files done, "
"file_count = %d, first_learn_start_decree = %" PRId64 ", learn_start_decree = %" PRId64
", app_committed_decree = %" PRId64,
name(),
_potential_secondary_states.learning_version,
_config.primary.to_string(),
_potential_secondary_states.duration_ms(),
static_cast<int>(state.files.size()),
_potential_secondary_states.first_learn_start_decree,
state.learn_start_decree,
_app->last_committed_decree());
ddebug_replica("apply_learned_state_from_private_log[{}]: duplicating={}, step_back={}, "
"learnee = {}, learn_duration = {} ms, apply private log files done, file_count "
"={}, first_learn_start_decree ={}, learn_start_decree = {}, "
"app_committed_decree = {}",
_potential_secondary_states.learning_version,
duplicating,
step_back,
_config.primary.to_string(),
_potential_secondary_states.duration_ms(),
state.files.size(),
_potential_secondary_states.first_learn_start_decree,
state.learn_start_decree,
_app->last_committed_decree());

// apply in-buffer private logs
if (err == ERR_OK) {
Expand All @@ -1631,26 +1647,24 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
}

if (state.to_decree_included > last_committed_decree()) {
ddebug("%s: apply_learned_state_from_private_log[%016" PRIx64 "]: learnee = %s, "
"learned_to_decree_included(%" PRId64 ") > last_committed_decree(%" PRId64 "), "
"commit to to_decree_included",
name(),
_potential_secondary_states.learning_version,
_config.primary.to_string(),
state.to_decree_included,
last_committed_decree());
ddebug_replica("apply_learned_state_from_private_log[{}]: learnee ={}, "
"learned_to_decree_included({}) > last_committed_decree({}), commit to "
"to_decree_included",
_potential_secondary_states.learning_version,
_config.primary.to_string(),
state.to_decree_included,
last_committed_decree());
plist.commit(state.to_decree_included, COMMIT_TO_DECREE_SOFT);
}

ddebug("%s: apply_learned_state_from_private_log[%016" PRIx64 "]: learnee = %s, "
"learn_duration = %" PRIu64 " ms, apply in-buffer private logs done, "
"replay_count = %d, app_committed_decree = %" PRId64,
name(),
_potential_secondary_states.learning_version,
_config.primary.to_string(),
_potential_secondary_states.duration_ms(),
replay_count,
_app->last_committed_decree());
ddebug_replica(" apply_learned_state_from_private_log[{}]: learnee ={}, "
"learn_duration ={} ms, apply in-buffer private logs done, "
"replay_count ={}, app_committed_decree = {}",
_potential_secondary_states.learning_version,
_config.primary.to_string(),
_potential_secondary_states.duration_ms(),
replay_count,
_app->last_committed_decree());
}

// awaits for unfinished mutation writes.
Expand Down
Loading

0 comments on commit 76577b2

Please sign in to comment.