Skip to content

Commit

Permalink
refactor: fs manager
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed May 16, 2023
1 parent ee03ffe commit 70ed3cf
Show file tree
Hide file tree
Showing 42 changed files with 812 additions and 929 deletions.
3 changes: 2 additions & 1 deletion idl/metadata.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ enum split_status
enum disk_status
{
NORMAL = 0,
SPACE_INSUFFICIENT
SPACE_INSUFFICIENT,
IO_ERROR
}

enum manual_compaction_status
Expand Down
3 changes: 2 additions & 1 deletion src/aio/native_linux_aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag, int
{
auto fd = ::open(file_name, flag, pmode);
if (fd == DSN_INVALID_FILE_HANDLE) {
LOG_ERROR("create file failed, err = {}", utils::safe_strerror(errno));
LOG_ERROR(
"create file failed, file = {}, err = {}", file_name, utils::safe_strerror(errno));
}
return linux_fd_t(fd);
}
Expand Down
500 changes: 344 additions & 156 deletions src/common/fs_manager.cpp

Large diffs are not rendered by default.

76 changes: 42 additions & 34 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ DSN_DECLARE_int32(disk_min_available_space_ratio);
struct dir_node
{
public:
std::string tag;
std::string full_dir;
const std::string tag;
const std::string full_dir;
int64_t disk_capacity_mb;
int64_t disk_available_mb;
int disk_available_ratio;
disk_status::type status;
std::atomic<disk_status::type> status;
std::map<app_id, std::set<gpid>> holding_replicas;
std::map<app_id, std::set<gpid>> holding_primary_replicas;
std::map<app_id, std::set<gpid>> holding_secondary_replicas;
Expand All @@ -69,41 +69,54 @@ struct dir_node
status(status_)
{
}
unsigned replicas_count(app_id id) const;
unsigned replicas_count() const;
// None of the functions below are thread-safe. However, they are only used in fs_manager
// and are protected by the lock in fs_manager.
uint64_t replicas_count(app_id id) const;
uint64_t replicas_count() const;
std::string replica_dir(const char *app_type, const dsn::gpid &pid) const;
bool has(const dsn::gpid &pid) const;
unsigned remove(const dsn::gpid &pid);
bool update_disk_stat(const bool update_disk_status);
uint64_t remove(const dsn::gpid &pid);
void update_disk_stat();
};

class fs_manager
{
public:
fs_manager(bool for_test);
~fs_manager() {}

// this should be called before open/load any replicas
dsn::error_code initialize(const replication_options &opts);
dsn::error_code initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &tags,
bool for_test);

dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
void allocate_dir(const dsn::gpid &pid,
const std::string &type,
/*out*/ std::string &dir);
fs_manager();

// Should be called before open/load any replicas.
// NOTE: 'data_dirs' and 'data_dir_tags' must have the same size and be in the same order.
void initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &data_dir_tags);
dir_node *find_best_dir_for_new_replica(const dsn::gpid &pid) const;
void specify_dir_for_new_replica_for_test(dir_node *specified_dn, const dsn::gpid &pid) const;
void add_replica(const dsn::gpid &pid, const std::string &pid_dir);
dir_node *get_replica_dir(const char *app_type, gpid id, bool create_new);
// During partition split, we should guarantee that the child replica and the parent replica
// share the same data dir.
dir_node *get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir);
void remove_replica(const dsn::gpid &pid);
bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
void update_disk_stat(bool check_status_changed = true);
void update_disk_stat();

void add_new_dir_node(const std::string &data_dir, const std::string &tag);
bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
const std::vector<std::string> &get_available_data_dirs() const
bool is_dir_node_exist(const std::string &data_dir, const std::string &tag) const;
const std::vector<std::shared_ptr<dir_node>> &get_dir_nodes() const
{
zauto_read_lock l(_lock);
return _available_data_dirs;
return _dir_nodes;
}
enum class disk_migrate_validation
{
kValid,
kReplicaNotFound,
kReplicaExist,
kDiskNotFound
};
disk_migrate_validation validate_migrate_task(gpid pid,
const std::string &origin_disk,
const std::string &target_disk,
std::string &err_msg) const;
std::vector<disk_info> get_disk_infos(int app_id) const;

