Skip to content

Commit

Permalink
refactor: improve the single-responsibility of class fs_manager (2/n) (
Browse files Browse the repository at this point in the history
…apache#1522)

apache#1383

This patch moves some functions to fs_manager which are more reasonable to be
responsibilities of class fs_manager rather than those of other classes, includeing:
- remove `fs_manager::for_each_dir_node`
- minimize some locks
- rename `fs_manager::is_dir_node_available` to `fs_manager::is_dir_node_exist`
- move `get_disk_infos` code to class `fs_manager` and encapsulate it as a function
- move `validate_migrate_op` code to class `fs_manager` and encapsulate it as a function
- move `disk_status_to_error_code` from replica_2pc.cpp to class `fs_manager`
  • Loading branch information
acelyc111 authored Jun 8, 2023
1 parent 4dcbb1e commit d216696
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 122 deletions.
146 changes: 126 additions & 20 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iosfwd>
#include <utility>

Expand All @@ -44,8 +45,8 @@
#include "fmt/core.h"
#include "fmt/ostream.h"
#include "perf_counter/perf_counter.h"
#include "replica_admin_types.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
Expand All @@ -69,6 +70,19 @@ DSN_DEFINE_bool(replication,
true,
"true means ignore broken data disk when initialize");

error_code disk_status_to_error_code(disk_status::type ds)
{
switch (ds) {
case disk_status::SPACE_INSUFFICIENT:
return dsn::ERR_DISK_INSUFFICIENT;
case disk_status::IO_ERROR:
return dsn::ERR_DISK_IO_ERROR;
default:
CHECK_EQ(disk_status::NORMAL, ds);
return dsn::ERR_OK;
}
}

uint64_t dir_node::replicas_count() const
{
uint64_t sum = 0;
Expand Down Expand Up @@ -339,16 +353,6 @@ void fs_manager::remove_replica(const gpid &pid)
}
}

bool fs_manager::for_each_dir_node(const std::function<bool(const dir_node &)> &func) const
{
zauto_read_lock l(_lock);
for (auto &n : _dir_nodes) {
if (!func(*n))
return false;
}
return true;
}

void fs_manager::update_disk_stat()
{
zauto_write_lock l(_lock);
Expand Down Expand Up @@ -388,21 +392,25 @@ void fs_manager::update_disk_stat()

void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag)
{
zauto_write_lock l(_lock);
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);
dir_node *n = new dir_node(tag, norm_path);
_dir_nodes.emplace_back(n);
LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag);
auto dn = std::make_shared<dir_node>(tag, norm_path);

{
zauto_write_lock l(_lock);
_dir_nodes.emplace_back(dn);
}
LOG_INFO("add new data dir({}) and mark as tag({})", norm_path, tag);
}

bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::string &tag) const
bool fs_manager::is_dir_node_exist(const std::string &data_dir, const std::string &tag) const
{
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);

zauto_read_lock l(_lock);
for (const auto &dir_node : _dir_nodes) {
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);
if (dir_node->full_dir == norm_path || dir_node->tag == tag) {
for (const auto &dn : _dir_nodes) {
if (dn->full_dir == norm_path || dn->tag == tag) {
return true;
}
}
Expand Down Expand Up @@ -497,5 +505,103 @@ dir_node *fs_manager::create_child_replica_dir(dsn::string_view app_type,
return child_dn;
}

std::vector<disk_info> fs_manager::get_disk_infos(int app_id) const
{
std::vector<disk_info> disk_infos;
zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
disk_info di;
// Query all app info if 'app_id' is 0, which is not a valid app id.
if (app_id == 0) {
di.holding_primary_replicas = dn->holding_primary_replicas;
di.holding_secondary_replicas = dn->holding_secondary_replicas;
} else {
const auto &primary_replicas = dn->holding_primary_replicas.find(app_id);
if (primary_replicas != dn->holding_primary_replicas.end()) {
di.holding_primary_replicas[app_id] = primary_replicas->second;
}

const auto &secondary_replicas = dn->holding_secondary_replicas.find(app_id);
if (secondary_replicas != dn->holding_secondary_replicas.end()) {
di.holding_secondary_replicas[app_id] = secondary_replicas->second;
}
}
di.tag = dn->tag;
di.full_dir = dn->full_dir;
di.disk_capacity_mb = dn->disk_capacity_mb;
di.disk_available_mb = dn->disk_available_mb;

disk_infos.emplace_back(std::move(di));
}

return disk_infos;
}

