Skip to content

Commit

Permalink
refactor: improve the single-responsibility of class fs_manager (#1477)
Browse files Browse the repository at this point in the history
#1383

This patch moves some functions to fs_manager which are more reasonable to be
responsibilities of class fs_manager rather than those of class replica_stub.
  • Loading branch information
acelyc111 authored May 25, 2023
1 parent ccd01c5 commit d5f2259
Show file tree
Hide file tree
Showing 17 changed files with 507 additions and 331 deletions.
194 changes: 151 additions & 43 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@

#include "fs_manager.h"

#include <stdio.h>
#include <algorithm>
#include <cmath>
#include <iosfwd>
#include <utility>

#include "common/gpid.h"
#include "common/replication_enums.h"
#include "fmt/core.h"
#include "fmt/ostream.h"
#include "perf_counter/perf_counter.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
Expand All @@ -62,6 +64,11 @@ DSN_DEFINE_int32(replication,
"space insufficient");
DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);

DSN_DEFINE_bool(replication,
ignore_broken_disk,
true,
"true means ignore broken data disk when initialize");

uint64_t dir_node::replicas_count() const
{
uint64_t sum = 0;
Expand All @@ -80,6 +87,11 @@ uint64_t dir_node::replicas_count(app_id id) const
return iter->second.size();
}

std::string dir_node::replica_dir(dsn::string_view app_type, const dsn::gpid &pid) const
{
return utils::filesystem::path_combine(full_dir, fmt::format("{}.{}", pid, app_type));
}

bool dir_node::has(const gpid &pid) const
{
auto iter = holding_replicas.find(pid.get_app_id());
Expand Down Expand Up @@ -187,15 +199,44 @@ void fs_manager::initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &data_dir_tags)
{
CHECK_EQ(data_dirs.size(), data_dir_tags.size());
for (unsigned i = 0; i < data_dirs.size(); ++i) {

// Skip the data directories which are broken.
std::vector<std::shared_ptr<dir_node>> dir_nodes;
for (auto i = 0; i < data_dir_tags.size(); ++i) {
const auto &dir_tag = data_dir_tags[i];
const auto &dir = data_dirs[i];

// Check the status of this directory.
std::string cdir;
std::string err_msg;
if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
!utils::filesystem::check_dir_rw(dir, err_msg))) {
if (FLAGS_ignore_broken_disk) {
LOG_ERROR("data dir({}) is broken, ignore it, error: {}", dir, err_msg);
} else {
CHECK(false, err_msg);
}
// TODO(yingchun): Remove the 'continue' and mark its io error status, regardless
// the status of the disks, add all disks.
continue;
}

// Normalize the data directories.
std::string norm_path;
utils::filesystem::get_normalized_path(data_dirs[i], norm_path);
dir_node *n = new dir_node(data_dir_tags[i], norm_path);
_dir_nodes.emplace_back(n);
LOG_INFO(
"{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, data_dir_tags[i]);
utils::filesystem::get_normalized_path(cdir, norm_path);

// Create and add this dir_node.
auto dn = std::make_shared<dir_node>(dir_tag, norm_path);
dir_nodes.emplace_back(dn);
LOG_INFO("mark data dir({}) as tag({})", norm_path, dir_tag);
}
CHECK_FALSE(dir_nodes.empty());

// Update the memory state.
{
zauto_read_lock l(_lock);
_dir_nodes.swap(dir_nodes);
}
_available_data_dirs = data_dirs;

// Update the disk statistics.
update_disk_stat();
Expand Down Expand Up @@ -236,44 +277,40 @@ void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag);
}

