Skip to content

Commit

Permalink
refactor: update replica's dir_node status (part1) (#1487)
Browse files Browse the repository at this point in the history
#1383

In prior implemention, every replica has a "dir_node status", if a dir_node has
some abnormal status (e.g. in space insufficient), we have to update all replicas'
referenced "dir_node status", it is implemented in `replica_stub::update_disks_status`.
This make the "dir_node status" updating path too long, and a bit of duplicate.

A new implemention is completed in #1473,
every replica has a reference of dir_node directly, so it would be easy to update replcia's
"dir_node status" by updating the referenced dir_node's status once.

Before the new implemention, this patch submit a minor refactor to remove
`replica_stub::update_disks_status` and related functions and variables. Also some unit
tests have been updated.
  • Loading branch information
acelyc111 authored May 26, 2023
1 parent d5f2259 commit 5404c40
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 114 deletions.
48 changes: 20 additions & 28 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,29 +110,23 @@ uint64_t dir_node::remove(const gpid &pid)
return iter->second.erase(pid);
}

bool dir_node::update_disk_stat(const bool update_disk_status)
void dir_node::update_disk_stat()
{
FAIL_POINT_INJECT_F("update_disk_stat", [](string_view) { return false; });
dsn::utils::filesystem::disk_space_info info;
if (!dsn::utils::filesystem::get_disk_space_info(full_dir, info)) {
LOG_ERROR("update disk space failed: dir = {}", full_dir);
return false;
FAIL_POINT_INJECT_F("update_disk_stat", [](string_view) { return; });

dsn::utils::filesystem::disk_space_info dsi;
if (!dsn::utils::filesystem::get_disk_space_info(full_dir, dsi)) {
// TODO(yingchun): it may encounter some IO errors when get_disk_space_info() failed, deal
// with it.
LOG_ERROR("get disk space info failed, dir = {}", full_dir);
return;
}
// update disk space info
disk_capacity_mb = info.capacity / 1024 / 1024;
disk_available_mb = info.available / 1024 / 1024;

disk_capacity_mb = dsi.capacity >> 20;
disk_available_mb = dsi.available >> 20;
disk_available_ratio = static_cast<int>(
disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / disk_capacity_mb));

if (!update_disk_status) {
LOG_INFO("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, "
"available_ratio = {}%",
full_dir,
disk_capacity_mb,
disk_available_mb,
disk_available_ratio);
return false;
}
auto old_status = status;
auto new_status = disk_available_ratio < FLAGS_disk_min_available_space_ratio
? disk_status::SPACE_INSUFFICIENT
Expand All @@ -147,7 +141,6 @@ bool dir_node::update_disk_stat(const bool update_disk_status)
disk_available_mb,
disk_available_ratio,
enum_to_string(status));
return (old_status != new_status);
}

fs_manager::fs_manager()
Expand Down Expand Up @@ -341,17 +334,16 @@ bool fs_manager::for_each_dir_node(const std::function<bool(const dir_node &)> &
return true;
}

void fs_manager::update_disk_stat(bool check_status_changed)
void fs_manager::update_disk_stat()
{
zauto_write_lock l(_lock);
reset_disk_stat();
for (auto &dir_node : _dir_nodes) {
if (dir_node->update_disk_stat(check_status_changed)) {
_status_updated_dir_nodes.emplace_back(dir_node);
}
_total_capacity_mb += dir_node->disk_capacity_mb;
_total_available_mb += dir_node->disk_available_mb;
_min_available_ratio = std::min(dir_node->disk_available_ratio, _min_available_ratio);
_max_available_ratio = std::max(dir_node->disk_available_ratio, _max_available_ratio);
for (auto &dn : _dir_nodes) {
dn->update_disk_stat();
_total_capacity_mb += dn->disk_capacity_mb;
_total_available_mb += dn->disk_available_mb;
_min_available_ratio = std::min(dn->disk_available_ratio, _min_available_ratio);
_max_available_ratio = std::max(dn->disk_available_ratio, _max_available_ratio);
}
_total_available_ratio = static_cast<int>(
_total_capacity_mb == 0 ? 0 : std::round(_total_available_mb * 100.0 / _total_capacity_mb));
Expand Down
14 changes: 4 additions & 10 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct dir_node
std::string replica_dir(dsn::string_view app_type, const dsn::gpid &pid) const;
bool has(const dsn::gpid &pid) const;
uint64_t remove(const dsn::gpid &pid);
bool update_disk_stat(const bool update_disk_status);
void update_disk_stat();
};

class fs_manager
Expand Down Expand Up @@ -111,7 +111,7 @@ class fs_manager
const std::string &parent_dir);
void remove_replica(const dsn::gpid &pid);
bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
void update_disk_stat(bool check_status_changed = true);
void update_disk_stat();

