Skip to content

Commit

Permalink
refactor: fs manager
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jun 8, 2023
1 parent 4dcbb1e commit 7b89eaf
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 118 deletions.
1 change: 1 addition & 0 deletions go-client/idl/base/error_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ const (
ERR_PARENT_PARTITION_MISUSED
ERR_CHILD_NOT_READY
ERR_DISK_INSUFFICIENT
ERR_DISK_IO_ERROR
)

func (e DsnErrCode) Error() string {
Expand Down
134 changes: 114 additions & 20 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <iosfwd>
#include <utility>

Expand All @@ -44,8 +45,8 @@
#include "fmt/core.h"
#include "fmt/ostream.h"
#include "perf_counter/perf_counter.h"
#include "replica_admin_types.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
Expand Down Expand Up @@ -339,16 +340,6 @@ void fs_manager::remove_replica(const gpid &pid)
}
}

bool fs_manager::for_each_dir_node(const std::function<bool(const dir_node &)> &func) const
{
zauto_read_lock l(_lock);
for (auto &n : _dir_nodes) {
if (!func(*n))
return false;
}
return true;
}

void fs_manager::update_disk_stat()
{
zauto_write_lock l(_lock);
Expand Down Expand Up @@ -388,21 +379,25 @@ void fs_manager::update_disk_stat()

void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag)
{
zauto_write_lock l(_lock);
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);
dir_node *n = new dir_node(tag, norm_path);
_dir_nodes.emplace_back(n);
LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag);

{
zauto_write_lock l(_lock);
auto dn = std::make_shared<dir_node>(tag, norm_path);
_dir_nodes.emplace_back(dn);
}
LOG_INFO("mark data dir({}) as tag({})", norm_path, tag);
}

bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::string &tag) const
bool fs_manager::is_dir_node_exist(const std::string &data_dir, const std::string &tag) const
{
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);

zauto_read_lock l(_lock);
for (const auto &dir_node : _dir_nodes) {
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);
if (dir_node->full_dir == norm_path || dir_node->tag == tag) {
for (const auto &dn : _dir_nodes) {
if (dn->full_dir == norm_path || dn->tag == tag) {
return true;
}
}
Expand Down Expand Up @@ -497,5 +492,104 @@ dir_node *fs_manager::create_child_replica_dir(dsn::string_view app_type,
return child_dn;
}

std::vector<disk_info> fs_manager::get_disk_infos(int app_id) const
{
std::vector<disk_info> disk_infos;
zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
disk_info di;
// Query all app info if 'app_id' is 0, which is not a valid app id.
if (app_id == 0) {
di.holding_primary_replicas = dn->holding_primary_replicas;
di.holding_secondary_replicas = dn->holding_secondary_replicas;
} else {
const auto &primary_iter = dn->holding_primary_replicas.find(app_id);
if (primary_iter != dn->holding_primary_replicas.end()) {
di.holding_primary_replicas[app_id] = primary_iter->second;
}

const auto &secondary_iter = dn->holding_secondary_replicas.find(app_id);
if (secondary_iter != dn->holding_secondary_replicas.end()) {
di.holding_secondary_replicas[app_id] = secondary_iter->second;
}
}
di.tag = dn->tag;
di.full_dir = dn->full_dir;
di.disk_capacity_mb = dn->disk_capacity_mb;
di.disk_available_mb = dn->disk_available_mb;

disk_infos.emplace_back(std::move(di));
}

return disk_infos;
}