error_code fs_manager::validate_migrate_op(gpid pid,
const std::string &origin_disk,
const std::string &target_disk,
std::string &err_msg) const
{
bool origin_disk_exist = false;
bool target_disk_exist = false;
zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
// Check if the origin directory is valid.
if (dn->tag == origin_disk) {
CHECK_FALSE(origin_disk_exist);
if (!dn->has(pid)) {
err_msg = fmt::format(
"replica({}) doesn't exist on the origin disk({})", pid, origin_disk);
return ERR_OBJECT_NOT_FOUND;
}

// It's OK to migrate a replica from a dir_node which is NORMAL or even
// SPACE_INSUFFICIENT, but not allowed when it's IO_ERROR.
if (dn->status == disk_status::IO_ERROR) {
err_msg = fmt::format(
"replica({}) exists on an IO-Error origin disk({})", pid, origin_disk);
return ERR_DISK_IO_ERROR;
}

origin_disk_exist = true;
}

// Check if the target directory is valid.
if (dn->tag == target_disk) {
CHECK_FALSE(target_disk_exist);
if (dn->has(pid)) {
err_msg =
fmt::format("replica({}) already exists on target disk({})", pid, target_disk);
return ERR_PATH_ALREADY_EXIST;
}

// It's not allowed to migrate a replica to a dir_node which is either
// SPACE_INSUFFICIENT or IO_ERROR.
if (dn->status == disk_status::SPACE_INSUFFICIENT ||
dn->status == disk_status::IO_ERROR) {
err_msg = fmt::format("replica({}) target disk({}) is {}",
pid,
origin_disk,
enum_to_string(dn->status));
return disk_status_to_error_code(dn->status);
}

target_disk_exist = true;
}
}

if (!origin_disk_exist) {
err_msg = fmt::format("origin disk({}) doesn't exist", origin_disk);
return ERR_OBJECT_NOT_FOUND;
}

if (!target_disk_exist) {
err_msg = fmt::format("target disk({}) doesn't exist", target_disk);
return ERR_OBJECT_NOT_FOUND;
}