private:
void reset_disk_stat()
Expand All @@ -113,28 +126,23 @@ class fs_manager
_total_available_ratio = 0;
_min_available_ratio = 100;
_max_available_ratio = 0;
_status_updated_dir_nodes.clear();
}

dir_node *get_dir_node(const std::string &subdir) const;

// when visit the tag/storage of the _dir_nodes map, there's no need to protect by the lock.
// but when visit the holding_replicas, you must take care.
mutable zrwlock_nr _lock;

mutable zrwlock_nr _lock; // [ lock
int64_t _total_capacity_mb = 0;
int64_t _total_available_mb = 0;
int _total_available_ratio = 0;
int _min_available_ratio = 100;
int _max_available_ratio = 0;

// Once a dir_node has been added to '_dir_nodes', it will not be removed; instead, it will be
// marked with a non-NORMAL status if it is not available.
std::vector<std::shared_ptr<dir_node>> _dir_nodes;
std::vector<std::string> _available_data_dirs;

// Used for disk available space check
// disk status will be updated periodically, this vector record nodes whose disk_status changed
// in this round
std::vector<std::shared_ptr<dir_node>> _status_updated_dir_nodes;
// ] end of lock

perf_counter_wrapper _counter_total_capacity_mb;
perf_counter_wrapper _counter_total_available_mb;
Expand Down
1 change: 1 addition & 0 deletions src/common/replication_enums.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ ENUM_END2(replication::disk_migration_status::type, disk_migration_status)
ENUM_BEGIN2(replication::disk_status::type, disk_status, replication::disk_status::NORMAL)
ENUM_REG(replication::disk_status::NORMAL)
ENUM_REG(replication::disk_status::SPACE_INSUFFICIENT)
ENUM_REG(replication::disk_status::IO_ERROR)
ENUM_END2(replication::disk_status::type, disk_status)