fs_manager::disk_migrate_validation
fs_manager::validate_migrate_task(gpid pid,
const std::string &origin_disk,
const std::string &target_disk,
std::string &err_msg) const
{
bool origin_disk_exist = false;
bool target_disk_exist = false;
zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
// Check if the origin directory is valid.
if (dn->tag == origin_disk) {
CHECK_FALSE(origin_disk_exist);
if (!dn->has(pid)) {
err_msg = fmt::format(
"replica({}) doesn't exist on the origin disk({})", pid, origin_disk);
return disk_migrate_validation::kReplicaNotFound;
}

// It's OK to migrate a replica from a dir_node which is NORMAL or even
// SPACE_INSUFFICIENT, but not allowed when it's IO_ERROR.
if (dn->status == disk_status::IO_ERROR) {
err_msg = fmt::format(
"replica({}) exists on an IO-Error origin disk({})", pid, origin_disk);
return disk_migrate_validation::kDiskIOError;
}

origin_disk_exist = true;
}

// Check if the target directory is valid.
if (dn->tag == target_disk) {
CHECK_FALSE(target_disk_exist);
if (dn->has(pid)) {
err_msg =
fmt::format("replica({}) already exists on target disk({})", pid, target_disk);
return disk_migrate_validation::kReplicaExist;
}

// It's not allowed to migrate a replica to a dir_node which is either
// SPACE_INSUFFICIENT or IO_ERROR.
if (dn->status == disk_status::SPACE_INSUFFICIENT ||
dn->status == disk_status::IO_ERROR) {
err_msg = fmt::format("replica({}) target disk({}) is {}",
pid,
origin_disk,
enum_to_string(dn->status));
return disk_migrate_validation::kDiskIOError;
}

target_disk_exist = true;
}
}

if (!origin_disk_exist) {
err_msg = fmt::format("origin disk({}) doesn't exist", origin_disk);
return disk_migrate_validation::kDiskNotFound;
}

if (!target_disk_exist) {
err_msg = fmt::format("target disk({}) doesn't exist", target_disk);
return disk_migrate_validation::kDiskNotFound;
}