return ERR_OK;
}

} // namespace replication
} // namespace dsn
13 changes: 10 additions & 3 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <gtest/gtest_prod.h>
#include <stdint.h>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <set>
Expand All @@ -30,6 +29,7 @@
#include "common/replication_other_types.h"
#include "metadata_types.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/string_view.h"
#include "utils/zlocks.h"
Expand All @@ -38,9 +38,12 @@ namespace dsn {
class gpid;

namespace replication {
class disk_info;

DSN_DECLARE_int32(disk_min_available_space_ratio);

error_code disk_status_to_error_code(disk_status::type ds);

struct dir_node
{
public:
Expand Down Expand Up @@ -116,16 +119,20 @@ class fs_manager
gpid child_pid,
const std::string &parent_dir);
void remove_replica(const dsn::gpid &pid);
bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
void update_disk_stat();

void add_new_dir_node(const std::string &data_dir, const std::string &tag);
bool is_dir_node_exist(const std::string &data_dir, const std::string &tag) const;
const std::vector<std::shared_ptr<dir_node>> &get_dir_nodes() const
{
zauto_read_lock l(_lock);
return _dir_nodes;
}
bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
error_code validate_migrate_op(gpid pid,
const std::string &origin_disk,
const std::string &target_disk,
std::string &err_msg) const;
std::vector<disk_info> get_disk_infos(int app_id) const;

private:
void reset_disk_stat()
Expand Down
15 changes: 0 additions & 15 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,6 @@ DSN_DEFINE_uint64(
DSN_DECLARE_int32(max_mutation_count_in_prepare_list);
DSN_DECLARE_int32(staleness_for_commit);

namespace {
error_code disk_status_to_error_code(disk_status::type ds)
{
switch (ds) {
case disk_status::SPACE_INSUFFICIENT:
return dsn::ERR_DISK_INSUFFICIENT;
case disk_status::IO_ERROR:
return dsn::ERR_DISK_IO_ERROR;
default:
CHECK_EQ(disk_status::NORMAL, ds);
return dsn::ERR_OK;
}
}
} // anonymous namespace

void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
{
_checker.only_one_thread_access();
Expand Down
60 changes: 6 additions & 54 deletions src/replica/replica_disk_migrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@

#include <boost/algorithm/string/replace.hpp>
#include <fmt/core.h>
#include <fmt/ostream.h>
#include <iosfwd>
#include <memory>
#include <vector>

#include "common/fs_manager.h"
#include "common/gpid.h"
Expand Down Expand Up @@ -126,56 +122,12 @@ bool replica_disk_migrator::check_migration_args(replica_disk_migrate_rpc rpc)
return false;
}

bool valid_origin_disk = false;
bool valid_target_disk = false;
// _dir_nodes: std::vector<std::shared_ptr<dir_node>>
// TODO(yingchun): skip disks which are SPACE_INSUFFICIENT or IO_ERROR.
for (const auto &dir_node : _replica->get_replica_stub()->_fs_manager._dir_nodes) {
if (dir_node->tag == req.origin_disk) {
valid_origin_disk = true;
if (!dir_node->has(req.pid)) {
std::string err_msg =
fmt::format("Invalid replica(replica({}) doesn't exist on origin disk({}))",
req.pid,
req.origin_disk);
LOG_ERROR_PREFIX(
"received replica disk migrate request(origin={}, target={}), err = {}",
req.origin_disk,
req.target_disk,
err_msg);
resp.err = ERR_OBJECT_NOT_FOUND;
resp.__set_hint(err_msg);
return false;
}
}

if (dir_node->tag == req.target_disk) {
valid_target_disk = true;
if (dir_node->has(get_gpid())) {
std::string err_msg =
fmt::format("Invalid replica(replica({}) has existed on target disk({}))",
req.pid,
req.target_disk);
LOG_ERROR_PREFIX(
"received replica disk migrate request(origin={}, target={}), err = {}",
req.origin_disk,
req.target_disk,
err_msg);
resp.err = ERR_PATH_ALREADY_EXIST;
resp.__set_hint(err_msg);
return false;
}
}
}

if (!valid_origin_disk || !valid_target_disk) {
std::string invalid_disk_tag = !valid_origin_disk ? req.origin_disk : req.target_disk;
std::string err_msg = fmt::format("Invalid disk tag({} doesn't exist)", invalid_disk_tag);
LOG_ERROR_PREFIX("received replica disk migrate request(origin={}, target={}), err = {}",
req.origin_disk,
req.target_disk,
err_msg);
resp.err = ERR_OBJECT_NOT_FOUND;
std::string err_msg;
auto ec = _replica->get_replica_stub()->_fs_manager.validate_migrate_op(
req.pid, req.origin_disk, req.target_disk, err_msg);
if (ec != ERR_OK) {
LOG_ERROR_PREFIX(err_msg);
resp.err = ec;
resp.__set_hint(err_msg);
return false;
}
Expand Down
37 changes: 7 additions & 30 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1105,34 +1105,10 @@ void replica_stub::on_query_disk_info(query_disk_info_rpc rpc)
}
}

for (const auto &dir_node : _fs_manager._dir_nodes) {
disk_info info;
// app_name empty means query all app replica_count
if (req.app_name.empty()) {
info.holding_primary_replicas = dir_node->holding_primary_replicas;
info.holding_secondary_replicas = dir_node->holding_secondary_replicas;
} else {
const auto &primary_iter = dir_node->holding_primary_replicas.find(app_id);
if (primary_iter != dir_node->holding_primary_replicas.end()) {
info.holding_primary_replicas[app_id] = primary_iter->second;
}

const auto &secondary_iter = dir_node->holding_secondary_replicas.find(app_id);
if (secondary_iter != dir_node->holding_secondary_replicas.end()) {
info.holding_secondary_replicas[app_id] = secondary_iter->second;
}
}
info.tag = dir_node->tag;
info.full_dir = dir_node->full_dir;
info.disk_capacity_mb = dir_node->disk_capacity_mb;
info.disk_available_mb = dir_node->disk_available_mb;

resp.disk_infos.emplace_back(info);
}

resp.total_capacity_mb = _fs_manager._total_capacity_mb;
resp.total_available_mb = _fs_manager._total_available_mb;

resp.disk_infos = _fs_manager.get_disk_infos(app_id);
// Get the statistics from fs_manager's metrics, they are thread-safe.
resp.total_capacity_mb = _fs_manager._counter_total_capacity_mb->get_integer_value();
resp.total_available_mb = _fs_manager._counter_total_available_mb->get_integer_value();
resp.err = ERR_OK;
}

Expand Down Expand Up @@ -1203,11 +1179,12 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
}

for (auto i = 0; i < data_dir_tags.size(); ++i) {
// TODO(yingchun): move the following code to fs_manager.
auto dir = data_dirs[i];
if (_fs_manager.is_dir_node_available(dir, data_dir_tags[i])) {
if (_fs_manager.is_dir_node_exist(dir, data_dir_tags[i])) {
resp.err = ERR_NODE_ALREADY_EXIST;
resp.__set_err_hint(
fmt::format("data_dir({}) tag({}) already available", dir, data_dir_tags[i]));
fmt::format("data_dir({}) tag({}) already exist", dir, data_dir_tags[i]));
return;
}

Expand Down
Loading

0 comments on commit d216696

Please sign in to comment.