ENUM_BEGIN2(replication::manual_compaction_status::type,
Expand Down
18 changes: 6 additions & 12 deletions src/common/test/fs_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,24 @@ namespace replication {

TEST(fs_manager, dir_update_disk_status)
{
std::shared_ptr<dir_node> node = std::make_shared<dir_node>("tag", "path");
struct update_disk_status
{
bool update_status;
bool mock_insufficient;
disk_status::type old_disk_status;
disk_status::type new_disk_status;
bool expected_ret;
} tests[] = {
{false, false, disk_status::NORMAL, disk_status::NORMAL, false},
{false, true, disk_status::NORMAL, disk_status::NORMAL, false},
{true, false, disk_status::NORMAL, disk_status::NORMAL, false},
{true, false, disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL, true},
{true, true, disk_status::NORMAL, disk_status::SPACE_INSUFFICIENT, true},
{true, true, disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT, false}};
} tests[] = {{false, disk_status::NORMAL, disk_status::NORMAL},
{false, disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL},
{true, disk_status::NORMAL, disk_status::SPACE_INSUFFICIENT},
{true, disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT}};
for (const auto &test : tests) {
node->status = test.old_disk_status;
auto node = std::make_shared<dir_node>("tag", "path", 0, 0, 0, test.old_disk_status);
fail::setup();
if (test.mock_insufficient) {
fail::cfg("filesystem_get_disk_space_info", "return(insufficient)");
} else {
fail::cfg("filesystem_get_disk_space_info", "return(normal)");
}
ASSERT_EQ(test.expected_ret, node->update_disk_stat(test.update_status));
node->update_disk_stat();
ASSERT_EQ(test.new_disk_status, node->status);
fail::teardown();
}
Expand Down
100 changes: 0 additions & 100 deletions src/meta/test/balancer_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,106 +113,6 @@ static void check_cure(app_mapper &apps, node_mapper &nodes, ::dsn::partition_co
ns->put_partition(pc.pid, true);
}

// static void verbose_nodes(const node_mapper& nodes)
//{
// std::cout << "------------" << std::endl;
// for (const auto& n: nodes)
// {
// const node_state& ns = n.second;
// printf("node: %s\ntotal_primaries: %d, total_secondaries: %d\n", n.first.to_string(),
// ns.primary_count(), ns.partition_count());
// for (int i=1; i<=2; ++i)
// {
// printf("app %d primaries: %d, app %d partitions: %d\n", i, ns.primary_count(i), i,
// ns.partition_count(i));
// }
// }
//}
//
// static void verbose_app_node(const node_mapper& nodes)
//{
// printf("Total_Pri: ");
// for (const auto& n: nodes)
// {
// const node_state& ns = n.second;
// printf("%*d", 3, ns.primary_count());
// }
// printf("\nTotal_Sec: ");
// for (const auto& n: nodes)
// {
// const node_state& ns = n.second;
// printf("%*d", 3, ns.secondary_count());
// }
// printf("\nApp01_Pri: ");
// for (const auto& n: nodes)
// {
// const node_state& ns = n.second;
// printf("%*d", 3, ns.primary_count(1));
// }
// printf("\nApp01_Sec: ");
// for (const auto& n: nodes)
// {
// const node_state& ns = n.second;
// printf("%*d", 3, ns.secondary_count(1));
// }
// printf("\nApp02_Pri: ");
// for (const auto& n: nodes)
// {
// const node_state& ns = n.second;
// printf("%*d", 3, ns.primary_count(2));
// }
// printf("\nApp02_Sec: ");
// for (const auto& n: nodes)
// {
// const node_state& ns = n.second;
// printf("%*d", 3, ns.secondary_count(2));
// }
// printf("\n");
//}

// static void verbose_app(const std::shared_ptr<app_state>& app)
//{
// std::cout << app->app_name << " " << app->app_id << " " << app->partition_count << std::endl;
// for (int i=0; i<app->partition_count; ++i)
// {
// const partition_configuration& pc = app->partitions[i];
// std::cout << pc.primary.to_string();
// for (int j=0; j<pc.secondaries.size(); ++j)
// {
// std::cout << " " << pc.secondaries[j].to_string();
// }
// std::cout << std::endl;
// }
//}
// static void print_node_fs_manager(const app_mapper &apps,
// const node_mapper &nodes,
// const nodes_fs_manager &manager)
//{
// int apps_count = apps.size();
// for (const auto &kv : nodes) {
// const node_state &ns = kv.second;
// printf("%s: %d primaries, %d partitions\n",
// ns.addr().to_string(),
// ns.primary_count(),
// ns.partition_count());
// printf("%8s", "tag");
// for (int i = 1; i <= apps_count; ++i) {
// std::string app = std::string("app") + std::to_string(i);
// printf("%8s", app.c_str());
// }
// printf("\n");
// const fs_manager &m = manager.find(ns.addr())->second;
// m.for_each_dir_node([apps_count](const dir_node &dn) {
// printf("%8s", dn.tag.c_str());
// for (int i = 1; i <= apps_count; ++i) {
// printf("%8u", dn.replicas_count(i));
// }
// printf("%8u\n", dn.replicas_count());
// return true;
// });
// }
//}

void meta_service_test_app::balancer_validator()
{
std::vector<dsn::rpc_address> node_list;
Expand Down
22 changes: 12 additions & 10 deletions src/meta/test/misc/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ void generate_node_fs_manager(const app_mapper &apps,
for (const auto &kv : nodes) {
const node_state &ns = kv.second;
if (nfm.find(ns.addr()) == nfm.end()) {
nfm.emplace(ns.addr(), std::make_shared<fs_manager>(true));
nfm.emplace(ns.addr(), std::make_shared<fs_manager>());
}
fs_manager &manager = *(nfm.find(ns.addr())->second);
manager.initialize(data_dirs, tags, true);
manager.initialize(data_dirs, tags);
ns.for_each_partition([&](const dsn::gpid &pid) {
const config_context &cc = *get_config_context(apps, pid);
snprintf(pid_dir,
Expand Down Expand Up @@ -240,19 +240,21 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo
std::string dir;
replica_info ri;
switch (act.type) {
case config_type::CT_ASSIGN_PRIMARY:
target_manager->allocate_dir(pid, "test", dir);
CHECK_EQ(dsn::ERR_OK, target_manager->get_disk_tag(dir, ri.disk_tag));
case config_type::CT_ASSIGN_PRIMARY: {
auto selected = target_manager->find_best_dir_for_new_replica(pid);
CHECK_NOTNULL(selected, "");
selected->holding_replicas[pid.get_app_id()].emplace(pid);
cc->collect_serving_replica(act.target, ri);
break;

}
case config_type::CT_ADD_SECONDARY:
case config_type::CT_ADD_SECONDARY_FOR_LB:
node_manager->allocate_dir(pid, "test", dir);
CHECK_EQ(dsn::ERR_OK, node_manager->get_disk_tag(dir, ri.disk_tag));
case config_type::CT_ADD_SECONDARY_FOR_LB: {
auto selected = node_manager->find_best_dir_for_new_replica(pid);
CHECK_NOTNULL(selected, "");
selected->holding_replicas[pid.get_app_id()].emplace(pid);
cc->collect_serving_replica(act.node, ri);
break;

}
case config_type::CT_DOWNGRADE_TO_SECONDARY:
case config_type::CT_UPGRADE_TO_PRIMARY:
break;
Expand Down
13 changes: 9 additions & 4 deletions src/replica/disk_cleaner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,19 @@ const std::string kFolderSuffixBak = ".bak";
const std::string kFolderSuffixOri = ".ori";
const std::string kFolderSuffixTmp = ".tmp";

error_s disk_remove_useless_dirs(const std::vector<std::string> &data_dirs,
error_s disk_remove_useless_dirs(const std::vector<std::shared_ptr<dir_node>> &dir_nodes,
/*output*/ disk_cleaning_report &report)
{
std::vector<std::string> sub_list;
for (auto &dir : data_dirs) {
for (const auto &dn : dir_nodes) {
// It's allowed to clear up the directory when it's SPACE_INSUFFICIENT, but not allowed when
// it's IO_ERROR.
if (dn->status == disk_status::IO_ERROR) {
continue;
}
std::vector<std::string> tmp_list;
if (!dsn::utils::filesystem::get_subdirectories(dir, tmp_list, false)) {
LOG_WARNING("gc_disk: failed to get subdirectories in {}", dir);
if (!dsn::utils::filesystem::get_subdirectories(dn->full_dir, tmp_list, false)) {
LOG_WARNING("gc_disk: failed to get subdirectories in {}", dn->full_dir);
return error_s::make(ERR_OBJECT_NOT_FOUND, "failed to get subdirectories");
}
sub_list.insert(sub_list.end(), tmp_list.begin(), tmp_list.end());
Expand Down
3 changes: 2 additions & 1 deletion src/replica/disk_cleaner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <vector>

#include "common/fs_manager.h"
#include "utils/errors.h"
#include "utils/flags.h"

Expand Down Expand Up @@ -49,7 +50,7 @@ struct disk_cleaning_report
};

// Removes the useless data from data directories.
extern error_s disk_remove_useless_dirs(const std::vector<std::string> &data_dirs,
extern error_s disk_remove_useless_dirs(const std::vector<std::shared_ptr<dir_node>> &dir_nodes,
/*output*/ disk_cleaning_report &report);

inline bool is_data_dir_removable(const std::string &dir)
Expand Down
2 changes: 1 addition & 1 deletion src/replica/duplication/replica_follower.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ void replica_follower::nfs_copy_remote_files(const rpc_address &remote_node,
request->source_disk_tag = remote_disk;
request->source_dir = remote_dir;
request->files = file_list;
request->dest_disk_tag = _replica->get_replica_disk_tag();
request->dest_disk_tag = _replica->get_dir_node()->tag;
request->dest_dir = dest_dir;
request->overwrite = true;
request->high_priority = false;
Expand Down
Loading

0 comments on commit 70ed3cf

Please sign in to comment.