return disk_migrate_validation::kValid;
}

} // namespace replication
} // namespace dsn
17 changes: 15 additions & 2 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace dsn {
class gpid;

namespace replication {
class disk_info;

DSN_DECLARE_int32(disk_min_available_space_ratio);

Expand Down Expand Up @@ -116,16 +117,28 @@ class fs_manager
gpid child_pid,
const std::string &parent_dir);
void remove_replica(const dsn::gpid &pid);
bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
void update_disk_stat();

void add_new_dir_node(const std::string &data_dir, const std::string &tag);
bool is_dir_node_exist(const std::string &data_dir, const std::string &tag) const;
const std::vector<std::shared_ptr<dir_node>> &get_dir_nodes() const
{
zauto_read_lock l(_lock);
return _dir_nodes;
}
bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
enum class disk_migrate_validation
{
kValid,
kReplicaNotFound,
kReplicaExist,
kDiskNotFound,
kDiskIOError
};
disk_migrate_validation validate_migrate_task(gpid pid,
const std::string &origin_disk,
const std::string &target_disk,
std::string &err_msg) const;
std::vector<disk_info> get_disk_infos(int app_id) const;

private:
void reset_disk_stat()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ class load_from_private_log_test : public duplication_test_base

mutation_log_ptr create_private_log(gpid id) { return create_private_log(1, id); }

mutation_log_ptr create_private_log(int private_log_size_mb = 1, gpid id = gpid(1, 1))
mutation_log_ptr create_private_log(int private_log_size_mb = 1, gpid id = gpid(1, 0))
{
std::map<gpid, decree> replay_condition;
replay_condition[id] = 0; // duplicating
Expand Down
7 changes: 5 additions & 2 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,14 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
auto storage_error = _app->on_request(request);
if (dsn_unlikely(storage_error != ERR_OK)) {
switch (storage_error) {
// TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage
// engine errors.
// TODO(yingchun): Now only kCorruption and kIOError is dealt, consider to deal with
// more storage engine errors.
case rocksdb::Status::kCorruption:
handle_local_failure(ERR_RDB_CORRUPTION);
break;
case rocksdb::Status::kIOError:
handle_local_failure(ERR_DISK_IO_ERROR);
break;
default:
LOG_ERROR_PREFIX("client read encountered an unhandled error: {}", storage_error);
}
Expand Down
77 changes: 21 additions & 56 deletions src/replica/replica_disk_migrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@

#include <boost/algorithm/string/replace.hpp>
#include <fmt/core.h>
#include <fmt/ostream.h>
#include <iosfwd>
#include <memory>
#include <vector>

#include "common/fs_manager.h"
#include "common/gpid.h"
Expand Down Expand Up @@ -126,62 +122,31 @@ bool replica_disk_migrator::check_migration_args(replica_disk_migrate_rpc rpc)
return false;
}

bool valid_origin_disk = false;
bool valid_target_disk = false;
// _dir_nodes: std::vector<std::shared_ptr<dir_node>>
// TODO(yingchun): skip disks which are SPACE_INSUFFICIENT or IO_ERROR.
for (const auto &dir_node : _replica->get_replica_stub()->_fs_manager._dir_nodes) {
if (dir_node->tag == req.origin_disk) {
valid_origin_disk = true;
if (!dir_node->has(req.pid)) {
std::string err_msg =
fmt::format("Invalid replica(replica({}) doesn't exist on origin disk({}))",
req.pid,
req.origin_disk);
LOG_ERROR_PREFIX(
"received replica disk migrate request(origin={}, target={}), err = {}",
req.origin_disk,
req.target_disk,
err_msg);
resp.err = ERR_OBJECT_NOT_FOUND;
resp.__set_hint(err_msg);
return false;
}
}

if (dir_node->tag == req.target_disk) {
valid_target_disk = true;
if (dir_node->has(get_gpid())) {
std::string err_msg =
fmt::format("Invalid replica(replica({}) has existed on target disk({}))",
req.pid,
req.target_disk);
LOG_ERROR_PREFIX(
"received replica disk migrate request(origin={}, target={}), err = {}",
req.origin_disk,
req.target_disk,
err_msg);
resp.err = ERR_PATH_ALREADY_EXIST;
resp.__set_hint(err_msg);
return false;
}
}
std::string err_msg;
fs_manager::disk_migrate_validation type =
_replica->get_replica_stub()->_fs_manager.validate_migrate_task(
req.pid, req.origin_disk, req.target_disk, err_msg);
if (type != fs_manager::disk_migrate_validation::kValid) {
LOG_ERROR_PREFIX(err_msg);
resp.__set_hint(err_msg);
}

if (!valid_origin_disk || !valid_target_disk) {
std::string invalid_disk_tag = !valid_origin_disk ? req.origin_disk : req.target_disk;
std::string err_msg = fmt::format("Invalid disk tag({} doesn't exist)", invalid_disk_tag);
LOG_ERROR_PREFIX("received replica disk migrate request(origin={}, target={}), err = {}",
req.origin_disk,
req.target_disk,
err_msg);
switch (type) {
case fs_manager::disk_migrate_validation::kReplicaNotFound:
resp.err = ERR_OBJECT_NOT_FOUND;
return false;
case fs_manager::disk_migrate_validation::kReplicaExist:
resp.err = ERR_PATH_ALREADY_EXIST;
return false;
case fs_manager::disk_migrate_validation::kDiskNotFound:
resp.err = ERR_OBJECT_NOT_FOUND;
resp.__set_hint(err_msg);
return false;
case fs_manager::disk_migrate_validation::kDiskIOError:
resp.err = ERR_DISK_IO_ERROR;
return false;
default:
resp.err = ERR_OK;
return true;
}

resp.err = ERR_OK;
return true;
}

// THREAD_POOL_REPLICATION_LONG
Expand Down
6 changes: 5 additions & 1 deletion src/replica/replica_failover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <atomic>
#include <string>

#include "common/fs_manager.h"
#include "common/replication_common.h"
#include "common/replication_enums.h"
#include "dsn.layer2_types.h"
Expand All @@ -54,7 +56,9 @@ void replica::handle_local_failure(error_code error)
{
LOG_INFO_PREFIX("handle local failure error {}, status = {}", error, enum_to_string(status()));

if (error == ERR_RDB_CORRUPTION) {
if (error == ERR_DISK_IO_ERROR) {
_dir_node->status = disk_status::IO_ERROR;
} else if (error == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}

Expand Down
8 changes: 6 additions & 2 deletions src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1238,8 +1238,12 @@ void replica::handle_learning_error(error_code err, bool is_local_error)
err,
is_local_error ? "local_error" : "remote error");

if (is_local_error && err == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
if (is_local_error) {
if (err == ERR_DISK_IO_ERROR) {
_dir_node->status = disk_status::IO_ERROR;
} else if (err == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}
}

_stub->_counter_replicas_learning_recent_learn_fail_count->increment();
Expand Down
Loading

0 comments on commit 7b89eaf

Please sign in to comment.