void add_new_dir_node(const std::string &data_dir, const std::string &tag);
const std::vector<std::shared_ptr<dir_node>> &get_dir_nodes() const
Expand All @@ -129,27 +129,21 @@ class fs_manager
_total_available_ratio = 0;
_min_available_ratio = 100;
_max_available_ratio = 0;
_status_updated_dir_nodes.clear();
}

dir_node *get_dir_node(const std::string &subdir) const;

// when visit the tag/storage of the _dir_nodes map, there's no need to protect by the lock.
// but when visit the holding_replicas, you must take care.
mutable zrwlock_nr _lock;

mutable zrwlock_nr _lock; // [ lock
int64_t _total_capacity_mb = 0;
int64_t _total_available_mb = 0;
int _total_available_ratio = 0;
int _min_available_ratio = 100;
int _max_available_ratio = 0;

std::vector<std::shared_ptr<dir_node>> _dir_nodes;

// Used for disk available space check
// disk status will be updated periodically, this vector record nodes whose disk_status changed
// in this round
std::vector<std::shared_ptr<dir_node>> _status_updated_dir_nodes;
// ] end of lock

perf_counter_wrapper _counter_total_capacity_mb;
perf_counter_wrapper _counter_total_available_mb;
Expand Down
18 changes: 6 additions & 12 deletions src/common/test/fs_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,24 @@ TEST(fs_manager, initialize)

TEST(fs_manager, dir_update_disk_status)
{
std::shared_ptr<dir_node> node = std::make_shared<dir_node>("tag", "path");
struct update_disk_status
{
bool update_status;
bool mock_insufficient;
disk_status::type old_disk_status;
disk_status::type new_disk_status;
bool expected_ret;
} tests[] = {
{false, false, disk_status::NORMAL, disk_status::NORMAL, false},
{false, true, disk_status::NORMAL, disk_status::NORMAL, false},
{true, false, disk_status::NORMAL, disk_status::NORMAL, false},
{true, false, disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL, true},
{true, true, disk_status::NORMAL, disk_status::SPACE_INSUFFICIENT, true},
{true, true, disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT, false}};
} tests[] = {{false, disk_status::NORMAL, disk_status::NORMAL},
{false, disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL},
{true, disk_status::NORMAL, disk_status::SPACE_INSUFFICIENT},
{true, disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT}};
for (const auto &test : tests) {
node->status = test.old_disk_status;
auto node = std::make_shared<dir_node>("tag", "path", 0, 0, 0, test.old_disk_status);
fail::setup();
if (test.mock_insufficient) {
fail::cfg("filesystem_get_disk_space_info", "return(insufficient)");
} else {
fail::cfg("filesystem_get_disk_space_info", "return(normal)");
}
ASSERT_EQ(test.expected_ret, node->update_disk_stat(test.update_status));
node->update_disk_stat();
ASSERT_EQ(test.new_disk_status, node->status);
fail::teardown();
}
Expand Down
20 changes: 0 additions & 20 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1989,7 +1989,6 @@ void replica_stub::on_disk_stat()
dsn::replication::disk_remove_useless_dirs(_fs_manager.get_dir_nodes(), report);
_fs_manager.update_disk_stat();
update_disk_holding_replicas();
update_disks_status();

_counter_replicas_error_replica_dir_count->set(report.error_replica_count);
_counter_replicas_garbage_replica_dir_count->set(report.garbage_replica_count);
Expand Down Expand Up @@ -3142,25 +3141,6 @@ void replica_stub::query_app_manual_compact_status(
}
}

void replica_stub::update_disks_status()
{
for (const auto &dir_node : _fs_manager._status_updated_dir_nodes) {
for (const auto &holding_replicas : dir_node->holding_replicas) {
const std::set<gpid> &pids = holding_replicas.second;
for (const auto &pid : pids) {
replica_ptr replica = get_replica(pid);
if (replica == nullptr) {
continue;
}
replica->set_disk_status(dir_node->status);
LOG_INFO("{} update disk_status to {}",
replica->name(),
enum_to_string(replica->get_disk_status()));
}
}
}
}

void replica_stub::update_config(const std::string &name)
{
// The new value has been validated and FLAGS_* has been updated, it's safety to use it
Expand Down
2 changes: 0 additions & 2 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,6 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
error_code error);
void update_disk_holding_replicas();

