Skip to content

Commit

Permalink
refactor: minor refactor on class fs_manager (#1476)
Browse files Browse the repository at this point in the history
#1383

This is a minor refactor work on class fs_manager, including:
- use `uint64_t` instead of `unsigned` in fs_manager module.
- remove useless "test" parameters.
  • Loading branch information
acelyc111 authored May 17, 2023
1 parent 48fd4e8 commit 6b9fba3
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 84 deletions.
135 changes: 71 additions & 64 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,36 +61,39 @@ DSN_DEFINE_int32(replication,
"space insufficient");
DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);

unsigned dir_node::replicas_count() const
uint64_t dir_node::replicas_count() const
{
unsigned sum = 0;
uint64_t sum = 0;
for (const auto &s : holding_replicas) {
sum += s.second.size();
}
return sum;
}

unsigned dir_node::replicas_count(app_id id) const
uint64_t dir_node::replicas_count(app_id id) const
{
const auto iter = holding_replicas.find(id);
if (iter == holding_replicas.end())
if (iter == holding_replicas.end()) {
return 0;
}
return iter->second.size();
}

bool dir_node::has(const gpid &pid) const
{
auto iter = holding_replicas.find(pid.get_app_id());
if (iter == holding_replicas.end())
if (iter == holding_replicas.end()) {
return false;
}
return iter->second.find(pid) != iter->second.end();
}

unsigned dir_node::remove(const gpid &pid)
uint64_t dir_node::remove(const gpid &pid)
{
auto iter = holding_replicas.find(pid.get_app_id());
if (iter == holding_replicas.end())
if (iter == holding_replicas.end()) {
return 0;
}
return iter->second.erase(pid);
}

Expand Down Expand Up @@ -134,68 +137,67 @@ bool dir_node::update_disk_stat(const bool update_disk_status)
return (old_status != new_status);
}

fs_manager::fs_manager(bool for_test)
fs_manager::fs_manager()
{
if (!for_test) {
_counter_total_capacity_mb.init_app_counter("eon.replica_stub",
"disk.capacity.total(MB)",
_counter_total_capacity_mb.init_app_counter("eon.replica_stub",
"disk.capacity.total(MB)",
COUNTER_TYPE_NUMBER,
"total disk capacity in MB");
_counter_total_available_mb.init_app_counter("eon.replica_stub",
"disk.available.total(MB)",
COUNTER_TYPE_NUMBER,
"total disk available in MB");
_counter_total_available_ratio.init_app_counter("eon.replica_stub",
"disk.available.total.ratio",
COUNTER_TYPE_NUMBER,
"total disk capacity in MB");
_counter_total_available_mb.init_app_counter("eon.replica_stub",
"disk.available.total(MB)",
COUNTER_TYPE_NUMBER,
"total disk available in MB");
_counter_total_available_ratio.init_app_counter("eon.replica_stub",
"disk.available.total.ratio",
COUNTER_TYPE_NUMBER,
"total disk available ratio");
_counter_min_available_ratio.init_app_counter("eon.replica_stub",
"disk.available.min.ratio",
COUNTER_TYPE_NUMBER,
"minimal disk available ratio in all disks");
_counter_max_available_ratio.init_app_counter("eon.replica_stub",
"disk.available.max.ratio",
COUNTER_TYPE_NUMBER,
"maximal disk available ratio in all disks");
}
"total disk available ratio");
_counter_min_available_ratio.init_app_counter("eon.replica_stub",
"disk.available.min.ratio",
COUNTER_TYPE_NUMBER,
"minimal disk available ratio in all disks");
_counter_max_available_ratio.init_app_counter("eon.replica_stub",
"disk.available.max.ratio",
COUNTER_TYPE_NUMBER,
"maximal disk available ratio in all disks");
}