void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/ std::string &dir)
dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
{
char buffer[256];
sprintf(buffer, "%d.%d.%s", pid.get_app_id(), pid.get_partition_index(), type.c_str());

zauto_write_lock l(_lock);

dir_node *selected = nullptr;

unsigned least_app_replicas_count = 0;
unsigned least_total_replicas_count = 0;

for (auto &n : _dir_nodes) {
CHECK(!n->has(pid), "gpid({}) already in dir_node({})", pid, n->tag);
unsigned app_replicas = n->replicas_count(pid.get_app_id());
unsigned total_replicas = n->replicas_count();

if (selected == nullptr || least_app_replicas_count > app_replicas) {
least_app_replicas_count = app_replicas;
least_total_replicas_count = total_replicas;
selected = n.get();
} else if (least_app_replicas_count == app_replicas &&
least_total_replicas_count > total_replicas) {
least_total_replicas_count = total_replicas;
selected = n.get();
uint64_t least_app_replicas_count = 0;
uint64_t least_total_replicas_count = 0;
{
zauto_write_lock l(_lock);
// Try to find the dir_node with the least replica count.
for (const auto &dn : _dir_nodes) {
CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag);
uint64_t app_replicas_count = dn->replicas_count(pid.get_app_id());
uint64_t total_replicas_count = dn->replicas_count();

if (selected == nullptr || least_app_replicas_count > app_replicas_count) {
least_app_replicas_count = app_replicas_count;
least_total_replicas_count = total_replicas_count;
selected = dn.get();
} else if (least_app_replicas_count == app_replicas_count &&
least_total_replicas_count > total_replicas_count) {
least_total_replicas_count = total_replicas_count;
selected = dn.get();
}
}
}

LOG_INFO(
"{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally",
dsn_primary_address(),
pid,
selected->tag,
least_app_replicas_count,
least_total_replicas_count);

selected->holding_replicas[pid.get_app_id()].emplace(pid);
dir = utils::filesystem::path_combine(selected->full_dir, buffer);
if (selected != nullptr) {
LOG_INFO(
"{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally",
dsn_primary_address(),
pid,
selected->tag,
least_app_replicas_count,
least_total_replicas_count);
}
return selected;
}

void fs_manager::remove_replica(const gpid &pid)
Expand Down Expand Up @@ -342,7 +379,6 @@ void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string
utils::filesystem::get_normalized_path(data_dir, norm_path);
dir_node *n = new dir_node(tag, norm_path);
_dir_nodes.emplace_back(n);
_available_data_dirs.emplace_back(data_dir);
LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag);
}

Expand All @@ -359,5 +395,77 @@ bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::s
return false;
}

dir_node *fs_manager::find_replica_dir(dsn::string_view app_type, gpid pid)
{
std::string replica_dir;
dir_node *replica_dn = nullptr;
{
zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
const auto dir = dn->replica_dir(app_type, pid);
if (utils::filesystem::directory_exists(dir)) {
// Check if there are duplicate replica instance directories.
CHECK(replica_dir.empty(), "replica dir conflict: {} <--> {}", dir, replica_dir);
replica_dir = dir;
replica_dn = dn.get();
}
}
}

return replica_dn;
}

dir_node *fs_manager::create_replica_dir_if_necessary(dsn::string_view app_type, gpid pid)
{
// Try to find the replica directory.
auto replica_dn = find_replica_dir(app_type, pid);
if (replica_dn != nullptr) {
return replica_dn;
}

// Find a dir_node for the new replica.
replica_dn = find_best_dir_for_new_replica(pid);
if (replica_dn == nullptr) {
return nullptr;
}

const auto dir = replica_dn->replica_dir(app_type, pid);
if (!dsn::utils::filesystem::create_directory(dir)) {
LOG_ERROR("create replica directory({}) failed", dir);
return nullptr;
}

replica_dn->holding_replicas[pid.get_app_id()].emplace(pid);
return replica_dn;
}