void update_disks_status();

void register_ctrl_command();

int get_app_id_from_replicas(std::string app_name)
Expand Down
20 changes: 10 additions & 10 deletions src/replica/test/replica_disk_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
#include "dsn.layer2_types.h"
#include "metadata_types.h"
#include "replica/disk_cleaner.h"
#include "replica/replica.h"
#include "replica/replica_stub.h"
#include "replica/test/mock_utils.h"
#include "replica_admin_types.h"
#include "replica_disk_test_base.h"
#include "runtime/rpc/rpc_holder.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
Expand Down Expand Up @@ -235,7 +237,6 @@ TEST_F(replica_disk_test, gc_disk_useless_dir)

TEST_F(replica_disk_test, disk_status_test)
{
int32_t node_index = 0;
struct disk_status_test
{
disk_status::type old_status;
Expand All @@ -244,19 +245,18 @@ TEST_F(replica_disk_test, disk_status_test)
{disk_status::NORMAL, disk_status::SPACE_INSUFFICIENT},
{disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT},
{disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL}};
auto dn = stub->get_fs_manager()->get_dir_nodes()[0];
for (const auto &test : tests) {
auto node = stub->get_fs_manager()->get_dir_nodes()[node_index];
mock_node_status(node_index, test.old_status, test.new_status);
update_disks_status();
for (auto &kv : node->holding_replicas) {
for (auto &pid : kv.second) {
bool flag;
ASSERT_EQ(replica_disk_space_insufficient(pid, flag), ERR_OK);
ASSERT_EQ(flag, test.new_status == disk_status::SPACE_INSUFFICIENT);
update_node_status(dn, test.old_status, test.new_status);
for (const auto &pids_of_app : dn->holding_replicas) {
for (const auto &pid : pids_of_app.second) {
replica_ptr rep = stub->get_replica(pid);
ASSERT_NE(nullptr, rep);
ASSERT_EQ(test.new_status, rep->disk_space_insufficient());
}
}
}
mock_node_status(node_index, disk_status::NORMAL, disk_status::NORMAL);
update_node_status(dn, disk_status::NORMAL, disk_status::NORMAL);
}

TEST_F(replica_disk_test, add_new_disk_test)
Expand Down
41 changes: 9 additions & 32 deletions src/replica/test/replica_disk_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ class replica_disk_test_base : public replica_test_base

~replica_disk_test_base() { fail::teardown(); }

void update_disk_replica() { stub->on_disk_stat(); }

void update_disks_status() { stub->update_disks_status(); }

void generate_mock_dir_node(const app_info &app,
const gpid pid,
const std::string &tag,
Expand All @@ -97,30 +93,20 @@ class replica_disk_test_base : public replica_test_base
}
}

void
mock_node_status(int32_t node_index, disk_status::type old_status, disk_status::type new_status)
void update_node_status(const std::shared_ptr<dir_node> &dn,
disk_status::type old_status,
disk_status::type new_status)
{
auto node = stub->_fs_manager.get_dir_nodes()[node_index];
for (const auto &kv : node->holding_replicas) {
for (const auto &pid : kv.second) {
update_replica_disk_status(pid, old_status);
for (const auto &pids_of_app : dn->holding_replicas) {
for (const auto &pid : pids_of_app.second) {
replica_ptr rep = stub->get_replica(pid);
ASSERT_NE(nullptr, rep);
rep->set_disk_status(new_status);
}
}
stub->_fs_manager._status_updated_dir_nodes.clear();
if (old_status != new_status) {
node->status = new_status;
stub->_fs_manager._status_updated_dir_nodes.emplace_back(node);
}
}

error_code replica_disk_space_insufficient(const gpid &pid, bool &flag)
{
replica_ptr replica = stub->get_replica(pid);
if (replica == nullptr) {
return ERR_OBJECT_NOT_FOUND;
dn->status = new_status;
}
flag = replica->disk_space_insufficient();
return ERR_OK;
}

void prepare_before_add_new_disk_test(const std::string &create_dir,
Expand Down Expand Up @@ -223,15 +209,6 @@ class replica_disk_test_base : public replica_test_base
stub->_fs_manager._dir_nodes.emplace_back(node_disk);
}
}

void update_replica_disk_status(const gpid &pid, const disk_status::type status)
{
replica_ptr replica = stub->get_replica(pid);
if (replica == nullptr) {
return;
}
replica->set_disk_status(status);
}
};

} // namespace replication
Expand Down

0 comments on commit 5404c40

Please sign in to comment.