dir_node *fs_manager::get_dir_node(const std::string &subdir) const
{
zauto_read_lock l(_lock);
std::string norm_subdir;
utils::filesystem::get_normalized_path(subdir, norm_subdir);
for (auto &n : _dir_nodes) {
// if input is a subdir of some dir_nodes
const std::string &d = n->full_dir;
if (norm_subdir.compare(0, d.size(), d) == 0 &&
(norm_subdir.size() == d.size() || norm_subdir[d.size()] == '/')) {
return n.get();

zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
// Check if 'subdir' is a sub-directory of 'dn'.
const std::string &full_dir = dn->full_dir;
if (full_dir.size() > norm_subdir.size()) {
continue;
}

if ((norm_subdir.size() == full_dir.size() || norm_subdir[full_dir.size()] == '/') &&
norm_subdir.compare(0, full_dir.size(), full_dir) == 0) {
return dn.get();
}
}
return nullptr;
}

// size of the two vectors should be equal
dsn::error_code fs_manager::initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &tags,
bool for_test)
void fs_manager::initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &data_dir_tags)
{
// create all dir_nodes
CHECK_EQ(data_dirs.size(), tags.size());
CHECK_EQ(data_dirs.size(), data_dir_tags.size());
for (unsigned i = 0; i < data_dirs.size(); ++i) {
std::string norm_path;
utils::filesystem::get_normalized_path(data_dirs[i], norm_path);
dir_node *n = new dir_node(tags[i], norm_path);
dir_node *n = new dir_node(data_dir_tags[i], norm_path);
_dir_nodes.emplace_back(n);
LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, tags[i]);
LOG_INFO(
"{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, data_dir_tags[i]);
}
_available_data_dirs = data_dirs;

if (!for_test) {
update_disk_stat(false);
}
return dsn::ERR_OK;
// Update the disk statistics.
update_disk_stat();
}

dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &tag)
Expand All @@ -211,21 +213,26 @@ dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &ta

void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
{
dir_node *n = get_dir_node(pid_dir);
if (nullptr == n) {
const auto &dn = get_dir_node(pid_dir);
if (dsn_unlikely(nullptr == dn)) {
LOG_ERROR(
"{}: dir({}) of gpid({}) haven't registered", dsn_primary_address(), pid_dir, pid);
} else {
return;
}

bool emplace_success = false;
{
zauto_write_lock l(_lock);
std::set<dsn::gpid> &replicas_for_app = n->holding_replicas[pid.get_app_id()];
auto result = replicas_for_app.emplace(pid);
if (!result.second) {
LOG_WARNING(
"{}: gpid({}) already in the dir_node({})", dsn_primary_address(), pid, n->tag);
} else {
LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, n->tag);
}
auto &replicas_for_app = dn->holding_replicas[pid.get_app_id()];
emplace_success = replicas_for_app.emplace(pid).second;
}
if (!emplace_success) {
LOG_WARNING(
"{}: gpid({}) already in the dir_node({})", dsn_primary_address(), pid, dn->tag);
return;
}

LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag);
}

void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/ std::string &dir)
Expand Down Expand Up @@ -271,16 +278,16 @@ void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/
void fs_manager::remove_replica(const gpid &pid)
{
zauto_write_lock l(_lock);
unsigned remove_count = 0;
for (auto &n : _dir_nodes) {
unsigned r = n->remove(pid);
uint64_t remove_count = 0;
for (auto &dn : _dir_nodes) {
uint64_t r = dn->remove(pid);
CHECK_LE_MSG(remove_count + r,
1,
"gpid({}) found in dir({}), which was removed before",
pid,
n->tag);
dn->tag);
if (r != 0) {
LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_address(), pid, n->tag);
LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_address(), pid, dn->tag);
}
remove_count += r;
}
Expand Down
28 changes: 14 additions & 14 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

#pragma once

