From 7f22ed2e1125b0d0a70293e26c103be3e76fabd5 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Tue, 28 Feb 2023 23:52:05 +0800 Subject: [PATCH] refactor: Move some functions from 'replica' to 'replica_stub' --- src/replica/disk_cleaner.cpp | 13 +- src/replica/disk_cleaner.h | 2 + src/replica/replica.h | 16 --- src/replica/replica_config.cpp | 23 ++-- src/replica/replica_init.cpp | 132 ------------------- src/replica/replica_stub.cpp | 189 +++++++++++++++++++++------ src/replica/replica_stub.h | 12 ++ src/replica/replication_app_base.cpp | 18 +-- src/replica/test/replica_test.cpp | 10 +- 9 files changed, 198 insertions(+), 217 deletions(-) diff --git a/src/replica/disk_cleaner.cpp b/src/replica/disk_cleaner.cpp index 7c624036fd..81ba82d824 100644 --- a/src/replica/disk_cleaner.cpp +++ b/src/replica/disk_cleaner.cpp @@ -21,7 +21,7 @@ #include "utils/filesystem.h" #include "utils/fmt_logging.h" #include "runtime/api_layer1.h" - +#include #include "disk_cleaner.h" namespace dsn { @@ -125,5 +125,16 @@ error_s disk_remove_useless_dirs(const std::vector &data_dirs, } return error_s::ok(); } + +void move_to_err_path(const std::string &path, const std::string &log_prefix) +{ + const std::string new_path = fmt::format("{}.{}{}", path, dsn_now_us(), kFolderSuffixErr); + CHECK(dsn::utils::filesystem::rename_path(path, new_path), + "{}: failed to move directory from '{}' to '{}'", + log_prefix, + path, + new_path); + LOG_WARNING("{}: succeed to move directory from '{}' to '{}'", log_prefix, path, new_path); +} } // namespace replication } // namespace dsn diff --git a/src/replica/disk_cleaner.h b/src/replica/disk_cleaner.h index 86d3f82ccb..dbcc0a8622 100644 --- a/src/replica/disk_cleaner.h +++ b/src/replica/disk_cleaner.h @@ -69,5 +69,7 @@ inline bool is_data_dir_invalid(const std::string &dir) const std::string folder_suffix = dir.substr(dir.length() - 4); return is_data_dir_removable(dir) || folder_suffix == kFolderSuffixBak; } + +void move_to_err_path(const std::string &path, const std::string &log_prefix); } // namespace replication } // namespace dsn diff --git a/src/replica/replica.h b/src/replica/replica.h index 20c9877dd0..6d23c54f5f 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -125,18 +125,6 @@ class replica : public serverlet, public ref_counter, public replica_ba public: ~replica(void); - // - // routines for replica stub - // - static replica *load(replica_stub *stub, const char *dir); - // {parent_dir} is used in partition split for get_child_dir in replica_stub - static replica *newr(replica_stub *stub, - gpid gpid, - const app_info &app, - bool restore_if_necessary, - bool is_duplication_follower, - const std::string &parent_dir = ""); - // return true when the mutation is valid for the current replica bool replay_mutation(mutation_ptr &mu, bool is_private); void reset_prepare_list_after_replay(); @@ -491,10 +479,6 @@ class replica : public serverlet, public ref_counter, public replica_ba // path = "" means using the default directory (`_dir`/.app_info) error_code store_app_info(app_info &info, const std::string &path = ""); - // clear replica if open failed - static replica * - clear_on_failure(replica_stub *stub, replica *rep, const std::string &path, const gpid &pid); - void update_app_max_replica_count(int32_t max_replica_count); void update_app_name(const std::string &app_name); diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp index cd463210bf..10eaf4f0bc 100644 --- a/src/replica/replica_config.cpp +++ b/src/replica/replica_config.cpp @@ -634,13 +634,14 @@ bool replica::update_local_configuration(const replica_configuration &config, partition_status::type old_status = status(); ballot old_ballot = get_ballot(); - // skip unncessary configuration change - if (old_status == config.status && old_ballot == config.ballot) + // skip unnecessary configuration change + if (old_status == config.status && old_ballot == config.ballot) { return true; + } // skip invalid change // but do not disable transitions to partition_status::PS_ERROR as errors - // must be handled immmediately + // must be handled immediately switch (old_status) { case partition_status::PS_ERROR: { LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed", @@ -716,8 +717,7 @@ bool replica::update_local_configuration(const replica_configuration &config, break; } - bool r = false; - uint64_t oldTs = _last_config_change_time_ms; + uint64_t old_ts = _last_config_change_time_ms; _config = config; // we should durable the new ballot to prevent the inconsistent state if (_config.ballot > old_ballot) { @@ -827,8 +827,8 @@ bool replica::update_local_configuration(const replica_configuration &config, _prepare_list->truncate(_app->last_committed_decree()); // using force cleanup now as all tasks must be done already - r = _potential_secondary_states.cleanup(true); - CHECK(r, "{}: potential secondary context cleanup failed", name()); + CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true), + "potential secondary context cleanup failed"); check_state_completeness(); break; @@ -840,8 +840,8 @@ bool replica::update_local_configuration(const replica_configuration &config, _prepare_list->reset(_app->last_committed_decree()); _potential_secondary_states.cleanup(false); // => do this in close as it may block - // r = _potential_secondary_states.cleanup(true); - // CHECK(r, "{}: potential secondary context cleanup failed", name()); + // CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true), + // "potential secondary context cleanup failed"); break; default: CHECK(false, "invalid execution path"); @@ -942,7 +942,7 @@ bool replica::update_local_configuration(const replica_configuration &config, _prepare_list->last_committed_decree(), _app->last_committed_decree(), _app->last_durable_decree(), - _last_config_change_time_ms - oldTs, + _last_config_change_time_ms - old_ts, boost::lexical_cast(_config)); if (status() != old_status) { @@ -985,8 +985,9 @@ bool replica::update_local_configuration(const replica_configuration &config, bool replica::update_local_configuration_with_no_ballot_change(partition_status::type s) { - if (status() == s) + if (status() == s) { return false; + } auto config = _config; config.status = s; diff --git a/src/replica/replica_init.cpp b/src/replica/replica_init.cpp index b7e932e6c2..637f2c1301 100644 --- a/src/replica/replica_init.cpp +++ b/src/replica/replica_init.cpp @@ -76,61 +76,6 @@ error_code replica::initialize_on_new() return init_app_and_prepare_list(true); } -/*static*/ replica *replica::newr(replica_stub *stub, - gpid gpid, - const app_info &app, - bool restore_if_necessary, - bool is_duplication_follower, - const std::string &parent_dir) -{ - std::string dir; - if (parent_dir.empty()) { - dir = stub->get_replica_dir(app.app_type.c_str(), gpid); - } else { - dir = stub->get_child_dir(app.app_type.c_str(), gpid, parent_dir); - } - replica *rep = - new replica(stub, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower); - error_code err; - if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) { - LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err.to_string()); - return clear_on_failure(stub, rep, dir, gpid); - } - - if (is_duplication_follower && - (err = rep->get_replica_follower()->duplicate_checkpoint()) != dsn::ERR_OK) { - LOG_ERROR("{}: try to duplicate replica checkpoint failed, error({}) and please check " - "previous detail error log", - rep->name(), - err.to_string()); - return clear_on_failure(stub, rep, dir, gpid); - } - - err = rep->initialize_on_new(); - if (err == ERR_OK) { - LOG_DEBUG("{}: new replica succeed", rep->name()); - return rep; - } else { - LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err.to_string()); - return clear_on_failure(stub, rep, dir, gpid); - } -} - -/* static */ replica *replica::clear_on_failure(replica_stub *stub, - replica *rep, - const std::string &path, - const gpid &pid) -{ - rep->close(); - delete rep; - rep = nullptr; - - // clear work on failure - utils::filesystem::remove_path(path); - stub->_fs_manager.remove_replica(pid); - return nullptr; -} - error_code replica::initialize_on_load() { LOG_INFO_PREFIX("initialize replica on load, dir = {}", _dir); @@ -143,83 +88,6 @@ error_code replica::initialize_on_load() return init_app_and_prepare_list(false); } -/*static*/ replica *replica::load(replica_stub *stub, const char *dir) -{ - FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; }); - - char splitters[] = {'\\', '/', 0}; - std::string name = utils::get_last_component(std::string(dir), splitters); - if (name == "") { - LOG_ERROR("invalid replica dir {}", dir); - return nullptr; - } - - char app_type[128]; - int32_t app_id, pidx; - if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) { - LOG_ERROR("invalid replica dir {}", dir); - return nullptr; - } - - gpid pid(app_id, pidx); - if (!utils::filesystem::directory_exists(dir)) { - LOG_ERROR("replica dir {} not exist", dir); - return nullptr; - } - - dsn::app_info info; - replica_app_info info2(&info); - std::string path = utils::filesystem::path_combine(dir, kAppInfo); - auto err = info2.load(path); - if (ERR_OK != err) { - LOG_ERROR("load app-info from {} failed, err = {}", path, err); - return nullptr; - } - - if (info.app_type != app_type) { - LOG_ERROR("unmatched app type {} for {}", info.app_type, path); - return nullptr; - } - - if (info.partition_count < pidx) { - LOG_ERROR("partition[{}], count={}, this replica may be partition split garbage partition, " - "ignore it", - pid, - info.partition_count); - return nullptr; - } - - replica *rep = new replica(stub, pid, info, dir, false); - - err = rep->initialize_on_load(); - if (err == ERR_OK) { - LOG_INFO("{}: load replica succeed", rep->name()); - return rep; - } else { - LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err); - rep->close(); - delete rep; - rep = nullptr; - - // clear work on failure - if (dsn::utils::filesystem::directory_exists(dir)) { - char rename_dir[1024]; - sprintf(rename_dir, "%s.%" PRIu64 ".err", dir, dsn_now_us()); - CHECK(dsn::utils::filesystem::rename_path(dir, rename_dir), - "load_replica: failed to move directory '{}' to '{}'", - dir, - rename_dir); - LOG_WARNING("load_replica: replica_dir_op succeed to move directory '{}' to '{}'", - dir, - rename_dir); - stub->_counter_replicas_recent_replica_move_error_count->increment(); - stub->_fs_manager.remove_replica(pid); - } - - return nullptr; - } -} - decree replica::get_replay_start_decree() { decree replay_start_decree = _app->last_committed_decree(); diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index e381128675..13584f5175 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -55,6 +55,7 @@ #include #include #include "utils/fmt_logging.h" +#include "replica/duplication/replica_follower.h" #ifdef DSN_ENABLE_GPERF #include #elif defined(DSN_USE_JEMALLOC) @@ -610,7 +611,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f [this, dir, &rps, &rps_lock] { LOG_INFO("process dir {}", dir); - auto r = replica::load(this, dir.c_str()); + auto r = load_replica(dir.c_str()); if (r != nullptr) { LOG_INFO("{}@{}: load replica '{}' success, = <{}, {}>, last_prepared_decree = {}", @@ -682,17 +683,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f // TODO: checkpoint latest state and update on meta server so learning is cheaper for (auto it = rps.begin(); it != rps.end(); ++it) { it->second->close(); - // move to '.err' directory - const char *dir = it->second->dir().c_str(); - char rename_dir[1024]; - sprintf(rename_dir, "%s.%" PRIu64 ".err", dir, dsn_now_us()); - CHECK(dsn::utils::filesystem::rename_path(dir, rename_dir), - "init_replica: failed to move directory '{}' to '{}'", - dir, - rename_dir); - LOG_WARNING("init_replica: replica_dir_op succeed to move directory '{}' to '{}'", - dir, - rename_dir); + move_to_err_path(it->second->dir(), "initialize replica"); _counter_replicas_recent_replica_move_error_count->increment(); } rps.clear(); @@ -2059,7 +2050,7 @@ void replica_stub::open_replica( _primary_address_str, group_check ? "with" : "without", dir); - rep = replica::load(this, dir.c_str()); + rep = load_replica(dir.c_str()); // if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk // migration @@ -2082,7 +2073,7 @@ void replica_stub::open_replica( boost::replace_first( origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, ""); dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir); - rep = replica::load(this, origin_dir.c_str()); + rep = load_replica(origin_dir.c_str()); FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> void {}); } @@ -2132,7 +2123,7 @@ void replica_stub::open_replica( "remove useless directory({}) failed", dir); } - rep = replica::newr(this, id, app, restore_if_necessary, is_duplication_follower); + rep = new_replica(id, app, restore_if_necessary, is_duplication_follower); } if (rep == nullptr) { @@ -2177,6 +2168,127 @@ void replica_stub::open_replica( } } +replica *replica_stub::new_replica(gpid gpid, + const app_info &app, + bool restore_if_necessary, + bool is_duplication_follower, + const std::string &parent_dir) +{ + std::string dir; + if (parent_dir.empty()) { + dir = get_replica_dir(app.app_type.c_str(), gpid); + } else { + dir = get_child_dir(app.app_type.c_str(), gpid, parent_dir); + } + auto *rep = + new replica(this, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower); + error_code err; + if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) { + LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err); + clear_on_failure(rep, dir, gpid); + return nullptr; + } + + if (is_duplication_follower && + (err = rep->get_replica_follower()->duplicate_checkpoint()) != dsn::ERR_OK) { + LOG_ERROR("{}: try to duplicate replica checkpoint failed, error({}) and please check " + "previous detail error log", + rep->name(), + err); + clear_on_failure(rep, dir, gpid); + return nullptr; + } + + err = rep->initialize_on_new(); + if (err != ERR_OK) { + LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err); + clear_on_failure(rep, dir, gpid); + return nullptr; + } + + LOG_DEBUG("{}: new replica succeed", rep->name()); + return rep; +} + +replica *replica_stub::load_replica(const char *dir) +{ + FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; }); + + char splitters[] = {'\\', '/', 0}; + std::string name = utils::get_last_component(std::string(dir), splitters); + if (name.empty()) { + LOG_ERROR("invalid replica dir {}", dir); + return nullptr; + } + + char app_type[128]; + int32_t app_id, pidx; + if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) { + LOG_ERROR("invalid replica dir {}", dir); + return nullptr; + } + + gpid pid(app_id, pidx); + if (!utils::filesystem::directory_exists(dir)) { + LOG_ERROR("replica dir {} not exist", dir); + return nullptr; + } + + dsn::app_info info; + replica_app_info info2(&info); + std::string path = utils::filesystem::path_combine(dir, replica::kAppInfo); + auto err = info2.load(path); + if (ERR_OK != err) { + LOG_ERROR("load app-info from {} failed, err = {}", path, err); + return nullptr; + } + + if (info.app_type != app_type) { + LOG_ERROR("unmatched app type {} for {}", info.app_type, path); + return nullptr; + } + + if (info.partition_count < pidx) { + LOG_ERROR("partition[{}], count={}, this replica may be partition split garbage partition, " + "ignore it", + pid, + info.partition_count); + return nullptr; + } + + auto *rep = new replica(this, pid, info, dir, false); + err = rep->initialize_on_load(); + if (err != ERR_OK) { + LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err); + rep->close(); + delete rep; + rep = nullptr; + + // clear work on failure + if (dsn::utils::filesystem::directory_exists(dir)) { + move_to_err_path(dir, "load replica"); + _counter_replicas_recent_replica_move_error_count->increment(); + _fs_manager.remove_replica(pid); + } + + return nullptr; + } + + LOG_INFO("{}: load replica succeed", rep->name()); + return rep; +} + +void replica_stub::clear_on_failure(replica *rep, const std::string &path, const gpid &pid) +{ + rep->close(); + delete rep; + rep = nullptr; + + // clear work on failure + utils::filesystem::remove_path(path); + _fs_manager.remove_replica(pid); +} + task_ptr replica_stub::begin_close_replica(replica_ptr r) { CHECK(r->status() == partition_status::PS_ERROR || @@ -2191,32 +2303,31 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r) gpid id = r->get_gpid(); zauto_write_lock l(_replicas_lock); + if (_replicas.erase(id) == 0) { + return nullptr; + } - if (_replicas.erase(id) > 0) { - _counter_replicas_count->decrement(); - - int delay_ms = 0; - if (r->status() == partition_status::PS_INACTIVE) { - delay_ms = FLAGS_gc_memory_replica_interval_ms; - LOG_INFO("{}: delay {} milliseconds to close replica, status = PS_INACTIVE", - r->name(), - delay_ms); - } + _counter_replicas_count->decrement(); - app_info a_info = *(r->get_app_info()); - replica_info r_info; - get_replica_info(r_info, r); - task_ptr task = tasking::enqueue(LPC_CLOSE_REPLICA, - &_tracker, - [=]() { close_replica(r); }, - 0, - std::chrono::milliseconds(delay_ms)); - _closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info)); - _counter_replicas_closing_count->increment(); - return task; - } else { - return nullptr; + int delay_ms = 0; + if (r->status() == partition_status::PS_INACTIVE) { + delay_ms = FLAGS_gc_memory_replica_interval_ms; + LOG_INFO("{}: delay {} milliseconds to close replica, status = PS_INACTIVE", + r->name(), + delay_ms); } + + app_info a_info = *(r->get_app_info()); + replica_info r_info; + get_replica_info(r_info, r); + task_ptr task = tasking::enqueue(LPC_CLOSE_REPLICA, + &_tracker, + [=]() { close_replica(r); }, + 0, + std::chrono::milliseconds(delay_ms)); + _closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info)); + _counter_replicas_closing_count->increment(); + return task; } void replica_stub::close_replica(replica_ptr r) @@ -2865,7 +2976,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, LOG_WARNING("failed create child replica({}) because it is under close", child_pid); return nullptr; } else { - replica *rep = replica::newr(this, child_pid, *app, false, false, parent_dir); + replica *rep = new_replica(child_pid, *app, false, false, parent_dir); if (rep != nullptr) { auto pr = _replicas.insert(replicas::value_type(child_pid, rep)); CHECK(pr.second, "child replica {} has been existed", rep->name()); diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 693ed6a274..648e0c1c10 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -265,6 +265,17 @@ class replica_stub : public serverlet, public ref_counter gpid id, const std::shared_ptr &req, const std::shared_ptr &req2); + // Create a new replica according to the parameters. + // 'parent_dir' is used in partition split for get_child_dir(). + replica *new_replica(gpid gpid, + const app_info &app, + bool restore_if_necessary, + bool is_duplication_follower, + const std::string &parent_dir = ""); + // Load an existing replica from 'dir'. + replica *load_replica(const char *dir); + // Clean up the memory state and on disk data if creating replica failed. + void clear_on_failure(replica *rep, const std::string &path, const gpid &pid); task_ptr begin_close_replica(replica_ptr r); void close_replica(replica_ptr r); void notify_replica_state_update(const replica_configuration &config, bool is_closing); @@ -338,6 +349,7 @@ class replica_stub : public serverlet, public ref_counter friend class replica_follower; friend class replica_follower_test; friend class replica_http_service_test; + FRIEND_TEST(replica_test, test_clear_on_failer); typedef std::unordered_map opening_replicas; typedef std::unordered_map> diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp index 2817322464..e08fdd77d5 100644 --- a/src/replica/replication_app_base.cpp +++ b/src/replica/replication_app_base.cpp @@ -413,15 +413,15 @@ error_code replication_app_base::apply_mutation(const mutation *mu) if (perror != 0) { LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), perror); - // for normal write requests, if got rocksdb error, this replica will be set error and evoke - // learn for ingestion requests, should not do as normal write requests, there are two - // reasons: - // 1. all ingestion errors should be handled by meta server in function - // `on_partition_ingestion_reply`, rocksdb error will be returned to meta server in - // structure `ingestion_response`, not in this function - // 2. if replica apply ingestion mutation during learn, it may got error from rocksdb, - // because the external sst files may not exist, in this case, we won't consider it as an - // error + // For normal write requests, if got rocksdb error, this replica will be set error and evoke + // learn. + // For ingestion requests, should not do as normal write requests, there are two reasons: + // 1. All ingestion errors should be handled by meta server in function + // `on_partition_ingestion_reply`, rocksdb error will be returned to meta server in + // structure `ingestion_response`, not in this function. + // 2. If replica apply ingestion mutation during learn, it may get error from rocksdb, + // because the external sst files may not exist, in this case, we won't consider it as + // an error. if (!has_ingestion_request) { return ERR_LOCAL_APP_FAILURE; } diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index 545dff489e..f1155155dd 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -168,14 +168,6 @@ class replica_test : public replica_test_base bool is_checkpointing() { return _mock_replica->_is_manual_emergency_checkpointing; } - replica *call_clear_on_failure(replica_stub *stub, - replica *rep, - const std::string &path, - const gpid &gpid) - { - return replica::clear_on_failure(stub, rep, path, gpid); - } - bool has_gpid(gpid &gpid) const { for (const auto &node : stub->_fs_manager._dir_nodes) { @@ -445,7 +437,7 @@ TEST_F(replica_test, test_clear_on_failer) ASSERT_TRUE(dsn::utils::filesystem::path_exists(path)); ASSERT_TRUE(has_gpid(pid)); - ASSERT_FALSE(call_clear_on_failure(stub.get(), rep, path, pid)); + stub->clear_on_failure(rep, path, pid); ASSERT_FALSE(dsn::utils::filesystem::path_exists(path)); ASSERT_FALSE(has_gpid(pid));