diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp index 3d3eda94f7..6fed98132b 100644 --- a/src/common/fs_manager.cpp +++ b/src/common/fs_manager.cpp @@ -36,6 +36,7 @@ #include #include +#include #include #include @@ -44,8 +45,8 @@ #include "fmt/core.h" #include "fmt/ostream.h" #include "perf_counter/perf_counter.h" +#include "replica_admin_types.h" #include "runtime/api_layer1.h" -#include "runtime/rpc/rpc_address.h" #include "utils/fail_point.h" #include "utils/filesystem.h" #include "utils/fmt_logging.h" @@ -69,6 +70,19 @@ DSN_DEFINE_bool(replication, true, "true means ignore broken data disk when initialize"); +error_code disk_status_to_error_code(disk_status::type ds) +{ + switch (ds) { + case disk_status::SPACE_INSUFFICIENT: + return dsn::ERR_DISK_INSUFFICIENT; + case disk_status::IO_ERROR: + return dsn::ERR_DISK_IO_ERROR; + default: + CHECK_EQ(disk_status::NORMAL, ds); + return dsn::ERR_OK; + } +} + uint64_t dir_node::replicas_count() const { uint64_t sum = 0; @@ -339,16 +353,6 @@ void fs_manager::remove_replica(const gpid &pid) } } -bool fs_manager::for_each_dir_node(const std::function &func) const -{ - zauto_read_lock l(_lock); - for (auto &n : _dir_nodes) { - if (!func(*n)) - return false; - } - return true; -} - void fs_manager::update_disk_stat() { zauto_write_lock l(_lock); @@ -388,21 +392,25 @@ void fs_manager::update_disk_stat() void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag) { - zauto_write_lock l(_lock); std::string norm_path; utils::filesystem::get_normalized_path(data_dir, norm_path); - dir_node *n = new dir_node(tag, norm_path); - _dir_nodes.emplace_back(n); - LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag); + auto dn = std::make_shared(tag, norm_path); + + { + zauto_write_lock l(_lock); + _dir_nodes.emplace_back(dn); + } + LOG_INFO("add new data dir({}) and mark as tag({})", norm_path, tag); } -bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::string &tag) const +bool fs_manager::is_dir_node_exist(const std::string &data_dir, const std::string &tag) const { + std::string norm_path; + utils::filesystem::get_normalized_path(data_dir, norm_path); + zauto_read_lock l(_lock); - for (const auto &dir_node : _dir_nodes) { - std::string norm_path; - utils::filesystem::get_normalized_path(data_dir, norm_path); - if (dir_node->full_dir == norm_path || dir_node->tag == tag) { + for (const auto &dn : _dir_nodes) { + if (dn->full_dir == norm_path || dn->tag == tag) { return true; } } @@ -497,5 +505,103 @@ dir_node *fs_manager::create_child_replica_dir(dsn::string_view app_type, return child_dn; } +std::vector fs_manager::get_disk_infos(int app_id) const +{ + std::vector disk_infos; + zauto_read_lock l(_lock); + for (const auto &dn : _dir_nodes) { + disk_info di; + // Query all app info if 'app_id' is 0, which is not a valid app id. + if (app_id == 0) { + di.holding_primary_replicas = dn->holding_primary_replicas; + di.holding_secondary_replicas = dn->holding_secondary_replicas; + } else { + const auto &primary_replicas = dn->holding_primary_replicas.find(app_id); + if (primary_replicas != dn->holding_primary_replicas.end()) { + di.holding_primary_replicas[app_id] = primary_replicas->second; + } + + const auto &secondary_replicas = dn->holding_secondary_replicas.find(app_id); + if (secondary_replicas != dn->holding_secondary_replicas.end()) { + di.holding_secondary_replicas[app_id] = secondary_replicas->second; + } + } + di.tag = dn->tag; + di.full_dir = dn->full_dir; + di.disk_capacity_mb = dn->disk_capacity_mb; + di.disk_available_mb = dn->disk_available_mb; + + disk_infos.emplace_back(std::move(di)); + } + + return disk_infos; +} + +error_code fs_manager::validate_migrate_op(gpid pid, + const std::string &origin_disk, + const std::string &target_disk, + std::string &err_msg) const +{ + bool origin_disk_exist = false; + bool target_disk_exist = false; + zauto_read_lock l(_lock); + for (const auto &dn : _dir_nodes) { + // Check if the origin directory is valid. + if (dn->tag == origin_disk) { + CHECK_FALSE(origin_disk_exist); + if (!dn->has(pid)) { + err_msg = fmt::format( + "replica({}) doesn't exist on the origin disk({})", pid, origin_disk); + return ERR_OBJECT_NOT_FOUND; + } + + // It's OK to migrate a replica from a dir_node which is NORMAL or even + // SPACE_INSUFFICIENT, but not allowed when it's IO_ERROR. + if (dn->status == disk_status::IO_ERROR) { + err_msg = fmt::format( + "replica({}) exists on an IO-Error origin disk({})", pid, origin_disk); + return ERR_DISK_IO_ERROR; + } + + origin_disk_exist = true; + } + + // Check if the target directory is valid. + if (dn->tag == target_disk) { + CHECK_FALSE(target_disk_exist); + if (dn->has(pid)) { + err_msg = + fmt::format("replica({}) already exists on target disk({})", pid, target_disk); + return ERR_PATH_ALREADY_EXIST; + } + + // It's not allowed to migrate a replica to a dir_node which is either + // SPACE_INSUFFICIENT or IO_ERROR. + if (dn->status == disk_status::SPACE_INSUFFICIENT || + dn->status == disk_status::IO_ERROR) { + err_msg = fmt::format("replica({}) target disk({}) is {}", + pid, + origin_disk, + enum_to_string(dn->status)); + return disk_status_to_error_code(dn->status); + } + + target_disk_exist = true; + } + } + + if (!origin_disk_exist) { + err_msg = fmt::format("origin disk({}) doesn't exist", origin_disk); + return ERR_OBJECT_NOT_FOUND; + } + + if (!target_disk_exist) { + err_msg = fmt::format("target disk({}) doesn't exist", target_disk); + return ERR_OBJECT_NOT_FOUND; + } + + return ERR_OK; +} + } // namespace replication } // namespace dsn diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h index e4c51667db..1073669878 100644 --- a/src/common/fs_manager.h +++ b/src/common/fs_manager.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -30,6 +29,7 @@ #include "common/replication_other_types.h" #include "metadata_types.h" #include "perf_counter/perf_counter_wrapper.h" +#include "utils/error_code.h" #include "utils/flags.h" #include "utils/string_view.h" #include "utils/zlocks.h" @@ -38,9 +38,12 @@ namespace dsn { class gpid; namespace replication { +class disk_info; DSN_DECLARE_int32(disk_min_available_space_ratio); +error_code disk_status_to_error_code(disk_status::type ds); + struct dir_node { public: @@ -116,16 +119,20 @@ class fs_manager gpid child_pid, const std::string &parent_dir); void remove_replica(const dsn::gpid &pid); - bool for_each_dir_node(const std::function &func) const; void update_disk_stat(); void add_new_dir_node(const std::string &data_dir, const std::string &tag); + bool is_dir_node_exist(const std::string &data_dir, const std::string &tag) const; const std::vector> &get_dir_nodes() const { zauto_read_lock l(_lock); return _dir_nodes; } - bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const; + error_code validate_migrate_op(gpid pid, + const std::string &origin_disk, + const std::string &target_disk, + std::string &err_msg) const; + std::vector get_disk_infos(int app_id) const; private: void reset_disk_stat() diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 2a8e477871..e0cff3868e 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -117,21 +117,6 @@ DSN_DEFINE_uint64( DSN_DECLARE_int32(max_mutation_count_in_prepare_list); DSN_DECLARE_int32(staleness_for_commit); -namespace { -error_code disk_status_to_error_code(disk_status::type ds) -{ - switch (ds) { - case disk_status::SPACE_INSUFFICIENT: - return dsn::ERR_DISK_INSUFFICIENT; - case disk_status::IO_ERROR: - return dsn::ERR_DISK_IO_ERROR; - default: - CHECK_EQ(disk_status::NORMAL, ds); - return dsn::ERR_OK; - } -} -} // anonymous namespace - void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) { _checker.only_one_thread_access(); diff --git a/src/replica/replica_disk_migrator.cpp b/src/replica/replica_disk_migrator.cpp index 1afc5c62e3..e1fecba808 100644 --- a/src/replica/replica_disk_migrator.cpp +++ b/src/replica/replica_disk_migrator.cpp @@ -19,10 +19,6 @@ #include #include -#include -#include -#include -#include #include "common/fs_manager.h" #include "common/gpid.h" @@ -126,56 +122,12 @@ bool replica_disk_migrator::check_migration_args(replica_disk_migrate_rpc rpc) return false; } - bool valid_origin_disk = false; - bool valid_target_disk = false; - // _dir_nodes: std::vector> - // TODO(yingchun): skip disks which are SPACE_INSUFFICIENT or IO_ERROR. - for (const auto &dir_node : _replica->get_replica_stub()->_fs_manager._dir_nodes) { - if (dir_node->tag == req.origin_disk) { - valid_origin_disk = true; - if (!dir_node->has(req.pid)) { - std::string err_msg = - fmt::format("Invalid replica(replica({}) doesn't exist on origin disk({}))", - req.pid, - req.origin_disk); - LOG_ERROR_PREFIX( - "received replica disk migrate request(origin={}, target={}), err = {}", - req.origin_disk, - req.target_disk, - err_msg); - resp.err = ERR_OBJECT_NOT_FOUND; - resp.__set_hint(err_msg); - return false; - } - } - - if (dir_node->tag == req.target_disk) { - valid_target_disk = true; - if (dir_node->has(get_gpid())) { - std::string err_msg = - fmt::format("Invalid replica(replica({}) has existed on target disk({}))", - req.pid, - req.target_disk); - LOG_ERROR_PREFIX( - "received replica disk migrate request(origin={}, target={}), err = {}", - req.origin_disk, - req.target_disk, - err_msg); - resp.err = ERR_PATH_ALREADY_EXIST; - resp.__set_hint(err_msg); - return false; - } - } - } - - if (!valid_origin_disk || !valid_target_disk) { - std::string invalid_disk_tag = !valid_origin_disk ? req.origin_disk : req.target_disk; - std::string err_msg = fmt::format("Invalid disk tag({} doesn't exist)", invalid_disk_tag); - LOG_ERROR_PREFIX("received replica disk migrate request(origin={}, target={}), err = {}", - req.origin_disk, - req.target_disk, - err_msg); - resp.err = ERR_OBJECT_NOT_FOUND; + std::string err_msg; + auto ec = _replica->get_replica_stub()->_fs_manager.validate_migrate_op( + req.pid, req.origin_disk, req.target_disk, err_msg); + if (ec != ERR_OK) { + LOG_ERROR_PREFIX(err_msg); + resp.err = ec; resp.__set_hint(err_msg); return false; } diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index dcacf4baf0..a3e16500a7 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -1105,34 +1105,10 @@ void replica_stub::on_query_disk_info(query_disk_info_rpc rpc) } } - for (const auto &dir_node : _fs_manager._dir_nodes) { - disk_info info; - // app_name empty means query all app replica_count - if (req.app_name.empty()) { - info.holding_primary_replicas = dir_node->holding_primary_replicas; - info.holding_secondary_replicas = dir_node->holding_secondary_replicas; - } else { - const auto &primary_iter = dir_node->holding_primary_replicas.find(app_id); - if (primary_iter != dir_node->holding_primary_replicas.end()) { - info.holding_primary_replicas[app_id] = primary_iter->second; - } - - const auto &secondary_iter = dir_node->holding_secondary_replicas.find(app_id); - if (secondary_iter != dir_node->holding_secondary_replicas.end()) { - info.holding_secondary_replicas[app_id] = secondary_iter->second; - } - } - info.tag = dir_node->tag; - info.full_dir = dir_node->full_dir; - info.disk_capacity_mb = dir_node->disk_capacity_mb; - info.disk_available_mb = dir_node->disk_available_mb; - - resp.disk_infos.emplace_back(info); - } - - resp.total_capacity_mb = _fs_manager._total_capacity_mb; - resp.total_available_mb = _fs_manager._total_available_mb; - + resp.disk_infos = _fs_manager.get_disk_infos(app_id); + // Get the statistics from fs_manager's metrics, they are thread-safe. + resp.total_capacity_mb = _fs_manager._counter_total_capacity_mb->get_integer_value(); + resp.total_available_mb = _fs_manager._counter_total_available_mb->get_integer_value(); resp.err = ERR_OK; } @@ -1203,11 +1179,12 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc) } for (auto i = 0; i < data_dir_tags.size(); ++i) { + // TODO(yingchun): move the following code to fs_manager. auto dir = data_dirs[i]; - if (_fs_manager.is_dir_node_available(dir, data_dir_tags[i])) { + if (_fs_manager.is_dir_node_exist(dir, data_dir_tags[i])) { resp.err = ERR_NODE_ALREADY_EXIST; resp.__set_err_hint( - fmt::format("data_dir({}) tag({}) already available", dir, data_dir_tags[i])); + fmt::format("data_dir({}) tag({}) already exist", dir, data_dir_tags[i])); return; } diff --git a/src/utils/filesystem.cpp b/src/utils/filesystem.cpp index ec4b28a737..1954f29b5a 100644 --- a/src/utils/filesystem.cpp +++ b/src/utils/filesystem.cpp @@ -94,6 +94,7 @@ static inline int get_stat_internal(const std::string &npath, struct stat_ &st) return err; } +// TODO(yingchun): remove the return value because it's always 0. int get_normalized_path(const std::string &path, std::string &npath) { char sep;