#include <gtest/gtest_prod.h>
#include <stdint.h>
#include <functional>
#include <gtest/gtest_prod.h>
#include <map>
#include <memory>
#include <set>
Expand All @@ -37,15 +37,14 @@ namespace dsn {
class gpid;

namespace replication {
class replication_options;

DSN_DECLARE_int32(disk_min_available_space_ratio);

struct dir_node
{
public:
std::string tag;
std::string full_dir;
const std::string tag;
const std::string full_dir;
int64_t disk_capacity_mb;
int64_t disk_available_mb;
int disk_available_ratio;
Expand All @@ -69,24 +68,24 @@ struct dir_node
status(status_)
{
}
unsigned replicas_count(app_id id) const;
unsigned replicas_count() const;
// All functions are not thread-safe. However, they are only used in fs_manager
// and protected by the lock in fs_manager.
uint64_t replicas_count(app_id id) const;
uint64_t replicas_count() const;
bool has(const dsn::gpid &pid) const;
unsigned remove(const dsn::gpid &pid);
uint64_t remove(const dsn::gpid &pid);
bool update_disk_stat(const bool update_disk_status);
};

class fs_manager
{
public:
fs_manager(bool for_test);
~fs_manager() {}
fs_manager();

// this should be called before open/load any replicas
dsn::error_code initialize(const replication_options &opts);
dsn::error_code initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &tags,
bool for_test);
// Should be called before open/load any replicas.
// NOTE: 'data_dirs' and 'data_dir_tags' must have the same size and in the same order.
void initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &data_dir_tags);

dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
void allocate_dir(const dsn::gpid &pid,
Expand Down Expand Up @@ -148,6 +147,7 @@ class fs_manager
friend class replica_disk_migrator;
friend class replica_disk_test_base;
friend class open_replica_test;
FRIEND_TEST(fs_manager, get_dir_node);
FRIEND_TEST(replica_test, test_auto_trash);
};
} // replication
Expand Down
17 changes: 17 additions & 0 deletions src/common/test/fs_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,22 @@ TEST(fs_manager, dir_update_disk_status)
}
}

TEST(fs_manager, get_dir_node)
{
fs_manager fm;
fm.initialize({"/data1"}, {"data1"});

ASSERT_EQ(nullptr, fm.get_dir_node(""));
ASSERT_EQ(nullptr, fm.get_dir_node("/"));

ASSERT_NE(nullptr, fm.get_dir_node("/data1"));
ASSERT_NE(nullptr, fm.get_dir_node("/data1/"));
ASSERT_NE(nullptr, fm.get_dir_node("/data1/replica1"));

ASSERT_EQ(nullptr, fm.get_dir_node("/data2"));
ASSERT_EQ(nullptr, fm.get_dir_node("/data2/"));
ASSERT_EQ(nullptr, fm.get_dir_node("/data2/replica1"));
}

} // namespace replication
} // namespace dsn
4 changes: 2 additions & 2 deletions src/meta/test/misc/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ void generate_node_fs_manager(const app_mapper &apps,
for (const auto &kv : nodes) {
const node_state &ns = kv.second;
if (nfm.find(ns.addr()) == nfm.end()) {
nfm.emplace(ns.addr(), std::make_shared<fs_manager>(true));
nfm.emplace(ns.addr(), std::make_shared<fs_manager>());
}
fs_manager &manager = *(nfm.find(ns.addr())->second);
manager.initialize(data_dirs, tags, true);
manager.initialize(data_dirs, tags);
ns.for_each_partition([&](const dsn::gpid &pid) {
const config_context &cc = *get_config_context(apps, pid);
snprintf(pid_dir,
Expand Down
5 changes: 1 addition & 4 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_mem_release_max_reserved_mem_percentage(10),
_max_concurrent_bulk_load_downloading_count(5),
_learn_app_concurrent_count(0),
_fs_manager(false),
_bulk_load_downloading_count(0),
_manual_emergency_checkpointing_count(0),
_is_running(false)
Expand Down Expand Up @@ -858,9 +857,7 @@ void replica_stub::initialize_fs_manager(const std::vector<std::string> &data_di

CHECK_GT_MSG(
available_dirs.size(), 0, "initialize fs manager failed, no available data directory");
CHECK_EQ_MSG(_fs_manager.initialize(available_dirs, available_dir_tags, false),
dsn::ERR_OK,
"initialize fs manager failed");
_fs_manager.initialize(available_dirs, available_dir_tags);
}

void replica_stub::initialize_start()
Expand Down

0 comments on commit 6b9fba3

Please sign in to comment.