diff --git a/idl/metadata.thrift b/idl/metadata.thrift index 7c8448bee3..dd21901dec 100644 --- a/idl/metadata.thrift +++ b/idl/metadata.thrift @@ -58,7 +58,8 @@ enum split_status enum disk_status { NORMAL = 0, - SPACE_INSUFFICIENT + SPACE_INSUFFICIENT, + IO_ERROR } enum manual_compaction_status diff --git a/src/aio/native_linux_aio_provider.cpp b/src/aio/native_linux_aio_provider.cpp index 55e8961d2e..ff19f68ac5 100644 --- a/src/aio/native_linux_aio_provider.cpp +++ b/src/aio/native_linux_aio_provider.cpp @@ -52,7 +52,8 @@ linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag, int { auto fd = ::open(file_name, flag, pmode); if (fd == DSN_INVALID_FILE_HANDLE) { - LOG_ERROR("create file failed, err = {}", utils::safe_strerror(errno)); + LOG_ERROR( + "create file failed, file = {}, err = {}", file_name, utils::safe_strerror(errno)); } return linux_fd_t(fd); } diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp index 2ba683da4f..5341534672 100644 --- a/src/common/fs_manager.cpp +++ b/src/common/fs_manager.cpp @@ -61,6 +61,11 @@ DSN_DEFINE_int32(replication, "space insufficient"); DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE); +DSN_DEFINE_bool(replication, + ignore_broken_disk, + true, + "true means ignore broken data disk when initialize"); + uint64_t dir_node::replicas_count() const { uint64_t sum = 0; @@ -79,6 +84,11 @@ uint64_t dir_node::replicas_count(app_id id) const return iter->second.size(); } +std::string dir_node::replica_dir(const char *app_type, const dsn::gpid &pid) const +{ + return utils::filesystem::path_combine(full_dir, fmt::format("{}.{}", pid, app_type)); +} + bool dir_node::has(const gpid &pid) const { auto iter = holding_replicas.find(pid.get_app_id()); @@ -97,30 +107,33 @@ uint64_t dir_node::remove(const gpid &pid) return iter->second.erase(pid); } -bool dir_node::update_disk_stat(const bool update_disk_status) +void dir_node::update_disk_stat() { - FAIL_POINT_INJECT_F("update_disk_stat", [](string_view) { return false; }); - dsn::utils::filesystem::disk_space_info info; - if (!dsn::utils::filesystem::get_disk_space_info(full_dir, info)) { - LOG_ERROR("update disk space failed: dir = {}", full_dir); - return false; + FAIL_POINT_INJECT_F("update_disk_stat", [](string_view) { return; }); + + // Get disk space info. + dsn::utils::filesystem::disk_space_info dsi; + if (!dsn::utils::filesystem::get_disk_space_info(full_dir, dsi)) { + // TODO(yingchun): it may encounter some IO errors when get_disk_space_info() failed, deal + // with it. + LOG_ERROR("update disk space failed, dir = {}", full_dir); + return; } - // update disk space info - disk_capacity_mb = info.capacity / 1024 / 1024; - disk_available_mb = info.available / 1024 / 1024; + + // Update in-memory disk space info. + disk_capacity_mb = dsi.capacity >> 20; + disk_available_mb = dsi.available >> 20; disk_available_ratio = static_cast( disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / disk_capacity_mb)); - if (!update_disk_status) { - LOG_INFO("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, " - "available_ratio = {}%", - full_dir, - disk_capacity_mb, - disk_available_mb, - disk_available_ratio); - return false; + // Update status. + // If the disk is already in IO_ERROR status, it will not change to other status. + if (status == disk_status::IO_ERROR) { + return; } - auto old_status = status; + + // It's able to change status from NORMAL to SPACE_INSUFFICIENT, and vice versa. + disk_status::type old_status = status; auto new_status = disk_available_ratio < FLAGS_disk_min_available_space_ratio ? disk_status::SPACE_INSUFFICIENT : disk_status::NORMAL; @@ -134,7 +147,6 @@ bool dir_node::update_disk_stat(const bool update_disk_status) disk_available_mb, disk_available_ratio, enum_to_string(status)); - return (old_status != new_status); } fs_manager::fs_manager() @@ -186,31 +198,51 @@ void fs_manager::initialize(const std::vector &data_dirs, const std::vector &data_dir_tags) { CHECK_EQ(data_dirs.size(), data_dir_tags.size()); - for (unsigned i = 0; i < data_dirs.size(); ++i) { + + // Skip the data directories which are broken. + std::vector> dir_nodes; + for (auto i = 0; i < data_dir_tags.size(); ++i) { + const auto &dir_tag = data_dir_tags[i]; + const auto &dir = data_dirs[i]; + + // Check the status of this directory. + std::string cdir; + std::string err_msg; + disk_status::type status = disk_status::NORMAL; + if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) || + !utils::filesystem::check_dir_rw(dir, err_msg))) { + if (FLAGS_ignore_broken_disk) { + LOG_ERROR("data dir({}) is broken, ignore it, error: {}", dir, err_msg); + } else { + CHECK(false, err_msg); + } + status = disk_status::IO_ERROR; + } + + // Normalize the data directories. std::string norm_path; - utils::filesystem::get_normalized_path(data_dirs[i], norm_path); - dir_node *n = new dir_node(data_dir_tags[i], norm_path); - _dir_nodes.emplace_back(n); - LOG_INFO( - "{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, data_dir_tags[i]); + utils::filesystem::get_normalized_path(cdir, norm_path); + + // Create and add this dir_node. + auto dn = std::make_shared(dir_tag, norm_path, 0, 0, 0, status); + LOG_INFO("mark data dir({}) as tag({}) with status({})", + norm_path, + dir_tag, + enum_to_string(status)); + dir_nodes.emplace_back(dn); + } + CHECK_FALSE(dir_nodes.empty()); + + // Update the memory state. + { + zauto_read_lock l(_lock); + _dir_nodes.swap(dir_nodes); } - _available_data_dirs = data_dirs; // Update the disk statistics. update_disk_stat(); } -dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &tag) -{ - dir_node *n = get_dir_node(dir); - if (nullptr == n) { - return dsn::ERR_OBJECT_NOT_FOUND; - } else { - tag = n->tag; - return dsn::ERR_OK; - } -} - void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir) { const auto &dn = get_dir_node(pid_dir); @@ -235,44 +267,61 @@ void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir) LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag); } -void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/ std::string &dir) +dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const { - char buffer[256]; - sprintf(buffer, "%d.%d.%s", pid.get_app_id(), pid.get_partition_index(), type.c_str()); - - zauto_write_lock l(_lock); - dir_node *selected = nullptr; - - unsigned least_app_replicas_count = 0; - unsigned least_total_replicas_count = 0; - - for (auto &n : _dir_nodes) { - CHECK(!n->has(pid), "gpid({}) already in dir_node({})", pid, n->tag); - unsigned app_replicas = n->replicas_count(pid.get_app_id()); - unsigned total_replicas = n->replicas_count(); - - if (selected == nullptr || least_app_replicas_count > app_replicas) { - least_app_replicas_count = app_replicas; - least_total_replicas_count = total_replicas; - selected = n.get(); - } else if (least_app_replicas_count == app_replicas && - least_total_replicas_count > total_replicas) { - least_total_replicas_count = total_replicas; - selected = n.get(); + uint64_t least_app_replicas_count = 0; + uint64_t least_total_replicas_count = 0; + { + zauto_write_lock l(_lock); + // Try to find the dir_node with least replica count. + for (const auto &dn : _dir_nodes) { + // Do not allocate new replica on dir_node which is not NORMAL. + if (dn->status != disk_status::NORMAL) { + continue; + } + CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag); + uint64_t app_replicas_count = dn->replicas_count(pid.get_app_id()); + uint64_t total_replicas_count = dn->replicas_count(); + + if (selected == nullptr || least_app_replicas_count > app_replicas_count) { + least_app_replicas_count = app_replicas_count; + least_total_replicas_count = total_replicas_count; + selected = dn.get(); + } else if (least_app_replicas_count == app_replicas_count && + least_total_replicas_count > total_replicas_count) { + least_total_replicas_count = total_replicas_count; + selected = dn.get(); + } } } + if (selected) { + LOG_INFO( + "{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally", + dsn_primary_address(), + pid, + selected->tag, + least_app_replicas_count, + least_total_replicas_count); + } + return selected; +} - LOG_INFO( - "{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally", - dsn_primary_address(), - pid, - selected->tag, - least_app_replicas_count, - least_total_replicas_count); - - selected->holding_replicas[pid.get_app_id()].emplace(pid); - dir = utils::filesystem::path_combine(selected->full_dir, buffer); +void fs_manager::specify_dir_for_new_replica_for_test(dir_node *specified_dn, + const dsn::gpid &pid) const +{ + bool dn_found = false; + zauto_write_lock l(_lock); + for (const auto &dn : _dir_nodes) { + CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag); + if (dn.get() == specified_dn) { + dn_found = true; + } + } + CHECK(dn_found, "dir_node({}) is not exist", specified_dn->tag); + const auto dir = specified_dn->replica_dir("replica", pid); + CHECK_TRUE(dsn::utils::filesystem::create_directory(dir)); + specified_dn->holding_replicas[pid.get_app_id()].emplace(pid); } void fs_manager::remove_replica(const gpid &pid) @@ -293,27 +342,16 @@ void fs_manager::remove_replica(const gpid &pid) } } -bool fs_manager::for_each_dir_node(const std::function &func) const +void fs_manager::update_disk_stat() { zauto_read_lock l(_lock); - for (auto &n : _dir_nodes) { - if (!func(*n)) - return false; - } - return true; -} - -void fs_manager::update_disk_stat(bool check_status_changed) -{ reset_disk_stat(); - for (auto &dir_node : _dir_nodes) { - if (dir_node->update_disk_stat(check_status_changed)) { - _status_updated_dir_nodes.emplace_back(dir_node); - } - _total_capacity_mb += dir_node->disk_capacity_mb; - _total_available_mb += dir_node->disk_available_mb; - _min_available_ratio = std::min(dir_node->disk_available_ratio, _min_available_ratio); - _max_available_ratio = std::max(dir_node->disk_available_ratio, _max_available_ratio); + for (auto &dn : _dir_nodes) { + dn->update_disk_stat(); + _total_capacity_mb += dn->disk_capacity_mb; + _total_available_mb += dn->disk_available_mb; + _min_available_ratio = std::min(dn->disk_available_ratio, _min_available_ratio); + _max_available_ratio = std::max(dn->disk_available_ratio, _max_available_ratio); } _total_available_ratio = static_cast( _total_capacity_mb == 0 ? 0 : std::round(_total_available_mb * 100.0 / _total_capacity_mb)); @@ -336,27 +374,178 @@ void fs_manager::update_disk_stat(bool check_status_changed) void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag) { - zauto_write_lock l(_lock); std::string norm_path; utils::filesystem::get_normalized_path(data_dir, norm_path); - dir_node *n = new dir_node(tag, norm_path); - _dir_nodes.emplace_back(n); - _available_data_dirs.emplace_back(data_dir); - LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag); + + { + zauto_write_lock l(_lock); + auto dn = std::make_shared(tag, norm_path); + _dir_nodes.emplace_back(dn); + } + LOG_INFO("mark data dir({}) as tag({})", norm_path, tag); } -bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::string &tag) const +bool fs_manager::is_dir_node_exist(const std::string &data_dir, const std::string &tag) const { + std::string norm_path; + utils::filesystem::get_normalized_path(data_dir, norm_path); + zauto_read_lock l(_lock); - for (const auto &dir_node : _dir_nodes) { - std::string norm_path; - utils::filesystem::get_normalized_path(data_dir, norm_path); - if (dir_node->full_dir == norm_path || dir_node->tag == tag) { + for (const auto &dn : _dir_nodes) { + if (dn->full_dir == norm_path || dn->tag == tag) { return true; } } return false; } +dir_node *fs_manager::get_replica_dir(const char *app_type, gpid id, bool create_new) +{ + std::string replica_dir; + dir_node *replica_dn = nullptr; + // Try to find the replica instance directory from all available data directories. + { + zauto_read_lock l(_lock); + for (const auto &dn : _dir_nodes) { + // Skip IO error dir_node. + if (dn->status == disk_status::IO_ERROR) { + continue; + } + const auto dir = dn->replica_dir(app_type, id); + if (utils::filesystem::directory_exists(dir)) { + // Check if there are duplicate replica instance directories. + CHECK(replica_dn == nullptr, "replica dir conflict: {} <--> {}", dir, replica_dir); + replica_dir = dir; + replica_dn = dn.get(); + } + } + } + + if (replica_dn != nullptr || !create_new) { + return replica_dn; + } + + // Find a dir_node for the new replica. + replica_dn = find_best_dir_for_new_replica(id); + if (replica_dn == nullptr) { + return nullptr; + } + + const auto dir = replica_dn->replica_dir(app_type, id); + if (!dsn::utils::filesystem::create_directory(dir)) { + LOG_ERROR("create replica directory({}) failed", dir); + return nullptr; + } + + replica_dn->holding_replicas[id.get_app_id()].emplace(id); + return replica_dn; +} + +dir_node * +fs_manager::get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir) +{ + dir_node *child_dn = nullptr; + std::string child_dir; + { + zauto_read_lock l(_lock); + for (const auto &dn : _dir_nodes) { + // Skip non-available dir_node. + if (dn->status != disk_status::NORMAL) { + continue; + } + child_dir = dn->replica_dir(app_type, child_pid); + // = /. + // check if 's is equal to + // TODO(yingchun): use a function instead. + if (parent_dir.substr(0, dn->full_dir.size() + 1) == dn->full_dir + "/") { + child_dn = dn.get(); + break; + } + } + } + CHECK_NOTNULL(child_dn, "can not find parent_dir {} in data_dirs", parent_dir); + if (!dsn::utils::filesystem::create_directory(child_dir)) { + LOG_ERROR("create child replica directory({}) failed", child_dir); + return nullptr; + } + add_replica(child_pid, child_dir); + return child_dn; +} + +std::vector fs_manager::get_disk_infos(int app_id) const +{ + std::vector disk_infos; + zauto_read_lock l(_lock); + for (const auto &dn : _dir_nodes) { + disk_info di; + // Query all app info if 'app_id' is 0, which is not a valid app id. + if (app_id == 0) { + di.holding_primary_replicas = dn->holding_primary_replicas; + di.holding_secondary_replicas = dn->holding_secondary_replicas; + } else { + const auto &primary_iter = dn->holding_primary_replicas.find(app_id); + if (primary_iter != dn->holding_primary_replicas.end()) { + di.holding_primary_replicas[app_id] = primary_iter->second; + } + + const auto &secondary_iter = dn->holding_secondary_replicas.find(app_id); + if (secondary_iter != dn->holding_secondary_replicas.end()) { + di.holding_secondary_replicas[app_id] = secondary_iter->second; + } + } + di.tag = dn->tag; + di.full_dir = dn->full_dir; + di.disk_capacity_mb = dn->disk_capacity_mb; + di.disk_available_mb = dn->disk_available_mb; + + disk_infos.emplace_back(std::move(di)); + } + + return disk_infos; +} + +fs_manager::disk_migrate_validation +fs_manager::validate_migrate_task(gpid pid, + const std::string &origin_disk, + const std::string &target_disk, + std::string &err_msg) const +{ + bool origin_disk_exist = false; + bool target_disk_exist = false; + zauto_read_lock l(_lock); + for (const auto &dn : _dir_nodes) { + // TODO(yingchun): skip non-available dir_node. + if (dn->tag == origin_disk) { + origin_disk_exist = true; + if (!dn->has(pid)) { + err_msg = fmt::format( + "replica({}) doesn't exist on the origin disk({})", pid, origin_disk); + return disk_migrate_validation::kReplicaNotFound; + } + } + + if (dn->tag == target_disk) { + target_disk_exist = true; + if (dn->has(pid)) { + err_msg = + fmt::format("replica({}) already exists on target disk({})", pid, target_disk); + return disk_migrate_validation::kReplicaExist; + } + } + } + + if (!origin_disk_exist) { + err_msg = fmt::format("origin disk({}) doesn't exist", origin_disk); + return disk_migrate_validation::kDiskNotFound; + } + + if (!target_disk_exist) { + err_msg = fmt::format("target disk({}) doesn't exist", target_disk); + return disk_migrate_validation::kDiskNotFound; + } + + return disk_migrate_validation::kValid; +} + } // namespace replication } // namespace dsn diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h index ebdb3640ab..39e8fc1f02 100644 --- a/src/common/fs_manager.h +++ b/src/common/fs_manager.h @@ -48,7 +48,7 @@ struct dir_node int64_t disk_capacity_mb; int64_t disk_available_mb; int disk_available_ratio; - disk_status::type status; + std::atomic status; std::map> holding_replicas; std::map> holding_primary_replicas; std::map> holding_secondary_replicas; @@ -72,9 +72,10 @@ struct dir_node // and protected by the lock in fs_manager. uint64_t replicas_count(app_id id) const; uint64_t replicas_count() const; + std::string replica_dir(const char *app_type, const dsn::gpid &pid) const; bool has(const dsn::gpid &pid) const; uint64_t remove(const dsn::gpid &pid); - bool update_disk_stat(const bool update_disk_status); + void update_disk_stat(); }; class fs_manager @@ -86,23 +87,35 @@ class fs_manager // NOTE: 'data_dirs' and 'data_dir_tags' must have the same size and in the same order. void initialize(const std::vector &data_dirs, const std::vector &data_dir_tags); - - dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag); - void allocate_dir(const dsn::gpid &pid, - const std::string &type, - /*out*/ std::string &dir); + dir_node *find_best_dir_for_new_replica(const dsn::gpid &pid) const; + void specify_dir_for_new_replica_for_test(dir_node *specified_dn, const dsn::gpid &pid) const; void add_replica(const dsn::gpid &pid, const std::string &pid_dir); + dir_node *get_replica_dir(const char *app_type, gpid id, bool create_new); + // during partition split, we should gurantee child replica and parent replica share the + // same data dir + dir_node *get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir); void remove_replica(const dsn::gpid &pid); - bool for_each_dir_node(const std::function &func) const; - void update_disk_stat(bool check_status_changed = true); + void update_disk_stat(); void add_new_dir_node(const std::string &data_dir, const std::string &tag); - bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const; - const std::vector &get_available_data_dirs() const + bool is_dir_node_exist(const std::string &data_dir, const std::string &tag) const; + const std::vector> &get_dir_nodes() const { zauto_read_lock l(_lock); - return _available_data_dirs; + return _dir_nodes; } + enum class disk_migrate_validation + { + kValid, + kReplicaNotFound, + kReplicaExist, + kDiskNotFound + }; + disk_migrate_validation validate_migrate_task(gpid pid, + const std::string &origin_disk, + const std::string &target_disk, + std::string &err_msg) const; + std::vector get_disk_infos(int app_id) const; private: void reset_disk_stat() @@ -112,28 +125,23 @@ class fs_manager _total_available_ratio = 0; _min_available_ratio = 100; _max_available_ratio = 0; - _status_updated_dir_nodes.clear(); } dir_node *get_dir_node(const std::string &subdir) const; // when visit the tag/storage of the _dir_nodes map, there's no need to protect by the lock. // but when visit the holding_replicas, you must take care. - mutable zrwlock_nr _lock; - + mutable zrwlock_nr _lock; // [ lock int64_t _total_capacity_mb = 0; int64_t _total_available_mb = 0; int _total_available_ratio = 0; int _min_available_ratio = 100; int _max_available_ratio = 0; + // Once dir_node has been added to '_dir_nodes', it will not be removed, it will be marked + // as non-NORMAL status if it is not available. std::vector> _dir_nodes; - std::vector _available_data_dirs; - - // Used for disk available space check - // disk status will be updated periodically, this vector record nodes whose disk_status changed - // in this round - std::vector> _status_updated_dir_nodes; + // ] end of lock perf_counter_wrapper _counter_total_capacity_mb; perf_counter_wrapper _counter_total_available_mb; diff --git a/src/common/replication_enums.h b/src/common/replication_enums.h index 8f07a70b9d..5eef1710e6 100644 --- a/src/common/replication_enums.h +++ b/src/common/replication_enums.h @@ -152,6 +152,7 @@ ENUM_END2(replication::disk_migration_status::type, disk_migration_status) ENUM_BEGIN2(replication::disk_status::type, disk_status, replication::disk_status::NORMAL) ENUM_REG(replication::disk_status::NORMAL) ENUM_REG(replication::disk_status::SPACE_INSUFFICIENT) +ENUM_REG(replication::disk_status::IO_ERROR) ENUM_END2(replication::disk_status::type, disk_status) ENUM_BEGIN2(replication::manual_compaction_status::type, diff --git a/src/common/test/fs_manager_test.cpp b/src/common/test/fs_manager_test.cpp index d6bbcfa313..9e5d27ef17 100644 --- a/src/common/test/fs_manager_test.cpp +++ b/src/common/test/fs_manager_test.cpp @@ -31,30 +31,24 @@ namespace replication { TEST(fs_manager, dir_update_disk_status) { - std::shared_ptr node = std::make_shared("tag", "path"); struct update_disk_status { - bool update_status; bool mock_insufficient; disk_status::type old_disk_status; disk_status::type new_disk_status; - bool expected_ret; - } tests[] = { - {false, false, disk_status::NORMAL, disk_status::NORMAL, false}, - {false, true, disk_status::NORMAL, disk_status::NORMAL, false}, - {true, false, disk_status::NORMAL, disk_status::NORMAL, false}, - {true, false, disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL, true}, - {true, true, disk_status::NORMAL, disk_status::SPACE_INSUFFICIENT, true}, - {true, true, disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT, false}}; + } tests[] = {{false, disk_status::NORMAL, disk_status::NORMAL}, + {false, disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL}, + {true, disk_status::NORMAL, disk_status::SPACE_INSUFFICIENT}, + {true, disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT}}; for (const auto &test : tests) { - node->status = test.old_disk_status; + auto node = std::make_shared("tag", "path", 0, 0, 0, test.old_disk_status); fail::setup(); if (test.mock_insufficient) { fail::cfg("filesystem_get_disk_space_info", "return(insufficient)"); } else { fail::cfg("filesystem_get_disk_space_info", "return(normal)"); } - ASSERT_EQ(test.expected_ret, node->update_disk_stat(test.update_status)); + node->update_disk_stat(); ASSERT_EQ(test.new_disk_status, node->status); fail::teardown(); } diff --git a/src/meta/test/balancer_validator.cpp b/src/meta/test/balancer_validator.cpp index d0e681ed84..8e1d456543 100644 --- a/src/meta/test/balancer_validator.cpp +++ b/src/meta/test/balancer_validator.cpp @@ -113,106 +113,6 @@ static void check_cure(app_mapper &apps, node_mapper &nodes, ::dsn::partition_co ns->put_partition(pc.pid, true); } -// static void verbose_nodes(const node_mapper& nodes) -//{ -// std::cout << "------------" << std::endl; -// for (const auto& n: nodes) -// { -// const node_state& ns = n.second; -// printf("node: %s\ntotal_primaries: %d, total_secondaries: %d\n", n.first.to_string(), -// ns.primary_count(), ns.partition_count()); -// for (int i=1; i<=2; ++i) -// { -// printf("app %d primaries: %d, app %d partitions: %d\n", i, ns.primary_count(i), i, -// ns.partition_count(i)); -// } -// } -//} -// -// static void verbose_app_node(const node_mapper& nodes) -//{ -// printf("Total_Pri: "); -// for (const auto& n: nodes) -// { -// const node_state& ns = n.second; -// printf("%*d", 3, ns.primary_count()); -// } -// printf("\nTotal_Sec: "); -// for (const auto& n: nodes) -// { -// const node_state& ns = n.second; -// printf("%*d", 3, ns.secondary_count()); -// } -// printf("\nApp01_Pri: "); -// for (const auto& n: nodes) -// { -// const node_state& ns = n.second; -// printf("%*d", 3, ns.primary_count(1)); -// } -// printf("\nApp01_Sec: "); -// for (const auto& n: nodes) -// { -// const node_state& ns = n.second; -// printf("%*d", 3, ns.secondary_count(1)); -// } -// printf("\nApp02_Pri: "); -// for (const auto& n: nodes) -// { -// const node_state& ns = n.second; -// printf("%*d", 3, ns.primary_count(2)); -// } -// printf("\nApp02_Sec: "); -// for (const auto& n: nodes) -// { -// const node_state& ns = n.second; -// printf("%*d", 3, ns.secondary_count(2)); -// } -// printf("\n"); -//} - -// static void verbose_app(const std::shared_ptr& app) -//{ -// std::cout << app->app_name << " " << app->app_id << " " << app->partition_count << std::endl; -// for (int i=0; ipartition_count; ++i) -// { -// const partition_configuration& pc = app->partitions[i]; -// std::cout << pc.primary.to_string(); -// for (int j=0; jsecond; -// m.for_each_dir_node([apps_count](const dir_node &dn) { -// printf("%8s", dn.tag.c_str()); -// for (int i = 1; i <= apps_count; ++i) { -// printf("%8u", dn.replicas_count(i)); -// } -// printf("%8u\n", dn.replicas_count()); -// return true; -// }); -// } -//} - void meta_service_test_app::balancer_validator() { std::vector node_list; diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp index aacf8c043c..0293412c69 100644 --- a/src/meta/test/misc/misc.cpp +++ b/src/meta/test/misc/misc.cpp @@ -240,19 +240,21 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo std::string dir; replica_info ri; switch (act.type) { - case config_type::CT_ASSIGN_PRIMARY: - target_manager->allocate_dir(pid, "test", dir); - CHECK_EQ(dsn::ERR_OK, target_manager->get_disk_tag(dir, ri.disk_tag)); + case config_type::CT_ASSIGN_PRIMARY: { + auto selected = target_manager->find_best_dir_for_new_replica(pid); + CHECK_NOTNULL(selected, ""); + selected->holding_replicas[pid.get_app_id()].emplace(pid); cc->collect_serving_replica(act.target, ri); break; - + } case config_type::CT_ADD_SECONDARY: - case config_type::CT_ADD_SECONDARY_FOR_LB: - node_manager->allocate_dir(pid, "test", dir); - CHECK_EQ(dsn::ERR_OK, node_manager->get_disk_tag(dir, ri.disk_tag)); + case config_type::CT_ADD_SECONDARY_FOR_LB: { + auto selected = node_manager->find_best_dir_for_new_replica(pid); + CHECK_NOTNULL(selected, ""); + selected->holding_replicas[pid.get_app_id()].emplace(pid); cc->collect_serving_replica(act.node, ri); break; - + } case config_type::CT_DOWNGRADE_TO_SECONDARY: case config_type::CT_UPGRADE_TO_PRIMARY: break; diff --git a/src/replica/disk_cleaner.cpp b/src/replica/disk_cleaner.cpp index a1128f3003..6ae7b0d0b2 100644 --- a/src/replica/disk_cleaner.cpp +++ b/src/replica/disk_cleaner.cpp @@ -67,14 +67,19 @@ const std::string kFolderSuffixBak = ".bak"; const std::string kFolderSuffixOri = ".ori"; const std::string kFolderSuffixTmp = ".tmp"; -error_s disk_remove_useless_dirs(const std::vector &data_dirs, +error_s disk_remove_useless_dirs(const std::vector> &dir_nodes, /*output*/ disk_cleaning_report &report) { std::vector sub_list; - for (auto &dir : data_dirs) { + for (const auto &dn : dir_nodes) { + // It's allowed to clear up the directory when it's SPACE_INSUFFICIENT, but not allowed when + // it's IO_ERROR. + if (dn->status == disk_status::IO_ERROR) { + continue; + } std::vector tmp_list; - if (!dsn::utils::filesystem::get_subdirectories(dir, tmp_list, false)) { - LOG_WARNING("gc_disk: failed to get subdirectories in {}", dir); + if (!dsn::utils::filesystem::get_subdirectories(dn->full_dir, tmp_list, false)) { + LOG_WARNING("gc_disk: failed to get subdirectories in {}", dn->full_dir); return error_s::make(ERR_OBJECT_NOT_FOUND, "failed to get subdirectories"); } sub_list.insert(sub_list.end(), tmp_list.begin(), tmp_list.end()); diff --git a/src/replica/disk_cleaner.h b/src/replica/disk_cleaner.h index 91fdb68051..e1c6566c7a 100644 --- a/src/replica/disk_cleaner.h +++ b/src/replica/disk_cleaner.h @@ -21,6 +21,7 @@ #include #include +#include "common/fs_manager.h" #include "utils/errors.h" #include "utils/flags.h" @@ -49,7 +50,7 @@ struct disk_cleaning_report }; // Removes the useless data from data directories. -extern error_s disk_remove_useless_dirs(const std::vector &data_dirs, +extern error_s disk_remove_useless_dirs(const std::vector> &dir_nodes, /*output*/ disk_cleaning_report &report); inline bool is_data_dir_removable(const std::string &dir) diff --git a/src/replica/duplication/replica_follower.cpp b/src/replica/duplication/replica_follower.cpp index b38503d3b7..9f23811490 100644 --- a/src/replica/duplication/replica_follower.cpp +++ b/src/replica/duplication/replica_follower.cpp @@ -236,7 +236,7 @@ void replica_follower::nfs_copy_remote_files(const rpc_address &remote_node, request->source_disk_tag = remote_disk; request->source_dir = remote_dir; request->files = file_list; - request->dest_disk_tag = _replica->get_replica_disk_tag(); + request->dest_disk_tag = _replica->get_dir_node()->tag; request->dest_dir = dest_dir; request->overwrite = true; request->high_priority = false; diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp b/src/replica/duplication/test/load_from_private_log_test.cpp index c3edde95f8..36c2b02a92 100644 --- a/src/replica/duplication/test/load_from_private_log_test.cpp +++ b/src/replica/duplication/test/load_from_private_log_test.cpp @@ -247,7 +247,7 @@ class load_from_private_log_test : public duplication_test_base mutation_log_ptr create_private_log(gpid id) { return create_private_log(1, id); } - mutation_log_ptr create_private_log(int private_log_size_mb = 1, gpid id = gpid(1, 1)) + mutation_log_ptr create_private_log(int private_log_size_mb = 1, gpid id = gpid(1, 0)) { std::map replay_condition; replay_condition[id] = 0; // duplicating @@ -319,15 +319,21 @@ TEST_F(load_from_private_log_test, handle_real_private_log) }; for (auto tt : tests) { - boost::filesystem::path file(tt.fname); - boost::filesystem::copy_file( - file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists); - // reset replica to specified gpid duplicator.reset(nullptr); - _replica = create_mock_replica( - stub.get(), tt.id.get_app_id(), tt.id.get_partition_index(), _log_dir.c_str()); + _replica = create_mock_replica(stub.get(), tt.id.get_app_id(), tt.id.get_partition_index()); + + // Update '_log_dir' to the corresponding replica created above. + _log_dir = _replica->dir(); + + // Copy the log file to '_log_dir' + boost::filesystem::path file(tt.fname); + boost::system::error_code ec; + boost::filesystem::copy_file( + file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists, ec); + ASSERT_TRUE(!ec); + // Start to verify. load_and_wait_all_entries_loaded(tt.puts, tt.total, tt.id, 1, 0); } } diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index bcaa762df8..c7dde0c3ae 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -100,7 +100,7 @@ const std::string replica::kAppInfo = ".app-info"; replica::replica(replica_stub *stub, gpid gpid, const app_info &app, - const char *dir, + dir_node *dn, bool need_restore, bool is_duplication_follower) : serverlet("replica"), @@ -124,7 +124,10 @@ replica::replica(replica_stub *stub, CHECK(!_app_info.app_type.empty(), ""); CHECK_NOTNULL(stub, ""); _stub = stub; - _dir = dir; + CHECK_NOTNULL(dn, ""); + _dir_node = dn; + _dir = dn->replica_dir(_app_info.app_type.c_str(), gpid); + CHECK(dsn::utils::filesystem::directory_exists(_dir), "dir({}) not exist", _dir); _options = &stub->options(); init_state(); _config.pid = gpid; @@ -232,7 +235,6 @@ void replica::init_state() _last_config_change_time_ms = _create_time_ms; update_last_checkpoint_generate_time(); _private_log = nullptr; - init_disk_tag(); get_bool_envs(_app_info.envs, replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND, _allow_ingest_behind); } @@ -303,11 +305,14 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling) auto storage_error = _app->on_request(request); if (dsn_unlikely(storage_error != ERR_OK)) { switch (storage_error) { - // TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage - // engine errors. + // TODO(yingchun): Now only kCorruption and kIOError is dealt, consider to deal with + // more storage engine errors. case rocksdb::Status::kCorruption: handle_local_failure(ERR_RDB_CORRUPTION); break; + case rocksdb::Status::kIOError: + handle_local_failure(ERR_RDB_IO_ERROR); + break; default: LOG_ERROR_PREFIX("client read encountered an unhandled error: {}", storage_error); } @@ -590,14 +595,6 @@ uint32_t replica::query_data_version() const return _app->query_data_version(); } -void replica::init_disk_tag() -{ - dsn::error_code err = _stub->_fs_manager.get_disk_tag(dir(), _disk_tag); - if (dsn::ERR_OK != err) { - LOG_ERROR_PREFIX("get disk tag of {} failed: {}, init it to empty ", dir(), err); - } -} - error_code replica::store_app_info(app_info &info, const std::string &path) { replica_app_info new_info((app_info *)&info); diff --git a/src/replica/replica.h b/src/replica/replica.h index 2e81083715..6d8da76f3f 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -52,6 +52,7 @@ #include #include +#include "common/fs_manager.h" #include "common/replication_other_types.h" #include "dsn.layer2_types.h" #include "meta_admin_types.h" @@ -76,6 +77,13 @@ #include "utils/throttling_controller.h" #include "utils/uniq_timestamp_us.h" +namespace pegasus { +namespace server { +class pegasus_server_test_base; +class rocksdb_wrapper_test; +} // namespace server +} // namespace pegasus + namespace dsn { class gpid; class perf_counter; @@ -284,11 +292,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // routine for get extra envs from replica const std::map &get_replica_extra_envs() const { return _extra_envs; } - - void set_disk_status(disk_status::type status) { _disk_status = status; } - bool disk_space_insufficient() { return _disk_status == disk_status::SPACE_INSUFFICIENT; } - disk_status::type get_disk_status() { return _disk_status; } - std::string get_replica_disk_tag() const { return _disk_tag; } + const dir_node *get_dir_node() const { return _dir_node; } static const std::string kAppInfo; @@ -308,7 +312,7 @@ class replica : public serverlet, public ref_counter, public replica_ba replica(replica_stub *stub, gpid gpid, const app_info &app, - const char *dir, + dir_node *dn, bool need_restore, bool is_duplication_follower = false); error_code initialize_on_new(); @@ -516,8 +520,6 @@ class replica : public serverlet, public ref_counter, public replica_ba // update envs to deny client request void update_deny_client(const std::map &envs); - void init_disk_tag(); - // store `info` into a file under `path` directory // path = "" means using the default directory (`_dir`/.app_info) error_code store_app_info(app_info &info, const std::string &path = ""); @@ -550,6 +552,8 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_disk_migrate_test; friend class open_replica_test; friend class replica_follower; + friend class ::pegasus::server::pegasus_server_test_base; + friend class ::pegasus::server::rocksdb_wrapper_test; FRIEND_TEST(replica_test, test_auto_trash); // replica configuration, updated by update_local_configuration ONLY @@ -573,7 +577,6 @@ class replica : public serverlet, public ref_counter, public replica_ba // constants replica_stub *_stub; std::string _dir; - std::string _disk_tag; replication_options *_options; app_info _app_info; std::map _extra_envs; @@ -669,7 +672,8 @@ class replica : public serverlet, public ref_counter, public replica_ba std::unique_ptr _access_controller; - disk_status::type _disk_status{disk_status::NORMAL}; + // The dir_node where the replica data is placed. + dir_node *_dir_node{nullptr}; bool _allow_ingest_behind{false}; // Indicate where the storage engine data is corrupted and unrecoverable. diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 0fd3b9851d..ce7a7484b7 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -178,7 +178,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) } if (FLAGS_reject_write_when_disk_insufficient && - (disk_space_insufficient() || _primary_states.secondary_disk_space_insufficient())) { + (_dir_node->status != disk_status::NORMAL || + _primary_states.secondary_disk_space_insufficient())) { response_client_write(request, ERR_DISK_INSUFFICIENT); return; } diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp index 8cd0325e2b..cc3f0427ce 100644 --- a/src/replica/replica_check.cpp +++ b/src/replica/replica_check.cpp @@ -210,7 +210,7 @@ void replica::on_group_check(const group_check_request &request, } // the group check may trigger start/finish/cancel/pause a split on the secondary. _split_mgr->trigger_secondary_parent_split(request, response); - response.__set_disk_status(_disk_status); + response.__set_disk_status(_dir_node->status); break; case partition_status::PS_POTENTIAL_SECONDARY: init_learn(request.config.learner_signature); diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp index 6bb5dd48d2..34c97e14cb 100644 --- a/src/replica/replica_context.cpp +++ b/src/replica/replica_context.cpp @@ -183,10 +183,11 @@ void primary_context::cleanup_split_states() bool primary_context::secondary_disk_space_insufficient() const { for (const auto &kv : secondary_disk_status) { - if (kv.second == disk_status::SPACE_INSUFFICIENT) { - LOG_INFO("partition[{}] secondary[{}] disk space is insufficient", + if (kv.second != disk_status::NORMAL) { + LOG_INFO("partition[{}] secondary[{}] disk space is {}", membership.pid, - kv.first.to_string()); + kv.first.to_string(), + enum_to_string(kv.second)); return true; } } diff --git a/src/replica/replica_disk_migrator.cpp b/src/replica/replica_disk_migrator.cpp index 0054f2734a..36dd95a6df 100644 --- a/src/replica/replica_disk_migrator.cpp +++ b/src/replica/replica_disk_migrator.cpp @@ -126,61 +126,28 @@ bool replica_disk_migrator::check_migration_args(replica_disk_migrate_rpc rpc) return false; } - bool valid_origin_disk = false; - bool valid_target_disk = false; - // _dir_nodes: std::vector> - for (const auto &dir_node : _replica->get_replica_stub()->_fs_manager._dir_nodes) { - if (dir_node->tag == req.origin_disk) { - valid_origin_disk = true; - if (!dir_node->has(req.pid)) { - std::string err_msg = - fmt::format("Invalid replica(replica({}) doesn't exist on origin disk({}))", - req.pid, - req.origin_disk); - LOG_ERROR_PREFIX( - "received replica disk migrate request(origin={}, target={}), err = {}", - req.origin_disk, - req.target_disk, - err_msg); - resp.err = ERR_OBJECT_NOT_FOUND; - resp.__set_hint(err_msg); - return false; - } - } - - if (dir_node->tag == req.target_disk) { - valid_target_disk = true; - if (dir_node->has(get_gpid())) { - std::string err_msg = - fmt::format("Invalid replica(replica({}) has existed on target disk({}))", - req.pid, - req.target_disk); - LOG_ERROR_PREFIX( - "received replica disk migrate request(origin={}, target={}), err = {}", - req.origin_disk, - req.target_disk, - err_msg); - resp.err = ERR_PATH_ALREADY_EXIST; - resp.__set_hint(err_msg); - return false; - } - } + std::string err_msg; + fs_manager::disk_migrate_validation type = + _replica->get_replica_stub()->_fs_manager.validate_migrate_task( + req.pid, req.origin_disk, req.target_disk, err_msg); + if (type != fs_manager::disk_migrate_validation::kValid) { + LOG_ERROR_PREFIX(err_msg); + resp.__set_hint(err_msg); } - - if (!valid_origin_disk || !valid_target_disk) { - std::string invalid_disk_tag = !valid_origin_disk ? req.origin_disk : req.target_disk; - std::string err_msg = fmt::format("Invalid disk tag({} doesn't exist)", invalid_disk_tag); - LOG_ERROR_PREFIX("received replica disk migrate request(origin={}, target={}), err = {}", - req.origin_disk, - req.target_disk, - err_msg); + switch (type) { + case fs_manager::disk_migrate_validation::kReplicaNotFound: resp.err = ERR_OBJECT_NOT_FOUND; - resp.__set_hint(err_msg); return false; + case fs_manager::disk_migrate_validation::kReplicaExist: + resp.err = ERR_PATH_ALREADY_EXIST; + return false; + case fs_manager::disk_migrate_validation::kDiskNotFound: + resp.err = ERR_OBJECT_NOT_FOUND; + return false; + default: + resp.err = ERR_OK; + return true; } - - resp.err = ERR_OK; - return true; } // THREAD_POOL_REPLICATION_LONG diff --git a/src/replica/replica_failover.cpp b/src/replica/replica_failover.cpp index 94edaab837..926562d140 100644 --- a/src/replica/replica_failover.cpp +++ b/src/replica/replica_failover.cpp @@ -54,7 +54,9 @@ void replica::handle_local_failure(error_code error) { LOG_INFO_PREFIX("handle local failure error {}, status = {}", error, enum_to_string(status())); - if (error == ERR_RDB_CORRUPTION) { + if (error == ERR_RDB_IO_ERROR) { + _dir_node->status = disk_status::IO_ERROR; + } else if (error == ERR_RDB_CORRUPTION) { _data_corrupted = true; } diff --git a/src/replica/replica_init.cpp b/src/replica/replica_init.cpp index a287156478..01bb1a95c0 100644 --- a/src/replica/replica_init.cpp +++ b/src/replica/replica_init.cpp @@ -68,16 +68,9 @@ DSN_DEFINE_int32(replication, error_code replica::initialize_on_new() { - // if (dsn::utils::filesystem::directory_exists(_dir) && - // !dsn::utils::filesystem::remove_path(_dir)) - //{ - // LOG_ERROR("cannot allocate new replica @ {}, as the dir is already exists", _dir); - // return ERR_PATH_ALREADY_EXIST; - //} - // // TODO: check if _dir contain other file or directory except for - // "restore.policy_name.backup_id" - // which is applied to restore from cold backup + // "restore.policy_name.backup_id" which is applied to restore from + // cold backup. if (!dsn::utils::filesystem::directory_exists(_dir) && !dsn::utils::filesystem::create_directory(_dir)) { LOG_ERROR("cannot allocate new replica @ {}, because create dir failed", _dir); diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp index 589e3e28dc..53855a6b90 100644 --- a/src/replica/replica_learn.cpp +++ b/src/replica/replica_learn.cpp @@ -543,7 +543,7 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) err); } else { response.base_local_dir = _app->data_dir(); - response.__set_replica_disk_tag(get_replica_disk_tag()); + response.__set_replica_disk_tag(_dir_node->tag); LOG_INFO_PREFIX( "on_learn[{:#018x}]: learner = {}, get app learn state succeed, " "learned_meta_size = {}, learned_file_count = {}, learned_to_decree = {}", @@ -910,7 +910,7 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response resp.replica_disk_tag, resp.base_local_dir, resp.state.files, - get_replica_disk_tag(), + _dir_node->tag, learn_dir, get_gpid(), true, // overwrite @@ -1237,8 +1237,12 @@ void replica::handle_learning_error(error_code err, bool is_local_error) err, is_local_error ? "local_error" : "remote error"); - if (is_local_error && err == ERR_RDB_CORRUPTION) { - _data_corrupted = true; + if (is_local_error) { + if (err == ERR_RDB_IO_ERROR) { + _dir_node->status = disk_status::IO_ERROR; + } else if (err == ERR_RDB_CORRUPTION) { + _data_corrupted = true; + } } _stub->_counter_replicas_learning_recent_learn_fail_count->increment(); diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 77a0d58f68..385a10f10f 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -100,11 +100,6 @@ namespace dsn { namespace replication { - -DSN_DEFINE_bool(replication, - ignore_broken_disk, - true, - "true means ignore broken data disk when initialize"); DSN_DEFINE_bool(replication, deny_client_on_start, false, @@ -603,77 +598,86 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f } } - // init dirs + // Initialize the file system manager. + _fs_manager.initialize(_options.data_dirs, _options.data_dir_tags); + + // TODO(yingchun): remove the slog related code. + // Create slog directory if it does not exist. std::string cdir; std::string err_msg; - CHECK( - dsn::utils::filesystem::create_directory(_options.slog_dir, cdir, err_msg), "{}", err_msg); + CHECK(utils::filesystem::create_directory(_options.slog_dir, cdir, err_msg), err_msg); _options.slog_dir = cdir; - initialize_fs_manager(_options.data_dirs, _options.data_dir_tags); + // Initialize slog. _log = new mutation_log_shared(_options.slog_dir, FLAGS_log_shared_file_size_mb, FLAGS_log_shared_force_flush, &_counter_shared_log_recent_write_size); LOG_INFO("slog_dir = {}", _options.slog_dir); - // init rps + // Start to load replicas in available data directories. LOG_INFO("start to load replicas"); - - std::vector dir_list; - for (auto &dir : _fs_manager.get_available_data_dirs()) { - std::vector tmp_list; - CHECK(dsn::utils::filesystem::get_subdirectories(dir, tmp_list, false), - "Fail to get subdirectories in {}.", - dir); - dir_list.insert(dir_list.end(), tmp_list.begin(), tmp_list.end()); + std::map> dirs_by_dn; + for (const auto &dn : _fs_manager.get_dir_nodes()) { + // Skip IO error dir_node. + if (dn->status == disk_status::IO_ERROR) { + continue; + } + std::vector sub_directories; + CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_directories, false), + "fail to get sub_directories in {}", + dn->full_dir); + dirs_by_dn.emplace(dn.get(), sub_directories); } replicas rps; utils::ex_lock rps_lock; std::deque load_tasks; uint64_t start_time = dsn_now_ms(); - for (auto &dir : dir_list) { - if (dsn::replication::is_data_dir_invalid(dir)) { - LOG_INFO("ignore dir {}", dir); - continue; - } + for (const auto &dn_dirs : dirs_by_dn) { + const auto dn = dn_dirs.first; + for (const auto &dir : dn_dirs.second) { + if (dsn::replication::is_data_dir_invalid(dir)) { + LOG_INFO("ignore dir {}", dir); + continue; + } - load_tasks.push_back( - tasking::create_task(LPC_REPLICATION_INIT_LOAD, - &_tracker, - [this, dir, &rps, &rps_lock] { - LOG_INFO("process dir {}", dir); - - auto r = load_replica(dir.c_str()); - if (r != nullptr) { - LOG_INFO("{}@{}: load replica '{}' success, = <{}, {}>, last_prepared_decree = {}", - r->get_gpid(), - dsn_primary_address(), - dir, - r->last_durable_decree(), - r->last_committed_decree(), - r->last_prepared_decree()); - - utils::auto_lock l(rps_lock); - CHECK(rps.find(r->get_gpid()) == rps.end(), - "conflict replica dir: {} <--> {}", - r->dir(), - rps[r->get_gpid()]->dir()); - - rps[r->get_gpid()] = r; - } - }, - load_tasks.size())); - load_tasks.back()->enqueue(); + load_tasks.push_back(tasking::create_task( + LPC_REPLICATION_INIT_LOAD, + &_tracker, + [this, dn, dir, &rps, &rps_lock] { + LOG_INFO("process dir {}", dir); + + auto r = load_replica(dn, dir.c_str()); + if (r != nullptr) { + LOG_INFO("{}@{}: load replica '{}' success, = <{}, {}>, last_prepared_decree = {}", + r->get_gpid(), + dsn_primary_address(), + dir, + r->last_durable_decree(), + r->last_committed_decree(), + r->last_prepared_decree()); + + utils::auto_lock l(rps_lock); + CHECK(rps.find(r->get_gpid()) == rps.end(), + "conflict replica dir: {} <--> {}", + r->dir(), + rps[r->get_gpid()]->dir()); + + rps[r->get_gpid()] = r; + } + }, + load_tasks.size())); + load_tasks.back()->enqueue(); + } } for (auto &tsk : load_tasks) { tsk->wait(); } uint64_t finish_time = dsn_now_ms(); - dir_list.clear(); + dirs_by_dn.clear(); load_tasks.clear(); LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms", rps.size(), @@ -830,36 +834,6 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f } } -void replica_stub::initialize_fs_manager(const std::vector &data_dirs, - const std::vector &data_dir_tags) -{ - std::string cdir; - std::string err_msg; - int count = 0; - std::vector available_dirs; - std::vector available_dir_tags; - for (auto i = 0; i < data_dir_tags.size(); ++i) { - const auto &dir = data_dirs[i]; - if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) || - !utils::filesystem::check_dir_rw(dir, err_msg))) { - if (FLAGS_ignore_broken_disk) { - LOG_WARNING("data dir[{}] is broken, ignore it, error:{}", dir, err_msg); - } else { - CHECK(false, "{}", err_msg); - } - continue; - } - LOG_INFO("data_dirs[{}] = {}", count, cdir); - available_dirs.emplace_back(cdir); - available_dir_tags.emplace_back(data_dir_tags[i]); - count++; - } - - CHECK_GT_MSG( - available_dirs.size(), 0, "initialize fs manager failed, no available data directory"); - _fs_manager.initialize(available_dirs, available_dir_tags); -} - void replica_stub::initialize_start() { if (_is_running) { @@ -1124,40 +1098,17 @@ void replica_stub::on_query_disk_info(query_disk_info_rpc rpc) int app_id = 0; if (!req.app_name.empty()) { zauto_read_lock l(_replicas_lock); - if (!(app_id = get_app_id_from_replicas(req.app_name))) { + app_id = get_app_id_from_replicas(req.app_name); + if (app_id == 0) { resp.err = ERR_OBJECT_NOT_FOUND; return; } } - for (const auto &dir_node : _fs_manager._dir_nodes) { - disk_info info; - // app_name empty means query all app replica_count - if (req.app_name.empty()) { - info.holding_primary_replicas = dir_node->holding_primary_replicas; - info.holding_secondary_replicas = dir_node->holding_secondary_replicas; - } else { - const auto &primary_iter = dir_node->holding_primary_replicas.find(app_id); - if (primary_iter != dir_node->holding_primary_replicas.end()) { - info.holding_primary_replicas[app_id] = primary_iter->second; - } - - const auto &secondary_iter = dir_node->holding_secondary_replicas.find(app_id); - if (secondary_iter != dir_node->holding_secondary_replicas.end()) { - info.holding_secondary_replicas[app_id] = secondary_iter->second; - } - } - info.tag = dir_node->tag; - info.full_dir = dir_node->full_dir; - info.disk_capacity_mb = dir_node->disk_capacity_mb; - info.disk_available_mb = dir_node->disk_available_mb; - - resp.disk_infos.emplace_back(info); - } - - resp.total_capacity_mb = _fs_manager._total_capacity_mb; - resp.total_available_mb = _fs_manager._total_available_mb; - + resp.disk_infos = _fs_manager.get_disk_infos(app_id); + // Get the statistics from fs_manager's metrics, they are thread-safe. + resp.total_capacity_mb = _fs_manager._counter_total_capacity_mb->get_integer_value(); + resp.total_available_mb = _fs_manager._counter_total_available_mb->get_integer_value(); resp.err = ERR_OK; } @@ -1218,7 +1169,7 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc) std::vector data_dirs; std::vector data_dir_tags; - std::string err_msg = ""; + std::string err_msg; if (disk_str.empty() || !replication_options::get_data_dir_and_tag( disk_str, "", "replica", data_dirs, data_dir_tags, err_msg)) { @@ -1229,10 +1180,10 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc) for (auto i = 0; i < data_dir_tags.size(); ++i) { auto dir = data_dirs[i]; - if (_fs_manager.is_dir_node_available(dir, data_dir_tags[i])) { + if (_fs_manager.is_dir_node_exist(dir, data_dir_tags[i])) { resp.err = ERR_NODE_ALREADY_EXIST; resp.__set_err_hint( - fmt::format("data_dir({}) tag({}) already available", dir, data_dir_tags[i])); + fmt::format("data_dir({}) tag({}) already exist", dir, data_dir_tags[i])); return; } @@ -1252,6 +1203,8 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc) } LOG_INFO("Add a new disk in fs_manager, data_dir={}, tag={}", cdir, data_dir_tags[i]); + // TODO(yingchun): there is a gap between _fs_manager.is_dir_node_exist() and + // _fs_manager.add_new_dir_node() which is not atomic. _fs_manager.add_new_dir_node(cdir, data_dir_tags[i]); } } @@ -1405,12 +1358,7 @@ void replica_stub::get_replica_info(replica_info &info, replica_ptr r) info.last_committed_decree = r->last_committed_decree(); info.last_prepared_decree = r->last_prepared_decree(); info.last_durable_decree = r->last_durable_decree(); - - dsn::error_code err = _fs_manager.get_disk_tag(r->dir(), info.disk_tag); - if (dsn::ERR_OK != err) { - LOG_WARNING("get disk tag of {} failed: {}", r->dir(), err); - } - + info.disk_tag = r->get_dir_node()->tag; info.__set_manual_compact_status(r->get_manual_compact_status()); } @@ -1768,7 +1716,6 @@ void replica_stub::init_gc_for_test() void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id) { - std::string replica_path; std::pair closed_info; { @@ -1778,26 +1725,30 @@ void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id) return; closed_info = iter->second; _closed_replicas.erase(iter); - _fs_manager.remove_replica(id); } + _fs_manager.remove_replica(id); - replica_path = get_replica_dir(closed_info.first.app_type.c_str(), id, false); - if (replica_path.empty()) { + const auto dn = _fs_manager.get_replica_dir(closed_info.first.app_type.c_str(), id, false); + if (dn == nullptr) { LOG_WARNING( "gc closed replica({}.{}) failed, no exist data", id, closed_info.first.app_type); return; } + const auto replica_path = dn->replica_dir(closed_info.first.app_type.c_str(), id); + CHECK( + dsn::utils::filesystem::directory_exists(replica_path), "dir({}) not exist", replica_path); LOG_INFO("start to move replica({}) as garbage, path: {}", id, replica_path); - char rename_path[1024]; - sprintf(rename_path, "%s.%" PRIu64 ".gar", replica_path.c_str(), dsn_now_us()); + const auto rename_path = fmt::format("{}.{}.gar", replica_path, dsn_now_us()); if (!dsn::utils::filesystem::rename_path(replica_path, rename_path)) { LOG_WARNING("gc_replica: failed to move directory '{}' to '{}'", replica_path, rename_path); // if gc the replica failed, add it back - zauto_write_lock l(_replicas_lock); + { + zauto_write_lock l(_replicas_lock); + _closed_replicas.emplace(id, closed_info); + } _fs_manager.add_replica(id, replica_path); - _closed_replicas.emplace(id, closed_info); } else { LOG_WARNING("gc_replica: replica_dir_op succeed to move directory '{}' to '{}'", replica_path, @@ -2014,10 +1965,9 @@ void replica_stub::on_disk_stat() uint64_t start = dsn_now_ns(); disk_cleaning_report report{}; - dsn::replication::disk_remove_useless_dirs(_fs_manager.get_available_data_dirs(), report); + dsn::replication::disk_remove_useless_dirs(_fs_manager.get_dir_nodes(), report); _fs_manager.update_disk_stat(); update_disk_holding_replicas(); - update_disks_status(); _counter_replicas_error_replica_dir_count->set(report.error_replica_count); _counter_replicas_garbage_replica_dir_count->set(report.garbage_replica_count); @@ -2097,9 +2047,12 @@ void replica_stub::open_replica( const std::shared_ptr &group_check, const std::shared_ptr &configuration_update) { - std::string dir = get_replica_dir(app.app_type.c_str(), id, false); - replica_ptr rep = nullptr; - if (!dir.empty()) { + replica_ptr rep; + std::string dir; + auto dn = _fs_manager.get_replica_dir(app.app_type.c_str(), id, false); + if (dn != nullptr) { + dir = dn->replica_dir(app.app_type.c_str(), id); + CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir); // NOTICE: if partition is DDD, and meta select one replica as primary, it will execute the // load-process because of a.b.pegasus is exist, so it will never execute the restore // process below @@ -2108,18 +2061,20 @@ void replica_stub::open_replica( _primary_address_str, group_check ? "with" : "without", dir); - rep = load_replica(dir.c_str()); + rep = load_replica(dn, dir.c_str()); // if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk // migration if (rep == nullptr) { - std::string origin_tmp_dir = get_replica_dir( - fmt::format("{}{}", app.app_type, replica_disk_migrator::kReplicaDirOriginSuffix) - .c_str(), - id, - false); - if (!origin_tmp_dir.empty()) { - LOG_INFO("mark the dir {} is garbage, start revert and load disk migration origin " + const auto origin_dir_type = + fmt::format("{}{}", app.app_type, replica_disk_migrator::kReplicaDirOriginSuffix); + const auto origin_dn = _fs_manager.get_replica_dir(origin_dir_type.c_str(), id, false); + if (origin_dn != nullptr) { + const auto origin_tmp_dir = origin_dn->replica_dir(origin_dir_type.c_str(), id); + CHECK(dsn::utils::filesystem::directory_exists(origin_tmp_dir), + "dir({}) not exist", + origin_tmp_dir); + LOG_INFO("mark the dir {} as garbage, start revert and load disk migration origin " "replica data({})", dir, origin_tmp_dir); @@ -2131,7 +2086,7 @@ void replica_stub::open_replica( boost::replace_first( origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, ""); dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir); - rep = load_replica(origin_dir.c_str()); + rep = load_replica(origin_dn, origin_dir.c_str()); FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> void {}); } @@ -2232,18 +2187,22 @@ replica *replica_stub::new_replica(gpid gpid, bool is_duplication_follower, const std::string &parent_dir) { - std::string dir; + dir_node *dn = nullptr; if (parent_dir.empty()) { - dir = get_replica_dir(app.app_type.c_str(), gpid); + dn = _fs_manager.get_replica_dir(app.app_type.c_str(), gpid, true); } else { - dir = get_child_dir(app.app_type.c_str(), gpid, parent_dir); + dn = _fs_manager.get_child_dir(app.app_type.c_str(), gpid, parent_dir); + } + if (dn == nullptr) { + LOG_ERROR("could not allocate a new directory for replica {}", gpid); + return nullptr; } - auto *rep = - new replica(this, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower); + + auto *rep = new replica(this, gpid, app, dn, restore_if_necessary, is_duplication_follower); error_code err; if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) { LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err); - clear_on_failure(rep, dir, gpid); + clear_on_failure(rep); return nullptr; } @@ -2253,14 +2212,14 @@ replica *replica_stub::new_replica(gpid gpid, "previous detail error log", rep->name(), err); - clear_on_failure(rep, dir, gpid); + clear_on_failure(rep); return nullptr; } err = rep->initialize_on_new(); if (err != ERR_OK) { LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err); - clear_on_failure(rep, dir, gpid); + clear_on_failure(rep); return nullptr; } @@ -2268,7 +2227,7 @@ replica *replica_stub::new_replica(gpid gpid, return rep; } -replica *replica_stub::load_replica(const char *dir) +replica *replica_stub::load_replica(dir_node *dn, const char *dir) { FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; }); @@ -2314,7 +2273,7 @@ replica *replica_stub::load_replica(const char *dir) return nullptr; } - auto *rep = new replica(this, pid, info, dir, false); + auto *rep = new replica(this, pid, info, dn, false); err = rep->initialize_on_load(); if (err != ERR_OK) { LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err); @@ -2336,14 +2295,17 @@ replica *replica_stub::load_replica(const char *dir) return rep; } -void replica_stub::clear_on_failure(replica *rep, const std::string &path, const gpid &pid) +void replica_stub::clear_on_failure(replica *rep) { + std::string rep_dir = rep->dir(); + auto pid = rep->get_gpid(); + rep->close(); delete rep; rep = nullptr; // clear work on failure - utils::filesystem::remove_path(path); + utils::filesystem::remove_path(rep_dir); _fs_manager.remove_replica(pid); } @@ -2883,44 +2845,6 @@ void replica_stub::close() _is_running = false; } -std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool create_new) -{ - std::string gpid_str = fmt::format("{}.{}", id, app_type); - std::string replica_dir; - bool is_dir_exist = false; - for (const std::string &data_dir : _fs_manager.get_available_data_dirs()) { - std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); - if (utils::filesystem::directory_exists(dir)) { - CHECK(!is_dir_exist, "replica dir conflict: {} <--> {}", dir, replica_dir); - replica_dir = dir; - is_dir_exist = true; - } - } - if (replica_dir.empty() && create_new) { - _fs_manager.allocate_dir(id, app_type, replica_dir); - } - return replica_dir; -} - -std::string -replica_stub::get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir) -{ - std::string gpid_str = fmt::format("{}.{}", child_pid.to_string(), app_type); - std::string child_dir; - for (const std::string &data_dir : _fs_manager.get_available_data_dirs()) { - std::string dir = utils::filesystem::path_combine(data_dir, gpid_str); - // = /. - // check if 's is equal to - if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") { - child_dir = dir; - _fs_manager.add_replica(child_pid, child_dir); - break; - } - } - CHECK(!child_dir.empty(), "can not find parent_dir {} in data_dirs", parent_dir); - return child_dir; -} - #ifdef DSN_ENABLE_GPERF // Get tcmalloc numeric property (name is "prop") value. // Return -1 if get property failed (property we used will be greater than zero) @@ -3010,14 +2934,16 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid, app_info *app, const std::string &parent_dir) { - FAIL_POINT_INJECT_F("replica_stub_create_child_replica_if_not_found", - [=](dsn::string_view) -> replica_ptr { - replica *rep = new replica(this, child_pid, *app, "./", false); - rep->_config.status = partition_status::PS_INACTIVE; - _replicas.insert(replicas::value_type(child_pid, rep)); - LOG_INFO("mock create_child_replica_if_not_found succeed"); - return rep; - }); + FAIL_POINT_INJECT_F( + "replica_stub_create_child_replica_if_not_found", [=](dsn::string_view) -> replica_ptr { + const auto dn = _fs_manager.get_child_dir(app->app_type.c_str(), child_pid, parent_dir); + CHECK_NOTNULL(dn, ""); + replica *rep = new replica(this, child_pid, *app, dn, false); + rep->_config.status = partition_status::PS_INACTIVE; + _replicas.insert(replicas::value_type(child_pid, rep)); + LOG_INFO("mock create_child_replica_if_not_found succeed"); + return rep; + }); zauto_write_lock l(_replicas_lock); auto it = _replicas.find(child_pid); @@ -3094,22 +3020,20 @@ void replica_stub::on_update_child_group_partition_count(update_child_group_part void replica_stub::update_disk_holding_replicas() { - for (const auto &dir_node : _fs_manager._dir_nodes) { - // clear the holding_primary_replicas/holding_secondary_replicas and re-calculate it from - // holding_replicas - dir_node->holding_primary_replicas.clear(); - dir_node->holding_secondary_replicas.clear(); - for (const auto &holding_replicas : dir_node->holding_replicas) { - const std::set &pids = holding_replicas.second; + for (const auto &dn : _fs_manager.get_dir_nodes()) { + dn->holding_primary_replicas.clear(); + dn->holding_secondary_replicas.clear(); + for (const auto &holding_replicas : dn->holding_replicas) { + const auto &pids = holding_replicas.second; for (const auto &pid : pids) { - replica_ptr replica = get_replica(pid); - if (replica == nullptr) { + const auto rep = get_replica(pid); + if (rep == nullptr) { continue; } - if (replica->status() == partition_status::PS_PRIMARY) { - dir_node->holding_primary_replicas[holding_replicas.first].emplace(pid); - } else if (replica->status() == partition_status::PS_SECONDARY) { - dir_node->holding_secondary_replicas[holding_replicas.first].emplace(pid); + if (rep->status() == partition_status::PS_PRIMARY) { + dn->holding_primary_replicas[holding_replicas.first].emplace(pid); + } else if (rep->status() == partition_status::PS_SECONDARY) { + dn->holding_secondary_replicas[holding_replicas.first].emplace(pid); } } } @@ -3199,25 +3123,6 @@ void replica_stub::query_app_manual_compact_status( } } -void replica_stub::update_disks_status() -{ - for (const auto &dir_node : _fs_manager._status_updated_dir_nodes) { - for (const auto &holding_replicas : dir_node->holding_replicas) { - const std::set &pids = holding_replicas.second; - for (const auto &pid : pids) { - replica_ptr replica = get_replica(pid); - if (replica == nullptr) { - continue; - } - replica->set_disk_status(dir_node->status); - LOG_INFO("{} update disk_status to {}", - replica->name(), - enum_to_string(replica->get_disk_status())); - } - } - } -} - void replica_stub::update_config(const std::string &name) { // The new value has been validated and FLAGS_* has been updated, it's safety to use it diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 12f17a1cfb..79a55397f0 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -139,8 +139,6 @@ class replica_stub : public serverlet, public ref_counter // void initialize(const replication_options &opts, bool clear = false); void initialize(bool clear = false); - void initialize_fs_manager(const std::vector &data_dirs, - const std::vector &data_dir_tags); void set_options(const replication_options &opts) { _options = opts; } void open_service(); void close(); @@ -202,12 +200,6 @@ class replica_stub : public serverlet, public ref_counter virtual rpc_address get_meta_server_address() const { return _failure_detector->get_servers(); } rpc_address primary_address() const { return _primary_address; } - std::string get_replica_dir(const char *app_type, gpid id, bool create_new = true); - - // during partition split, we should gurantee child replica and parent replica share the - // same data dir - std::string get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir); - // // helper methods // @@ -341,16 +333,16 @@ class replica_stub : public serverlet, public ref_counter const std::shared_ptr &req, const std::shared_ptr &req2); // Create a new replica according to the parameters. - // 'parent_dir' is used in partition split for get_child_dir(). + // 'parent_dir' is used in partition split to generate child directory. replica *new_replica(gpid gpid, const app_info &app, bool restore_if_necessary, bool is_duplication_follower, const std::string &parent_dir = ""); - // Load an existing replica from 'dir'. - replica *load_replica(const char *dir); + // Load an existing replica which is located in 'dn' with 'dir' directory. + replica *load_replica(dir_node *dn, const char *dir); // Clean up the memory state and on disk data if creating replica failed. - void clear_on_failure(replica *rep, const std::string &path, const gpid &pid); + void clear_on_failure(replica *rep); task_ptr begin_close_replica(replica_ptr r); void close_replica(replica_ptr r); void notify_replica_state_update(const replica_configuration &config, bool is_closing); @@ -372,8 +364,6 @@ class replica_stub : public serverlet, public ref_counter error_code error); void update_disk_holding_replicas(); - void update_disks_status(); - void register_ctrl_command(); int get_app_id_from_replicas(std::string app_name) diff --git a/src/replica/replica_test_utils.cpp b/src/replica/replica_test_utils.cpp deleted file mode 100644 index ce60026c14..0000000000 --- a/src/replica/replica_test_utils.cpp +++ /dev/null @@ -1,69 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -#include "replica/replica_test_utils.h" - -#include "common/gpid.h" -#include "replica.h" -#include "replica_stub.h" - -namespace dsn { -class app_info; - -namespace replication { - -class mock_replica : public replica -{ -public: - mock_replica(replica_stub *stub, - const gpid &gpid, - const app_info &app, - const char *dir, - bool restore_if_necessary, - bool is_duplication_follower) - : replica(stub, gpid, app, dir, restore_if_necessary, is_duplication_follower) - { - } -}; - -replica *create_test_replica(replica_stub *stub, - gpid gpid, - const app_info &app, - const char *dir, - bool restore_if_necessary, - bool is_duplication_follower) -{ - return new mock_replica(stub, gpid, app, dir, restore_if_necessary, is_duplication_follower); -} - -replica_stub *create_test_replica_stub() { return new replica_stub(); } - -void destroy_replica(replica *r) { delete r; } - -void destroy_replica_stub(replica_stub *rs) { delete rs; } - -} // namespace replication -} // namespace dsn diff --git a/src/replica/replica_test_utils.h b/src/replica/replica_test_utils.h deleted file mode 100644 index e09d33873b..0000000000 --- a/src/replica/replica_test_utils.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -/// This file contains utilities for upper level applications (pegasus) which -/// needs the hidden abstraction of rDSN in order to make unit test. - -namespace dsn { -class app_info; -class gpid; - -namespace replication { - -class replica; -class replica_stub; - -extern replica *create_test_replica(replica_stub *stub, - gpid gpid, - const app_info &app, - const char *dir, - bool restore_if_necessary, - bool is_duplication_follower); - -extern replica_stub *create_test_replica_stub(); - -extern void destroy_replica(replica *r); - -extern void destroy_replica_stub(replica_stub *rs); - -} // namespace replication -} // namespace dsn diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp index 5159fd757d..b8bcff404f 100644 --- a/src/replica/replication_app_base.cpp +++ b/src/replica/replication_app_base.cpp @@ -457,10 +457,12 @@ error_code replication_app_base::apply_mutation(const mutation *mu) // an error. if (!has_ingestion_request) { switch (storage_error) { - // TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage - // engine errors. + // TODO(yingchun): Now only kCorruption and kIOError are dealt, consider to deal with + // more storage engine errors. case rocksdb::Status::kCorruption: return ERR_RDB_CORRUPTION; + case rocksdb::Status::kIOError: + return ERR_RDB_IO_ERROR; default: return ERR_LOCAL_APP_FAILURE; } diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h index cece7c82ed..6409c5cf18 100644 --- a/src/replica/test/mock_utils.h +++ b/src/replica/test/mock_utils.h @@ -115,10 +115,10 @@ class mock_replica : public replica mock_replica(replica_stub *stub, gpid gpid, const app_info &app, - const char *dir, + dir_node *dn, bool need_restore = false, bool is_duplication_follower = false) - : replica(stub, gpid, app, dir, need_restore, is_duplication_follower) + : replica(stub, gpid, app, dn, need_restore, is_duplication_follower) { _app = std::make_unique(this); } @@ -231,23 +231,23 @@ class mock_replica : public replica }; typedef dsn::ref_ptr mock_replica_ptr; -inline std::unique_ptr create_mock_replica(replica_stub *stub, - int appid = 1, - int partition_index = 1, - const char *dir = "./") +inline std::unique_ptr +create_mock_replica(replica_stub *stub, int appid = 1, int partition_index = 1) { gpid gpid(appid, partition_index); app_info app_info; app_info.app_type = "replica"; app_info.app_name = "temp"; - return std::make_unique(stub, gpid, app_info, dir); + dir_node *dn = stub->get_fs_manager()->get_replica_dir(app_info.app_type.c_str(), gpid, true); + CHECK_NOTNULL(dn, ""); + return std::make_unique(stub, gpid, app_info, dn); } class mock_replica_stub : public replica_stub { public: - mock_replica_stub() = default; + mock_replica_stub() { _fs_manager.initialize({"test_dir"}, {"test"}); } ~mock_replica_stub() override = default; @@ -286,15 +286,20 @@ class mock_replica_stub : public replica_stub partition_status::type status = partition_status::PS_INACTIVE, ballot b = 5, bool need_restore = false, - bool is_duplication_follower = false) + bool is_duplication_follower = false, + dir_node *dn = nullptr) { replica_configuration config; config.ballot = b; config.pid = pid; config.status = status; + if (dn == nullptr) { + dn = _fs_manager.get_replica_dir("replica", pid, true); + } + CHECK_NOTNULL(dn, ""); mock_replica_ptr rep = - new mock_replica(this, pid, info, "./", need_restore, is_duplication_follower); + new mock_replica(this, pid, info, dn, need_restore, is_duplication_follower); rep->set_replica_config(config); _replicas[pid] = rep; @@ -313,36 +318,48 @@ class mock_replica_stub : public replica_stub config.pid = pid; config.status = status; - // TODO(yingchun): should refactor to move to cstor or initializer. - initialize_fs_manager({"./"}, {"tag"}); - std::string dir = get_replica_dir("test", pid); - auto *rep = - new mock_replica(this, pid, info, dir.c_str(), need_restore, is_duplication_follower); + auto dn = _fs_manager.get_replica_dir("test", pid, true); + CHECK_NOTNULL(dn, ""); + auto *rep = new mock_replica(this, pid, info, dn, need_restore, is_duplication_follower); rep->set_replica_config(config); return rep; } void generate_replicas_base_dir_nodes_for_app(app_info mock_app, - int primary_count_for_disk = 1, - int secondary_count_for_disk = 2) + int primary_count_per_disk, + int secondary_count_per_disk) { - const auto &dir_nodes = _fs_manager._dir_nodes; - for (auto &dir_node : dir_nodes) { - const auto &replica_iter = dir_node->holding_replicas.find(mock_app.app_id); - if (replica_iter == dir_node->holding_replicas.end()) { - continue; + int partition_index = 0; + for (const auto &dn : _fs_manager.get_dir_nodes()) { + // Create 'partition_count' count of replicas. + if (partition_index >= mock_app.partition_count) { + break; } - const std::set &pids = replica_iter->second; - int primary_count = primary_count_for_disk; - int secondary_count = secondary_count_for_disk; - for (const gpid &pid : pids) { - // generate primary replica and secondary replica. + int replica_count_per_disk = primary_count_per_disk + secondary_count_per_disk; + int primary_count = primary_count_per_disk; + int secondary_count = secondary_count_per_disk; + // Create 'replica_count_per_disk' count of replicas on 'dn'. + while (replica_count_per_disk-- > 0) { + gpid new_gpid(mock_app.app_id, partition_index++); + _fs_manager.specify_dir_for_new_replica_for_test(dn.get(), new_gpid); if (primary_count-- > 0) { - add_replica(generate_replica_ptr( - mock_app, pid, partition_status::PS_PRIMARY, mock_app.app_id)); + // Create 'primary_count' count of primary replicas on 'dn'. + add_replica(generate_replica_ptr(mock_app, + new_gpid, + partition_status::PS_PRIMARY, + mock_app.app_id, + false, + false, + dn.get())); } else if (secondary_count-- > 0) { - add_replica(generate_replica_ptr( - mock_app, pid, partition_status::PS_SECONDARY, mock_app.app_id)); + // Create 'secondary_count' count of secondary replicas on 'dn'. + add_replica(generate_replica_ptr(mock_app, + new_gpid, + partition_status::PS_SECONDARY, + mock_app.app_id, + false, + false, + dn.get())); } } } diff --git a/src/replica/test/mutation_log_learn_test.cpp b/src/replica/test/mutation_log_learn_test.cpp index 6c2fdf71ac..347451fc68 100644 --- a/src/replica/test/mutation_log_learn_test.cpp +++ b/src/replica/test/mutation_log_learn_test.cpp @@ -63,7 +63,7 @@ class mutation_log_test : public replica_test_base TEST_F(mutation_log_test, learn) { std::chrono::steady_clock clock; - gpid gpid(1, 1); + gpid gpid(1, 0); std::string str = "hello, world!"; std::string logp = _log_dir; diff --git a/src/replica/test/mutation_log_test.cpp b/src/replica/test/mutation_log_test.cpp index 2b7ce0bab4..6bca13336a 100644 --- a/src/replica/test/mutation_log_test.cpp +++ b/src/replica/test/mutation_log_test.cpp @@ -297,7 +297,6 @@ class mutation_log_test : public replica_test_base { utils::filesystem::remove_path(_log_dir); utils::filesystem::create_directory(_log_dir); - utils::filesystem::remove_path(_log_dir + ".test"); } diff --git a/src/replica/test/open_replica_test.cpp b/src/replica/test/open_replica_test.cpp index 89e0802864..4f7b79d1a4 100644 --- a/src/replica/test/open_replica_test.cpp +++ b/src/replica/test/open_replica_test.cpp @@ -77,7 +77,6 @@ class open_replica_test : public replica_test_base if (!tt.is_in_dir_nodes) { dir_node *node_disk = new dir_node("tag_" + std::to_string(i), "tmp_dir"); stub->_fs_manager._dir_nodes.emplace_back(node_disk); - stub->_fs_manager._available_data_dirs.emplace_back("tmp_dir"); } _replica->register_service(); diff --git a/src/replica/test/replica_disk_migrate_test.cpp b/src/replica/test/replica_disk_migrate_test.cpp index 64ac2da7f3..7804e3210d 100644 --- a/src/replica/test/replica_disk_migrate_test.cpp +++ b/src/replica/test/replica_disk_migrate_test.cpp @@ -174,7 +174,7 @@ TEST_F(replica_disk_migrate_test, migrate_disk_replica_check) auto &request = *fake_migrate_rpc.mutable_request(); auto &response = fake_migrate_rpc.response(); - request.pid = dsn::gpid(app_info_1.app_id, 1); + request.pid = dsn::gpid(app_info_1.app_id, 0); request.origin_disk = "tag_1"; request.target_disk = "tag_2"; @@ -347,7 +347,7 @@ TEST_F(replica_disk_migrate_test, disk_migrate_replica_update) ASSERT_TRUE(utils::filesystem::directory_exists(kReplicaNewDir)); utils::filesystem::remove_path(fmt::format("./{}/", request.origin_disk)); utils::filesystem::remove_path(fmt::format("./{}/", request.target_disk)); - for (const auto &node_disk : get_dir_nodes()) { + for (const auto &node_disk : stub->get_fs_manager()->get_dir_nodes()) { if (node_disk->tag == request.origin_disk) { auto gpids = node_disk->holding_replicas[app_info_1.app_id]; ASSERT_TRUE(gpids.find(request.pid) == gpids.end()); @@ -362,14 +362,25 @@ TEST_F(replica_disk_migrate_test, disk_migrate_replica_update) } } +// Test load from new replica dir failed, then fall back to load from origin dir succeed, +// and then mark the "new" replica dir as ".gar". TEST_F(replica_disk_migrate_test, disk_migrate_replica_open) { + gpid test_pid(app_info_1.app_id, 4); + + // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1. auto &request = *fake_migrate_rpc.mutable_request(); - request.pid = dsn::gpid(app_info_1.app_id, 4); + request.pid = test_pid; request.origin_disk = "tag_2"; request.target_disk = "tag_empty_1"; - remove_mock_dir_node(request.origin_disk); + // Remove the gpid 1.4 dir which is created in constructor. + const std::string kReplicaOriginDir = + fmt::format("./{}/{}.replica", request.origin_disk, request.pid.to_string()); + utils::filesystem::remove_path(kReplicaOriginDir); + stub->get_fs_manager()->remove_replica(test_pid); + + // Create the related dirs. const std::string kReplicaOriginSuffixDir = fmt::format( "./{}/{}.replica.disk.migrate.ori/", request.origin_disk, request.pid.to_string()); const std::string kReplicaNewDir = @@ -377,16 +388,18 @@ TEST_F(replica_disk_migrate_test, disk_migrate_replica_open) utils::filesystem::create_directory(kReplicaOriginSuffixDir); utils::filesystem::create_directory(kReplicaNewDir); + // The replica can be opened nomally. In fact, the original dir is opened, and the new dir will + // be garbage. fail::cfg("mock_replica_load", "return()"); - const std::string kReplicaOriginDir = - fmt::format("./{}/{}.replica", request.origin_disk, request.pid.to_string()); - const std::string kReplicaGarDir = - fmt::format("./{}/{}.replica.gar", request.target_disk, request.pid.to_string()); open_replica(app_info_1, request.pid); + // Check it works as expected. + const std::string kReplicaGarDir = + fmt::format("./{}/{}.replica.gar", request.target_disk, request.pid.to_string()); ASSERT_TRUE(utils::filesystem::directory_exists(kReplicaOriginDir)); ASSERT_TRUE(utils::filesystem::directory_exists(kReplicaGarDir)); + // Clean up. utils::filesystem::remove_path(kReplicaOriginDir); utils::filesystem::remove_path(kReplicaGarDir); } diff --git a/src/replica/test/replica_disk_test.cpp b/src/replica/test/replica_disk_test.cpp index 934a5c7495..0f9b7b2ccb 100644 --- a/src/replica/test/replica_disk_test.cpp +++ b/src/replica/test/replica_disk_test.cpp @@ -92,18 +92,17 @@ TEST_F(replica_disk_test, on_query_disk_info_all_app) ASSERT_EQ(disk_infos.size(), 6); int info_size = disk_infos.size(); - int app_id_1_partition_index = 1; - int app_id_2_partition_index = 1; + int app_id_1_partition_index = 0; + int app_id_2_partition_index = 0; for (int i = 0; i < info_size; i++) { - if (disk_infos[i].tag == "tag_empty_1") { + if (disk_infos[i].holding_primary_replicas.empty() && + disk_infos[i].holding_secondary_replicas.empty()) { continue; } ASSERT_EQ(disk_infos[i].tag, "tag_" + std::to_string(i + 1)); ASSERT_EQ(disk_infos[i].full_dir, "./tag_" + std::to_string(i + 1)); ASSERT_EQ(disk_infos[i].disk_capacity_mb, 500); ASSERT_EQ(disk_infos[i].disk_available_mb, (i + 1) * 50); - // `holding_primary_replicas` and `holding_secondary_replicas` is std::map> ASSERT_EQ(disk_infos[i].holding_primary_replicas.size(), 2); ASSERT_EQ(disk_infos[i].holding_secondary_replicas.size(), 2); @@ -170,24 +169,21 @@ TEST_F(replica_disk_test, on_query_disk_info_one_app) request.app_name = app_info_1.app_name; stub->on_query_disk_info(fake_query_disk_rpc); - auto &disk_infos_with_app_1 = fake_query_disk_rpc.response().disk_infos; - int info_size = disk_infos_with_app_1.size(); - for (int i = 0; i < info_size; i++) { - if (disk_infos_with_app_1[i].tag == "tag_empty_1") { + for (auto disk_info : fake_query_disk_rpc.response().disk_infos) { + if (disk_info.holding_primary_replicas.empty() && + disk_info.holding_secondary_replicas.empty()) { continue; } - // `holding_primary_replicas` and `holding_secondary_replicas` is std::map> - ASSERT_EQ(disk_infos_with_app_1[i].holding_primary_replicas.size(), 1); - ASSERT_EQ(disk_infos_with_app_1[i].holding_secondary_replicas.size(), 1); - ASSERT_EQ(disk_infos_with_app_1[i].holding_primary_replicas[app_info_1.app_id].size(), + ASSERT_EQ(disk_info.holding_primary_replicas.size(), 1); + ASSERT_EQ(disk_info.holding_secondary_replicas.size(), 1); + ASSERT_EQ(disk_info.holding_primary_replicas[app_info_1.app_id].size(), app_id_1_primary_count_for_disk); - ASSERT_EQ(disk_infos_with_app_1[i].holding_secondary_replicas[app_info_1.app_id].size(), + ASSERT_EQ(disk_info.holding_secondary_replicas[app_info_1.app_id].size(), app_id_1_secondary_count_for_disk); - ASSERT_TRUE(disk_infos_with_app_1[i].holding_primary_replicas.find(app_info_2.app_id) == - disk_infos_with_app_1[i].holding_primary_replicas.end()); - ASSERT_TRUE(disk_infos_with_app_1[i].holding_secondary_replicas.find(app_info_2.app_id) == - disk_infos_with_app_1[i].holding_secondary_replicas.end()); + ASSERT_TRUE(disk_info.holding_primary_replicas.find(app_info_2.app_id) == + disk_info.holding_primary_replicas.end()); + ASSERT_TRUE(disk_info.holding_secondary_replicas.find(app_info_2.app_id) == + disk_info.holding_secondary_replicas.end()); } } @@ -215,9 +211,8 @@ TEST_F(replica_disk_test, gc_disk_useless_dir) sleep(5); - std::vector data_dirs{"./"}; disk_cleaning_report report{}; - dsn::replication::disk_remove_useless_dirs(data_dirs, report); + dsn::replication::disk_remove_useless_dirs({std::make_shared("test", "./")}, report); for (const auto &test : tests) { if (!dsn::replication::is_data_dir_removable(test)) { @@ -246,9 +241,8 @@ TEST_F(replica_disk_test, disk_status_test) {disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT}, {disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL}}; for (const auto &test : tests) { - auto node = get_dir_nodes()[node_index]; + auto node = stub->get_fs_manager()->get_dir_nodes()[node_index]; mock_node_status(node_index, test.old_status, test.new_status); - update_disks_status(); for (auto &kv : node->holding_replicas) { for (auto &pid : kv.second) { bool flag; diff --git a/src/replica/test/replica_disk_test_base.h b/src/replica/test/replica_disk_test_base.h index db41cd509b..82c1859c1e 100644 --- a/src/replica/test/replica_disk_test_base.h +++ b/src/replica/test/replica_disk_test_base.h @@ -39,16 +39,23 @@ class replica_disk_test_base : public replica_test_base // tag_5 100*5 50*5 50% // total 2500 750 30% // replica info, for example: - // dir_node primary/secondary - // - // tag_empty_1 - // tag_1 1.1 | 1.2,1.3 - // 2.1,2.2 | 2.3,2.4,2.5,2.6 - // - // tag_2 1.4 | 1.5,1.6 - // 2.7,2.8 | 2.9,2.10,2.11,2.12,2.13 - // ... - // ... + // dir_node primary | secondary + // --------------------------------+--------------------------- + // tag_1 1.0 | 1.1 1.2 + // 2.0 2.1 | 2.2 2.3 2.4 2.5 + // --------------------------------+--------------------------- + // tag_2 1.3 | 1.4 1.5 + // 2.6 2.7 | 2.8 2.9 2.10 2.11 + // --------------------------------+--------------------------- + // tag_3 1.6 | 1.7 1.8 + // 2.12 2.13 | 2.14 2.15 2.16 2.17 + // --------------------------------+--------------------------- + // tag_4 | + // --------------------------------+--------------------------- + // tag_5 | + // --------------------------------+--------------------------- + // tag_empty_1 | + // --------------------------------+--------------------------- replica_disk_test_base() { fail::setup(); @@ -56,6 +63,8 @@ class replica_disk_test_base : public replica_test_base fail::cfg("update_disk_stat", "return()"); generate_mock_app_info(); + stub->_fs_manager._dir_nodes.clear(); + stub->_fs_manager.reset_disk_stat(); generate_mock_dir_nodes(dir_nodes_count); generate_mock_empty_dir_node(empty_dir_nodes_count); @@ -69,12 +78,6 @@ class replica_disk_test_base : public replica_test_base ~replica_disk_test_base() { fail::teardown(); } - void update_disk_replica() { stub->on_disk_stat(); } - - void update_disks_status() { stub->update_disks_status(); } - - std::vector> get_dir_nodes() { return stub->_fs_manager._dir_nodes; } - void generate_mock_dir_node(const app_info &app, const gpid pid, const std::string &tag, @@ -83,7 +86,6 @@ class replica_disk_test_base : public replica_test_base dir_node *node_disk = new dir_node(tag, full_dir); node_disk->holding_replicas[app.app_id].emplace(pid); stub->_fs_manager._dir_nodes.emplace_back(node_disk); - stub->_fs_manager._available_data_dirs.emplace_back(full_dir); } void remove_mock_dir_node(const std::string &tag) @@ -101,16 +103,14 @@ class replica_disk_test_base : public replica_test_base void mock_node_status(int32_t node_index, disk_status::type old_status, disk_status::type new_status) { - auto node = get_dir_nodes()[node_index]; + auto node = stub->_fs_manager.get_dir_nodes()[node_index]; for (const auto &kv : node->holding_replicas) { for (const auto &pid : kv.second) { update_replica_disk_status(pid, old_status); } } - stub->_fs_manager._status_updated_dir_nodes.clear(); if (old_status != new_status) { node->status = new_status; - stub->_fs_manager._status_updated_dir_nodes.emplace_back(node); } } @@ -120,22 +120,24 @@ class replica_disk_test_base : public replica_test_base if (replica == nullptr) { return ERR_OBJECT_NOT_FOUND; } - flag = replica->disk_space_insufficient(); + flag = (replica->get_dir_node()->status == disk_status::SPACE_INSUFFICIENT); return ERR_OK; } int32_t ignore_broken_disk_test(const std::string &mock_create_directory, const std::string &mock_check_rw) { - std::vector data_dirs = {"disk1", "disk2", "disk3"}; - std::vector data_dir_tags = {"tag1", "tag2", "tag3"}; - auto test_stub = std::make_unique(); fail::cfg("filesystem_create_directory", "return(" + mock_create_directory + ")"); fail::cfg("filesystem_check_dir_rw", "return(" + mock_check_rw + ")"); fail::cfg("update_disk_stat", "return()"); - test_stub->initialize_fs_manager(data_dirs, data_dir_tags); - int32_t dir_size = test_stub->_fs_manager.get_available_data_dirs().size(); - test_stub.reset(); + fs_manager fm; + fm.initialize({"disk1", "disk2", "disk3"}, {"tag1", "tag2", "tag3"}); + int32_t dir_size = 0; + for (const auto &dn : fm.get_dir_nodes()) { + if (dn->status == disk_status::NORMAL) { + dir_size++; + } + } return dir_size; } @@ -153,7 +155,6 @@ class replica_disk_test_base : public replica_test_base void reset_after_add_new_disk_test() { stub->_fs_manager._dir_nodes.clear(); - stub->_fs_manager._available_data_dirs.clear(); dsn::utils::filesystem::remove_path("add_new_not_empty_disk"); } @@ -193,7 +194,6 @@ class replica_disk_test_base : public replica_test_base dir_node *node_disk = new dir_node(fmt::format("tag_empty_{}", num), fmt::format("./tag_empty_{}", num)); stub->_fs_manager._dir_nodes.emplace_back(node_disk); - stub->_fs_manager._available_data_dirs.emplace_back(node_disk->full_dir); utils::filesystem::create_directory(node_disk->full_dir); num--; } @@ -201,14 +201,6 @@ class replica_disk_test_base : public replica_test_base void generate_mock_dir_nodes(int num) { - int app_id_1_disk_holding_replica_count = - app_id_1_primary_count_for_disk + app_id_1_secondary_count_for_disk; - int app_id_2_disk_holding_replica_count = - app_id_2_primary_count_for_disk + app_id_2_secondary_count_for_disk; - - int app_id_1_partition_index = 1; - int app_id_2_partition_index = 1; - int64_t disk_capacity_mb = num * 100; int count = 0; while (count++ < num) { @@ -226,20 +218,7 @@ class replica_disk_test_base : public replica_test_base node_disk->full_dir); // open replica need the options utils::filesystem::create_directory(node_disk->full_dir); - int app_1_replica_count_per_disk = app_id_1_disk_holding_replica_count; - while (app_1_replica_count_per_disk-- > 0) { - node_disk->holding_replicas[app_info_1.app_id].emplace( - gpid(app_info_1.app_id, app_id_1_partition_index++)); - } - - int app_2_replica_count_per_disk = app_id_2_disk_holding_replica_count; - while (app_2_replica_count_per_disk-- > 0) { - node_disk->holding_replicas[app_info_2.app_id].emplace( - gpid(app_info_2.app_id, app_id_2_partition_index++)); - } - stub->_fs_manager._dir_nodes.emplace_back(node_disk); - stub->_fs_manager._available_data_dirs.emplace_back(node_disk->full_dir); } } @@ -249,7 +228,6 @@ class replica_disk_test_base : public replica_test_base if (replica == nullptr) { return; } - replica->set_disk_status(status); } }; diff --git a/src/replica/test/replica_learn_test.cpp b/src/replica/test/replica_learn_test.cpp index 5f937e24e4..49f0c3d2cd 100644 --- a/src/replica/test/replica_learn_test.cpp +++ b/src/replica/test/replica_learn_test.cpp @@ -48,7 +48,11 @@ class replica_learn_test : public duplication_test_base app_info app_info; app_info.app_type = "replica"; app_info.duplicating = true; - auto r = std::make_unique(stub.get(), gpid, app_info, "./"); + + dir_node *dn = + stub->get_fs_manager()->get_replica_dir(app_info.app_type.c_str(), gpid, true); + CHECK_NOTNULL(dn, ""); + auto r = std::make_unique(stub.get(), gpid, app_info, dn); r->as_primary(); return r; } diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index 8e09a8e033..7d49a06186 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -61,6 +61,7 @@ #include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/string_conv.h" +#include "utils/test_macros.h" namespace dsn { namespace replication { @@ -462,7 +463,7 @@ TEST_F(replica_test, test_query_last_checkpoint_info) _mock_replica->set_last_committed_decree(200); _mock_replica->on_query_last_checkpoint(resp); ASSERT_EQ(resp.last_committed_decree, 200); - ASSERT_EQ(resp.base_local_dir, "./data/checkpoint.100"); + ASSERT_STR_CONTAINS(resp.base_local_dir, "/data/checkpoint.100"); } TEST_F(replica_test, test_clear_on_failure) @@ -476,7 +477,7 @@ TEST_F(replica_test, test_clear_on_failure) dsn::utils::filesystem::create_directory(path); ASSERT_TRUE(has_gpid(pid)); - stub->clear_on_failure(rep, path, pid); + stub->clear_on_failure(rep); ASSERT_FALSE(dsn::utils::filesystem::path_exists(path)); ASSERT_FALSE(has_gpid(pid)); diff --git a/src/replica/test/replica_test_base.h b/src/replica/test/replica_test_base.h index 94260dbce4..21be0ae5b9 100644 --- a/src/replica/test/replica_test_base.h +++ b/src/replica/test/replica_test_base.h @@ -53,9 +53,14 @@ class replica_test_base : public replica_stub_test_base { public: std::unique_ptr _replica; - const std::string _log_dir{"./test-log"}; + // TODO(yingchun): rename to _replica_dir, and consider to remove it totally. + std::string _log_dir; - replica_test_base() { _replica = create_mock_replica(stub.get(), 1, 1, _log_dir.c_str()); } + replica_test_base() + { + _replica = create_mock_replica(stub.get(), 1, 0); + _log_dir = _replica->dir(); + } virtual mutation_ptr create_test_mutation(int64_t decree, const std::string &data) { diff --git a/src/server/test/hotkey_collector_test.cpp b/src/server/test/hotkey_collector_test.cpp index c331b3cac5..e1f608ee4c 100644 --- a/src/server/test/hotkey_collector_test.cpp +++ b/src/server/test/hotkey_collector_test.cpp @@ -221,7 +221,7 @@ TEST_F(fine_collector_test, fine_collector) class hotkey_collector_test : public pegasus_server_test_base { public: - hotkey_collector_test() { start(); } + hotkey_collector_test() { CHECK_EQ(::dsn::ERR_OK, start()); } std::shared_ptr get_read_collector() { diff --git a/src/server/test/pegasus_server_test_base.h b/src/server/test/pegasus_server_test_base.h index 92fcec9253..331938981d 100644 --- a/src/server/test/pegasus_server_test_base.h +++ b/src/server/test/pegasus_server_test_base.h @@ -23,7 +23,8 @@ #include #include -#include "replica/replica_test_utils.h" +#include "common/fs_manager.h" +#include "replica/replica_stub.h" #include "utils/filesystem.h" namespace pegasus { @@ -44,15 +45,22 @@ class pegasus_server_test_base : public ::testing::Test pegasus_server_test_base() { // Remove rdb to prevent rocksdb recovery from last test. - dsn::utils::filesystem::remove_path("./data/rdb"); - _replica_stub = dsn::replication::create_test_replica_stub(); + dsn::utils::filesystem::remove_path("./test_dir"); + _replica_stub = new dsn::replication::replica_stub(); + _replica_stub->get_fs_manager()->initialize({"test_dir"}, {"test_tag"}); _gpid = dsn::gpid(100, 1); dsn::app_info app_info; app_info.app_type = "pegasus"; - _replica = dsn::replication::create_test_replica( - _replica_stub, _gpid, app_info, "./", false, false); + auto *dn = _replica_stub->get_fs_manager()->get_replica_dir( + app_info.app_type.c_str(), _gpid, true); + CHECK_NOTNULL(dn, ""); + _replica = new dsn::replication::replica(_replica_stub, _gpid, app_info, dn, false, false); + const auto dir_data = dsn::utils::filesystem::path_combine(_replica->dir(), "data"); + CHECK(dsn::utils::filesystem::create_directory(dir_data), + "create data dir {} failed", + dir_data); _server = std::make_unique(_replica); } @@ -77,14 +85,14 @@ class pegasus_server_test_base : public ::testing::Test // do not clear state _server->stop(false); - dsn::replication::destroy_replica_stub(_replica_stub); - dsn::replication::destroy_replica(_replica); + delete _replica_stub; + delete _replica; } protected: std::unique_ptr _server; - dsn::replication::replica *_replica; - dsn::replication::replica_stub *_replica_stub; + dsn::replication::replica *_replica = nullptr; + dsn::replication::replica_stub *_replica_stub = nullptr; dsn::gpid _gpid; }; diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp index b4bf0b0ffa..d1a0143133 100644 --- a/src/server/test/rocksdb_wrapper_test.cpp +++ b/src/server/test/rocksdb_wrapper_test.cpp @@ -25,12 +25,12 @@ #include #include +#include "common/fs_manager.h" #include "dsn.layer2_types.h" #include "pegasus_key_schema.h" #include "pegasus_server_test_base.h" #include "pegasus_utils.h" #include "pegasus_value_schema.h" -#include "replica/replica_test_utils.h" #include "server/pegasus_server_write.h" #include "server/pegasus_write_service.h" #include "server/pegasus_write_service_impl.h" @@ -48,9 +48,11 @@ class rocksdb_wrapper_test : public pegasus_server_test_base dsn::blob _raw_key; public: + rocksdb_wrapper_test() = default; + void SetUp() override { - start(); + ASSERT_EQ(::dsn::ERR_OK, start()); _server_write = std::make_unique(_server.get()); _rocksdb_wrapper = _server_write->_write_svc->_impl->_rocksdb_wrapper.get(); @@ -74,13 +76,20 @@ class rocksdb_wrapper_test : public pegasus_server_test_base void set_app_duplicating() { _server->stop(false); - dsn::replication::destroy_replica(_replica); + delete _replica; dsn::app_info app_info; app_info.app_type = "pegasus"; app_info.duplicating = true; - _replica = dsn::replication::create_test_replica( - _replica_stub, _gpid, app_info, "./", false, false); + + auto *dn = _replica_stub->get_fs_manager()->get_replica_dir( + app_info.app_type.c_str(), _gpid, true); + CHECK_NOTNULL(dn, ""); + _replica = new dsn::replication::replica(_replica_stub, _gpid, app_info, dn, false, false); + const auto dir_data = dsn::utils::filesystem::path_combine(_replica->dir(), "data"); + CHECK(dsn::utils::filesystem::create_directory(dir_data), + "create data dir {} failed", + dir_data); _server = std::make_unique(_replica); SetUp(); diff --git a/src/utils/error_code.h b/src/utils/error_code.h index 998b218728..3a699eb4ca 100644 --- a/src/utils/error_code.h +++ b/src/utils/error_code.h @@ -174,4 +174,5 @@ DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL) DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE) DEFINE_ERR_CODE(ERR_RDB_CORRUPTION) +DEFINE_ERR_CODE(ERR_RDB_IO_ERROR) } // namespace dsn diff --git a/src/utils/filesystem.cpp b/src/utils/filesystem.cpp index ec4b28a737..6502ec62c1 100644 --- a/src/utils/filesystem.cpp +++ b/src/utils/filesystem.cpp @@ -94,6 +94,7 @@ static inline int get_stat_internal(const std::string &npath, struct stat_ &st) return err; } +// TODO(yingchun): remove the return value since it's always 0. int get_normalized_path(const std::string &path, std::string &npath) { char sep; diff --git a/src/utils/test_macros.h b/src/utils/test_macros.h index 6f9c889733..c1c9c3fd49 100644 --- a/src/utils/test_macros.h +++ b/src/utils/test_macros.h @@ -33,3 +33,9 @@ return; \ } \ } while (0) + +// Substring matches. +#define ASSERT_STR_CONTAINS(str, substr) ASSERT_THAT(str, testing::HasSubstr(substr)) + +#define ASSERT_STR_NOT_CONTAINS(str, substr) \ + ASSERT_THAT(str, testing::Not(testing::HasSubstr(substr)))