diff --git a/src/replica/mutation_log.cpp b/src/replica/mutation_log.cpp index 2916aff3c5..44c34799ca 100644 --- a/src/replica/mutation_log.cpp +++ b/src/replica/mutation_log.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -916,6 +917,65 @@ int64_t mutation_log::total_size_no_lock() const return _log_files.size() > 0 ? _global_end_offset - _global_start_offset : 0; } +error_code mutation_log::reset_from(const std::string &dir, + replay_callback replay_error_callback, + io_failure_callback write_error_callback) +{ + error_code err = ERR_FILE_OPERATION_FAILED; + + // close for flushing current log and be ready to open new log files after reset + close(); + + // make sure logs in `dir` (such as /learn) are valid. + error_s es = log_utils::check_log_files_continuity(dir); + if (!es.is_ok()) { + derror_f("the log of source dir {} is invalid:{}, will remove it.", dir, es); + if (!utils::filesystem::remove_path(dir)) { + derror_f("remove {} failed", dir); + return err; + } + return es.code(); + } + + std::string temp_dir = _dir + '.' + std::to_string(dsn_now_ns()); + if (!utils::filesystem::rename_path(_dir, temp_dir)) { + derror_f("rename {} to {} failed", _dir, temp_dir); + return err; + } + ddebug_f("moved current log dir {} to tmp_dir {}", _dir, temp_dir); + // define `defer` for rollback temp_dir when failed or remove temp_dir when success + auto temp_dir_resolve = dsn::defer([this, err, temp_dir]() { + if (err != ERR_OK) { + if (!utils::filesystem::rename_path(temp_dir, _dir)) { + // rollback failed means old log files are not be recovered, it may be lost if only + // derror, dassert for manual resolve it + dassert_f("rollback {} to {} failed", temp_dir, _dir); + } + } else { + if (!dsn::utils::filesystem::remove_path(temp_dir)) { + // temp dir allow delete failed, it's only garbage + derror_f("remove temp dir {} failed", temp_dir); + } + } + }); + + // move source dir to target dir + if (!utils::filesystem::rename_path(dir, _dir)) { + derror_f("rename {} to {} failed", dir, _dir); + return err; + } + ddebug_f("move {} to {} as our new log directory", dir, _dir); + + // - make sure logs in moved dir(such as /plog) are valid and can be opened successfully. + // - re-open new log files for loading the new log file and register the files into replica, + // please make sure the old log files has been closed + err = open(replay_error_callback, write_error_callback); + if (err != ERR_OK) { + derror_f("the logs of moved dir {} are invalid and open failed:{}", _dir, err); + } + return err; +} + void mutation_log::set_valid_start_offset_on_open(gpid gpid, int64_t valid_start_offset) { zauto_lock l(_lock); diff --git a/src/replica/mutation_log.h b/src/replica/mutation_log.h index e1efbb2b15..ec58ece4b7 100644 --- a/src/replica/mutation_log.h +++ b/src/replica/mutation_log.h @@ -149,13 +149,10 @@ class mutation_log : public ref_counter return replay_block(log, callback, start_offset, end_offset); } - // Resets private-log with log files under `dir`. - // The original plog will be removed after this call. - // NOTE: private-log should be opened before this method called. - virtual error_code reset_from(const std::string &dir, io_failure_callback) - { - return ERR_NOT_IMPLEMENTED; - } + // Resets mutation log with log files under `dir`. + // The original log will be removed after this call. + // NOTE: log should be opened before this method called. now it only be used private log + error_code reset_from(const std::string &dir, replay_callback, io_failure_callback); // // maintain max_decree & valid_start_offset diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp index 41c4786106..b354ba765b 100644 --- a/src/replica/replica_learn.cpp +++ b/src/replica/replica_learn.cpp @@ -568,24 +568,25 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response return; } - ddebug("%s: on_learn_reply[%016" PRIx64 "]: learnee = %s, learn_duration = %" PRIu64 - " ms, response_err = %s, remote_committed_decree = %" PRId64 ", " - "prepare_start_decree = %" PRId64 ", learn_type = %s, learned_buffer_size = %u, " - "learned_file_count = %u, to_decree_included = %" PRId64 - ", learn_start_decree = %" PRId64 ", current_learning_status = %s", - name(), - req.signature, - resp.config.primary.to_string(), - _potential_secondary_states.duration_ms(), - resp.err.to_string(), - resp.last_committed_decree, - resp.prepare_start_decree, - enum_to_string(resp.type), - resp.state.meta.length(), - static_cast(resp.state.files.size()), - resp.state.to_decree_included, - resp.state.learn_start_decree, - enum_to_string(_potential_secondary_states.learning_status)); + ddebug_replica( + "on_learn_reply_start[{}]: learnee = {}, learn_duration ={} ms, response_err = " + "{}, remote_committed_decree = {}, prepare_start_decree = {}, learn_type = {} " + "learned_buffer_size = {}, learned_file_count = {},to_decree_included = " + "{}, learn_start_decree = {}, last_commit_decree = {}, current_learning_status = " + "{} ", + req.signature, + resp.config.primary.to_string(), + _potential_secondary_states.duration_ms(), + resp.err.to_string(), + resp.last_committed_decree, + resp.prepare_start_decree, + enum_to_string(resp.type), + resp.state.meta.length(), + static_cast(resp.state.files.size()), + resp.state.to_decree_included, + resp.state.learn_start_decree, + _app->last_committed_decree(), + enum_to_string(_potential_secondary_states.learning_status)); _potential_secondary_states.learning_copy_buffer_size += resp.state.meta.length(); _stub->_counter_replicas_learning_recent_copy_buffer_size->add(resp.state.meta.length()); @@ -1507,7 +1508,14 @@ void replica::on_add_learner(const group_check_request &request) error_code replica::apply_learned_state_from_private_log(learn_state &state) { bool duplicating = is_duplicating(); - + // if no dunplicate, learn_start_decree=last_commit decree, step_back means whether + // `learn_start_decree`should be stepped back to include all the + // unconfirmed when duplicating in this round of learn. default is false + bool step_back = false; + + // in this case, this means `learn_start_decree` must have been stepped back to include all the + // unconfirmed(learn_start_decree=last_confirmed_decree) when duplicating in this round of + // learn. // confirmed gced committed // | | | // learner's plog: ============[-----log------] @@ -1518,28 +1526,33 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state) // ==> learn_start_decree | // learner's plog | committed // after applied: [---------------log----------------] - if (duplicating && state.__isset.learn_start_decree && state.learn_start_decree < _app->last_committed_decree() + 1) { - // it means this round of learn must have been stepped back - // to include all the unconfirmed. - - // move the `learn/` dir to working dir (`plog/`). - error_code err = _private_log->reset_from(_app->learn_dir(), [this](error_code err) { - tasking::enqueue(LPC_REPLICATION_ERROR, - &_tracker, - [this, err]() { handle_local_failure(err); }, - get_gpid().thread_hash()); - }); + ddebug_replica("learn_start_decree({}) < _app->last_committed_decree() + 1({}), learn " + "must stepped back to include all the unconfirmed ", + state.learn_start_decree, + _app->last_committed_decree() + 1); + + // move the `learn/` dir to working dir (`plog/`) to replace current log files to replay + error_code err = _private_log->reset_from( + _app->learn_dir(), + [](int log_length, mutation_ptr &mu) { return true; }, + [this](error_code err) { + tasking::enqueue(LPC_REPLICATION_ERROR, + &_tracker, + [this, err]() { handle_local_failure(err); }, + get_gpid().thread_hash()); + }); if (err != ERR_OK) { derror_replica("failed to reset this private log with logs in learn/ dir: {}", err); return err; } - // only the uncommitted logs will be replayed and applied into storage. + // only select uncommitted logs to be replayed and applied into storage. learn_state tmp_state; _private_log->get_learn_state(get_gpid(), _app->last_committed_decree() + 1, tmp_state); state.files = tmp_state.files; + step_back = true; } int64_t offset; @@ -1549,13 +1562,15 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state) prepare_list plist(this, _app->last_committed_decree(), _options->max_mutation_count_in_prepare_list, - [this, duplicating](mutation_ptr &mu) { + [this, duplicating, step_back](mutation_ptr &mu) { if (mu->data.header.decree == _app->last_committed_decree() + 1) { // TODO: assign the returned error_code to err and check it _app->apply_mutation(mu); // appends logs-in-cache into plog to ensure them can be duplicated. - if (duplicating) { + // if current case is step back, it means the logs has been reserved + // through `reset_form` above + if (duplicating && !step_back) { _private_log->append( mu, LPC_WRITE_REPLICATION_LOG_COMMON, &_tracker, nullptr); } @@ -1598,18 +1613,19 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state) _potential_secondary_states.first_learn_start_decree = state.learn_start_decree; } - ddebug("%s: apply_learned_state_from_private_log[%016" PRIx64 "]: learnee = %s, " - "learn_duration = %" PRIu64 " ms, apply private log files done, " - "file_count = %d, first_learn_start_decree = %" PRId64 ", learn_start_decree = %" PRId64 - ", app_committed_decree = %" PRId64, - name(), - _potential_secondary_states.learning_version, - _config.primary.to_string(), - _potential_secondary_states.duration_ms(), - static_cast(state.files.size()), - _potential_secondary_states.first_learn_start_decree, - state.learn_start_decree, - _app->last_committed_decree()); + ddebug_replica("apply_learned_state_from_private_log[{}]: duplicating={}, step_back={}, " + "learnee = {}, learn_duration = {} ms, apply private log files done, file_count " + "={}, first_learn_start_decree ={}, learn_start_decree = {}, " + "app_committed_decree = {}", + _potential_secondary_states.learning_version, + duplicating, + step_back, + _config.primary.to_string(), + _potential_secondary_states.duration_ms(), + state.files.size(), + _potential_secondary_states.first_learn_start_decree, + state.learn_start_decree, + _app->last_committed_decree()); // apply in-buffer private logs if (err == ERR_OK) { @@ -1631,26 +1647,24 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state) } if (state.to_decree_included > last_committed_decree()) { - ddebug("%s: apply_learned_state_from_private_log[%016" PRIx64 "]: learnee = %s, " - "learned_to_decree_included(%" PRId64 ") > last_committed_decree(%" PRId64 "), " - "commit to to_decree_included", - name(), - _potential_secondary_states.learning_version, - _config.primary.to_string(), - state.to_decree_included, - last_committed_decree()); + ddebug_replica("apply_learned_state_from_private_log[{}]: learnee ={}, " + "learned_to_decree_included({}) > last_committed_decree({}), commit to " + "to_decree_included", + _potential_secondary_states.learning_version, + _config.primary.to_string(), + state.to_decree_included, + last_committed_decree()); plist.commit(state.to_decree_included, COMMIT_TO_DECREE_SOFT); } - ddebug("%s: apply_learned_state_from_private_log[%016" PRIx64 "]: learnee = %s, " - "learn_duration = %" PRIu64 " ms, apply in-buffer private logs done, " - "replay_count = %d, app_committed_decree = %" PRId64, - name(), - _potential_secondary_states.learning_version, - _config.primary.to_string(), - _potential_secondary_states.duration_ms(), - replay_count, - _app->last_committed_decree()); + ddebug_replica(" apply_learned_state_from_private_log[{}]: learnee ={}, " + "learn_duration ={} ms, apply in-buffer private logs done, " + "replay_count ={}, app_committed_decree = {}", + _potential_secondary_states.learning_version, + _config.primary.to_string(), + _potential_secondary_states.duration_ms(), + replay_count, + _app->last_committed_decree()); } // awaits for unfinished mutation writes. diff --git a/src/replica/test/mutation_log_test.cpp b/src/replica/test/mutation_log_test.cpp index bf45e372b4..9f5806df17 100644 --- a/src/replica/test/mutation_log_test.cpp +++ b/src/replica/test/mutation_log_test.cpp @@ -268,7 +268,10 @@ namespace replication { class mutation_log_test : public replica_test_base { public: - mutation_log_test() = default; + gpid _gpid; + +public: + mutation_log_test() : _gpid(_replica->get_gpid()) {} void SetUp() override { @@ -495,5 +498,98 @@ TEST_F(mutation_log_test, replay_start_decree) ASSERT_EQ(mlog->get_log_file_map().size(), 3); } +TEST_F(mutation_log_test, reset_from) +{ + std::vector expected; + { // writing logs + mutation_log_ptr mlog = + new mutation_log_private(_log_dir, 4, _gpid, _replica.get(), 1024, 512, 10000); + + EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + + for (int i = 0; i < 10; i++) { + mutation_ptr mu = create_test_mutation(2 + i, "hello!"); + expected.push_back(mu); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->flush(); + + ASSERT_TRUE(utils::filesystem::rename_path(_log_dir, _log_dir + ".tmp")); + } + + ASSERT_TRUE(utils::filesystem::directory_exists(_log_dir + ".tmp")); + ASSERT_FALSE(utils::filesystem::directory_exists(_log_dir)); + + // create another set of logs + mutation_log_ptr mlog = + new mutation_log_private(_log_dir, 4, _gpid, _replica.get(), 1024, 512, 10000); + EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + for (int i = 0; i < 1000; i++) { + mutation_ptr mu = create_test_mutation(2000 + i, "hello!"); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->flush(); + + // reset from the tmp log dir. + std::vector actual; + auto err = mlog->reset_from(_log_dir + ".tmp", + [&](int, mutation_ptr &mu) -> bool { + actual.push_back(mu); + return true; + }, + [](error_code err) { ASSERT_EQ(err, ERR_OK); }); + ASSERT_EQ(err, ERR_OK); + ASSERT_EQ(actual.size(), expected.size()); + + // the tmp dir has been removed. + ASSERT_FALSE(utils::filesystem::directory_exists(_log_dir + ".tmp")); + ASSERT_TRUE(utils::filesystem::directory_exists(_log_dir)); +} + +// multi-threaded testing. ensure reset_from will wait until +// all previous writes complete. +TEST_F(mutation_log_test, reset_from_while_writing) +{ + std::vector expected; + { // writing logs + mutation_log_ptr mlog = + new mutation_log_private(_log_dir, 4, _gpid, _replica.get(), 1024, 512, 10000); + EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + + for (int i = 0; i < 10; i++) { + mutation_ptr mu = create_test_mutation(2 + i, "hello!"); + expected.push_back(mu); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->flush(); + + ASSERT_TRUE(utils::filesystem::rename_path(_log_dir, _log_dir + ".test")); + } + + // create another set of logs + mutation_log_ptr mlog = + new mutation_log_private(_log_dir, 4, _gpid, _replica.get(), 1024, 512, 10000); + EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + + // given with a large number of mutation to ensure + // plog::reset_from will face many uncompleted writes. + for (int i = 0; i < 1000 * 100; i++) { + mutation_ptr mu = create_test_mutation(2000 + i, "hello!"); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, mlog->tracker(), nullptr, 0); + } + + // reset from the tmp log dir. + std::vector actual; + auto err = mlog->reset_from(_log_dir + ".test", + [&](int, mutation_ptr &mu) -> bool { + actual.push_back(mu); + return true; + }, + [](error_code err) { ASSERT_EQ(err, ERR_OK); }); + ASSERT_EQ(err, ERR_OK); + + mlog->flush(); + ASSERT_EQ(actual.size(), expected.size()); +} } // namespace replication } // namespace dsn