dir_node *fs_manager::create_child_replica_dir(dsn::string_view app_type,
gpid child_pid,
const std::string &parent_dir)
{
dir_node *child_dn = nullptr;
std::string child_dir;
{
zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
child_dir = dn->replica_dir(app_type, child_pid);
// <parent_dir> = <prefix>/<gpid>.<app_type>
// check if <parent_dir>'s <prefix> is equal to <data_dir>
// TODO(yingchun): use a function instead.
if (parent_dir.substr(0, dn->full_dir.size() + 1) == dn->full_dir + "/") {
child_dn = dn.get();
break;
}
}
}
CHECK_NOTNULL(child_dn, "can not find parent_dir {} in data_dirs", parent_dir);
if (!dsn::utils::filesystem::create_directory(child_dir)) {
LOG_ERROR("create child replica directory({}) failed", child_dir);
return nullptr;
}
add_replica(child_pid, child_dir);
return child_dn;
}

} // namespace replication
} // namespace dsn
34 changes: 26 additions & 8 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "perf_counter/perf_counter_wrapper.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/string_view.h"
#include "utils/zlocks.h"

namespace dsn {
Expand Down Expand Up @@ -72,6 +73,9 @@ struct dir_node
// and protected by the lock in fs_manager.
uint64_t replicas_count(app_id id) const;
uint64_t replicas_count() const;
// Construct the replica dir for the given 'app_type' and 'pid'.
// NOTE: Just construct the string, the directory will not be created.
std::string replica_dir(dsn::string_view app_type, const dsn::gpid &pid) const;
bool has(const dsn::gpid &pid) const;
uint64_t remove(const dsn::gpid &pid);
bool update_disk_stat(const bool update_disk_status);
Expand All @@ -86,23 +90,36 @@ class fs_manager
// NOTE: 'data_dirs' and 'data_dir_tags' must have the same size and in the same order.
void initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &data_dir_tags);

// Try to find the best dir_node to place the new replica. The less replica count the
// dir_node has, which means the load is lower generally, the higher opportunity it
// will be selected.
// TODO(yingchun): consider the disk capacity and available space.
// NOTE: the 'pid' must not exist in any dir_nodes.
dir_node *find_best_dir_for_new_replica(const dsn::gpid &pid) const;
dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
void allocate_dir(const dsn::gpid &pid,
const std::string &type,
/*out*/ std::string &dir);
void add_replica(const dsn::gpid &pid, const std::string &pid_dir);
// Find the replica instance directory.
dir_node *find_replica_dir(dsn::string_view app_type, gpid pid);
// Similar to the above, but it will create a new directory if not found.
dir_node *create_replica_dir_if_necessary(dsn::string_view app_type, gpid pid);
// Similar to the above, and will create a directory for the child on the same dir_node
// of parent.
// During partition split, we should guarantee child replica and parent replica share the
// same data dir.
dir_node *create_child_replica_dir(dsn::string_view app_type,
gpid child_pid,
const std::string &parent_dir);
void remove_replica(const dsn::gpid &pid);
bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
void update_disk_stat(bool check_status_changed = true);

void add_new_dir_node(const std::string &data_dir, const std::string &tag);
bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
const std::vector<std::string> &get_available_data_dirs() const
const std::vector<std::shared_ptr<dir_node>> &get_dir_nodes() const
{
zauto_read_lock l(_lock);
return _available_data_dirs;
return _dir_nodes;
}
bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;

private:
void reset_disk_stat()
Expand All @@ -128,7 +145,6 @@ class fs_manager
int _max_available_ratio = 0;

std::vector<std::shared_ptr<dir_node>> _dir_nodes;
std::vector<std::string> _available_data_dirs;

// Used for disk available space check
// disk status will be updated periodically, this vector record nodes whose disk_status changed
Expand All @@ -147,7 +163,9 @@ class fs_manager
friend class replica_disk_migrator;
friend class replica_disk_test_base;
friend class open_replica_test;
FRIEND_TEST(fs_manager, find_best_dir_for_new_replica);
FRIEND_TEST(fs_manager, get_dir_node);
FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
FRIEND_TEST(replica_test, test_auto_trash);
};
} // replication
Expand Down
Loading

0 comments on commit d5f2259

Please sign in to comment.