Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(disk): support add disk rpc (#839)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Jul 22, 2021
1 parent 47871eb commit 31ad7e9
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 20 deletions.
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ MAKE_EVENT_CODE(LPC_CREATE_CHILD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_QUERY_DISK_INFO, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_REPLICA_DISK_MIGRATE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_DETECT_HOTKEY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_ADD_NEW_DISK, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_ANALYZE_HOTKEY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_BACKGROUND_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_BULK_LOAD_INGESTION, TASK_PRIORITY_HIGH)
Expand Down
4 changes: 4 additions & 0 deletions include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ class replication_ddl_client

error_with<query_split_response> query_partition_split(const std::string &app_name);

// Sends an RPC_ADD_NEW_DISK request carrying `disk_str` to `target_node` and
// returns the outcome once the call has completed.
// `disk_str` uses the "tag:dir[,tag2:dir2]" format, for example "ssd1:/home/work/ssd1".
error_with<add_new_disk_response> add_new_disk(const rpc_address &target_node,
const std::string &disk_str);

private:
bool static valid_app_char(int c);

Expand Down Expand Up @@ -308,6 +311,7 @@ class replication_ddl_client

typedef rpc_holder<detect_hotkey_request, detect_hotkey_response> detect_hotkey_rpc;
typedef rpc_holder<query_disk_info_request, query_disk_info_response> query_disk_info_rpc;
typedef rpc_holder<add_new_disk_request, add_new_disk_response> add_new_disk_rpc;
};
} // namespace replication
} // namespace dsn
14 changes: 14 additions & 0 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1670,5 +1670,19 @@ replication_ddl_client::query_partition_split(const std::string &app_name)
return call_rpc_sync(query_split_rpc(std::move(req), RPC_CM_QUERY_PARTITION_SPLIT));
}

// Asks the node at `target_node` to add the data directories described by
// `disk_str` (format: "tag:dir[,tag2:dir2]", e.g. "ssd1:/home/work/ssd1") by
// sending it an RPC_ADD_NEW_DISK request, and waits for the outcome.
//
// The returned error_with is the one produced for `target_node` by
// call_rpcs_sync(), so callers can tell a failed call (error state) apart from a
// reply whose `err` field carries an application-level error.
error_with<add_new_disk_response>
replication_ddl_client::add_new_disk(const rpc_address &target_node, const std::string &disk_str)
{
    auto req = make_unique<add_new_disk_request>();
    req->disk_str = disk_str;

    std::map<rpc_address, add_new_disk_rpc> add_new_disk_rpcs;
    add_new_disk_rpcs.emplace(target_node, add_new_disk_rpc(std::move(req), RPC_ADD_NEW_DISK));

    std::map<rpc_address, error_with<add_new_disk_response>> resps;
    call_rpcs_sync(add_new_disk_rpcs, resps);

    // Hand back the stored result as-is. Unwrapping it with get_value() and
    // re-wrapping the response would drop the error carried by a failed call and
    // would read a value that may not exist.
    // NOTE(review): assumes call_rpcs_sync() stores exactly one entry per requested
    // node and that error_with is move-constructible -- confirm.
    return std::move(resps.begin()->second);
}

} // namespace replication
} // namespace dsn
25 changes: 25 additions & 0 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,5 +340,30 @@ void fs_manager::update_disk_stat(bool check_status_changed)
_counter_min_available_ratio->set(_min_available_ratio);
_counter_max_available_ratio->set(_max_available_ratio);
}

// Registers `data_dir` as an additional data directory labelled `tag`.
// The new dir_node is built from the normalized form of `data_dir`, while
// `_available_data_dirs` records the path exactly as it was passed in.
// The whole update runs under the write lock.
void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag)
{
    zauto_write_lock guard(_lock);

    std::string normalized_dir;
    utils::filesystem::get_normalized_path(data_dir, normalized_dir);

    _dir_nodes.emplace_back(new dir_node(tag, normalized_dir));
    _available_data_dirs.emplace_back(data_dir);

    ddebug_f("{}: mark data dir({}) as tag({})",
             dsn_primary_address().to_string(),
             normalized_dir,
             tag);
}

bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::string &tag) const
{
zauto_read_lock l(_lock);
for (const auto &dir_node : _dir_nodes) {
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);
if (dir_node->full_dir == norm_path || dir_node->tag == tag) {
return true;
}
}
return false;
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class fs_manager
bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
void update_disk_stat(bool check_status_changed = true);

// Registers a new dir node for `data_dir` (stored in normalized form) under `tag`
// and appends `data_dir` to the list of available data dirs.
void add_new_dir_node(const std::string &data_dir, const std::string &tag);
// Returns true if a registered dir node has the same normalized path as `data_dir`
// OR the same `tag`; false otherwise.
bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
const std::vector<std::string> &get_available_data_dirs() const
{
zauto_read_lock l(_lock);
Expand Down
16 changes: 16 additions & 0 deletions src/common/replica_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,19 @@ struct detect_hotkey_response {
2: optional string err_hint;
3: optional string hotkey_result;
}

// Request of RPC_ADD_NEW_DISK: asks a node to start using additional data directories.
struct add_new_disk_request {
// Comma-separated list of "tag:dir" items, i.e. "disk_tag:disk_dir,tag2:dir2".
// For example: "ssd1:/home/work/ssd1"
// Each item must contain exactly one ':' with a non-empty tag and a non-empty dir.
// An empty string is rejected with ERR_INVALID_PARAMETERS.
1: string disk_str;
}

// Response of RPC_ADD_NEW_DISK.
struct add_new_disk_response {
// ERR_OK when every requested data_dir was added.
// Possible errors:
// - ERR_INVALID_PARAMETERS: disk_str in request is empty or malformed
// - ERR_NODE_ALREADY_EXIST: data_dir, or its tag, is already registered on the node
// - ERR_DIR_NOT_EMPTY: data_dir exists and is not empty
// - ERR_FILE_OPERATION_FAILED: can't create data_dir or directory can't read/write
1: dsn.error_code err;
// Human-readable detail, set when err is not ERR_OK.
2: optional string err_hint;
}
23 changes: 16 additions & 7 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ void replication_options::initialize()
dsn_config_get_value_string("replication", "data_dirs", "", "replica directory list");
std::vector<std::string> config_data_dirs;
std::vector<std::string> config_data_dir_tags;
get_data_dir_and_tag(dirs_str, app_dir, app_name, config_data_dirs, config_data_dir_tags);
std::string error_msg = "";
bool flag = get_data_dir_and_tag(
dirs_str, app_dir, app_name, config_data_dirs, config_data_dir_tags, error_msg);
dassert_f(flag, error_msg);

// check if data_dir in black list, data_dirs doesn't contain dir in black list
std::string black_list_file =
Expand Down Expand Up @@ -495,12 +498,13 @@ void replica_helper::load_meta_servers(/*out*/ std::vector<dsn::rpc_address> &se
dassert(servers.size() > 0, "no meta server specified in config [%s].%s", section, key);
}

/*static*/ void
/*static*/ bool
replication_options::get_data_dir_and_tag(const std::string &config_dirs_str,
const std::string &default_dir,
const std::string &app_name,
/*out*/ std::vector<std::string> &data_dirs,
/*out*/ std::vector<std::string> &data_dir_tags)
/*out*/ std::vector<std::string> &data_dir_tags,
/*out*/ std::string &err_msg)
{
// - if {config_dirs_str} is empty (return true):
// - dir = {default_dir}
Expand All @@ -523,21 +527,25 @@ replication_options::get_data_dir_and_tag(const std::string &config_dirs_str,
std::vector<std::string> tag_and_dir;
utils::split_args(dir.c_str(), tag_and_dir, ':');
if (tag_and_dir.size() != 2) {
dassert_f("invalid data_dir item({}) in config", dir);
err_msg = fmt::format("invalid data_dir item({}) in config", dir);
return false;
}
if (tag_and_dir[0].empty() || tag_and_dir[1].empty()) {
dassert_f("invalid data_dir item({}) in config", dir);
err_msg = fmt::format("invalid data_dir item({}) in config", dir);
return false;
}
dir = utils::filesystem::path_combine(tag_and_dir[1], app_name);
for (unsigned i = 0; i < dir_tags.size(); ++i) {
if (dirs[i] == dir) {
dassert_f("dir({}) and dir({}) conflict", dirs[i], dir);
err_msg = fmt::format("dir({}) and dir({}) conflict", dirs[i], dir);
return false;
}
}
for (unsigned i = 0; i < dir_tags.size(); ++i) {
if (dir_tags[i] == tag_and_dir[0]) {
dassert_f(
err_msg = fmt::format(
"dir({}) and dir({}) have same tag({})", dirs[i], dir, tag_and_dir[0]);
return false;
}
}
dir_tags.push_back(tag_and_dir[0]);
Expand All @@ -550,6 +558,7 @@ replication_options::get_data_dir_and_tag(const std::string &config_dirs_str,
data_dirs.push_back(utils::filesystem::path_combine(dir, "reps"));
data_dir_tags.push_back(dir_tags[i]);
}
return true;
}

/*static*/ void
Expand Down
5 changes: 3 additions & 2 deletions src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,12 @@ class replication_options
~replication_options();

void initialize();
static void get_data_dir_and_tag(const std::string &config_dirs_str,
static bool get_data_dir_and_tag(const std::string &config_dirs_str,
const std::string &default_dir,
const std::string &app_name,
/*out*/ std::vector<std::string> &data_dirs,
/*out*/ std::vector<std::string> &data_dir_tags);
/*out*/ std::vector<std::string> &data_dir_tags,
/*out*/ std::string &err_msg);
static void get_data_dirs_in_black_list(const std::string &fname,
/*out*/ std::vector<std::string> &dirs);
static bool check_if_in_black_list(const std::vector<std::string> &black_list_dir,
Expand Down
27 changes: 24 additions & 3 deletions src/common/test/replication_common_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,37 @@ TEST(replication_common, get_data_dir_test)
{
std::vector<std::string> data_dirs;
std::vector<std::string> data_dir_tags;
std::string err_msg = "";

// Test cases:
// - default dir: ""
// - invalid dir:
// - "wrong_dir"
// - "tag:dir:wrong"
// - "tag:"
// - "tag1:disk,tag2,disk"
// - "tag:disk1,tag:disk2"
// - valid: "tag1:disk1,tag2:disk2"
struct get_data_dir_test
{
std::string data_dir_str;
bool expected_val;
int32_t expected_length;
} tests[] = {{"", 1}, {"tag1:disk1", 1}, {"tag1:disk1, ", 1}, {"tag1:disk1,tag2:disk2", 2}};
} tests[] = {{"", true, 1},
{"wrong_dir", false, 0},
{"tag:dir:wrong", false, 0},
{"tag:", false, 0},
{"tag1:disk,tag2,disk", false, 0},
{"tag:disk1,tag:disk2", false, 0},
{"tag1:disk1", true, 1},
{"tag1:disk1, ", true, 1},
{"tag1:disk1,tag2:disk2", true, 2}};
for (const auto &test : tests) {
data_dirs.clear();
data_dir_tags.clear();
replication_options::get_data_dir_and_tag(
test.data_dir_str, "test_dir", "replica", data_dirs, data_dir_tags);
bool flag = replication_options::get_data_dir_and_tag(
test.data_dir_str, "test_dir", "replica", data_dirs, data_dir_tags, err_msg);
ASSERT_EQ(flag, test.expected_val);
ASSERT_EQ(data_dirs.size(), data_dir_tags.size());
ASSERT_EQ(data_dirs.size(), test.expected_length);
}
Expand Down
50 changes: 49 additions & 1 deletion src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ DSN_DEFINE_bool("replication",
ignore_broken_disk,
true,
"true means ignore broken data disk when initialize");
DSN_TAG_VARIABLE(ignore_broken_disk, FT_MUTABLE);

bool replica_stub::s_not_exit_on_log_failure = false;

Expand Down Expand Up @@ -1143,6 +1142,53 @@ void replica_stub::on_query_app_info(query_app_info_rpc rpc)
}
}

// ThreadPool: THREAD_POOL_DEFAULT
//
// Handles RPC_ADD_NEW_DISK: parses `disk_str` from the request into (dir, tag)
// pairs and, for each pair in order, validates the directory and registers it in
// `_fs_manager`.
//
// Response codes:
//   - ERR_OK:                    every directory was added
//   - ERR_INVALID_PARAMETERS:    `disk_str` is empty or could not be parsed
//   - ERR_NODE_ALREADY_EXIST:    the directory or its tag is already registered
//   - ERR_DIR_NOT_EMPTY:         the directory exists and is not empty
//   - ERR_FILE_OPERATION_FAILED: the directory can't be created or fails the
//                                read/write check
//
// The first failing directory aborts the request. Directories registered earlier
// in the same request stay registered, so a non-OK response does not imply that
// nothing was added.
void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
{
    const auto &disk_str = rpc.request().disk_str;
    auto &resp = rpc.response();
    resp.err = ERR_OK;

    std::vector<std::string> data_dirs;
    std::vector<std::string> data_dir_tags;
    std::string err_msg;
    // An empty string is rejected up front, before get_data_dir_and_tag() is asked
    // to parse it.
    if (disk_str.empty() ||
        !replication_options::get_data_dir_and_tag(
            disk_str, "", "replica", data_dirs, data_dir_tags, err_msg)) {
        resp.err = ERR_INVALID_PARAMETERS;
        resp.__set_err_hint(fmt::format("invalid str({}), err_msg: {}", disk_str, err_msg));
        return;
    }

    // size_t index: avoids the signed/unsigned comparison against size().
    for (size_t i = 0; i < data_dir_tags.size(); ++i) {
        // Bind by reference; the strings are only read here.
        const auto &dir = data_dirs[i];
        const auto &tag = data_dir_tags[i];

        if (_fs_manager.is_dir_node_available(dir, tag)) {
            resp.err = ERR_NODE_ALREADY_EXIST;
            resp.__set_err_hint(
                fmt::format("data_dir({}) tag({}) already available", dir, tag));
            return;
        }

        // NOTE(review): `.second` of is_directory_empty() is presumably the
        // "is empty" flag and `.first` an error indicator that is ignored here --
        // confirm.
        if (dsn_unlikely(utils::filesystem::directory_exists(dir) &&
                         !utils::filesystem::is_directory_empty(dir).second)) {
            resp.err = ERR_DIR_NOT_EMPTY;
            resp.__set_err_hint(fmt::format("Disk({}) directory is not empty", dir));
            return;
        }

        // `cdir` is filled in by create_directory() and is the path that gets
        // registered below.
        std::string cdir;
        if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
                         !utils::filesystem::check_dir_rw(dir, err_msg))) {
            resp.err = ERR_FILE_OPERATION_FAILED;
            resp.__set_err_hint(err_msg);
            return;
        }

        ddebug_f("Add a new disk in fs_manager, data_dir={}, tag={}", cdir, tag);
        _fs_manager.add_new_dir_node(cdir, tag);
    }
}

void replica_stub::on_prepare(dsn::message_ex *request)
{
gpid id;
Expand Down Expand Up @@ -2200,6 +2246,8 @@ void replica_stub::open_service()
RPC_GROUP_BULK_LOAD, "group_bulk_load", &replica_stub::on_group_bulk_load);
register_rpc_handler_with_rpc_holder(
RPC_DETECT_HOTKEY, "detect_hotkey", &replica_stub::on_detect_hotkey);
register_rpc_handler_with_rpc_holder(
RPC_ADD_NEW_DISK, "add_new_disk", &replica_stub::on_add_new_disk);

register_ctrl_command();
}
Expand Down
3 changes: 3 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ typedef rpc_holder<update_child_group_partition_count_request,
update_child_group_partition_count_rpc;
typedef rpc_holder<group_bulk_load_request, group_bulk_load_response> group_bulk_load_rpc;
typedef rpc_holder<detect_hotkey_request, detect_hotkey_response> detect_hotkey_rpc;
typedef rpc_holder<add_new_disk_request, add_new_disk_response> add_new_disk_rpc;

class mutation_log;
namespace test {
Expand Down Expand Up @@ -224,6 +225,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
void query_app_manual_compact_status(
int32_t app_id, /*out*/ std::unordered_map<gpid, manual_compaction_status> &status);

// RPC handler for RPC_ADD_NEW_DISK: validates the directories listed in the
// request's disk_str and registers them in the fs_manager.
void on_add_new_disk(add_new_disk_rpc rpc);

private:
enum replica_node_state
{
Expand Down
4 changes: 0 additions & 4 deletions src/replica/test/replica_disk_migrate_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ class replica_disk_migrate_test : public replica_disk_test_base
replica_disk_migrate_rpc fake_migrate_rpc;

public:
replica_disk_migrate_test() { fail::setup(); }

~replica_disk_migrate_test() { fail::teardown(); }

void SetUp() override { generate_fake_rpc(); }

replica_ptr get_replica(const dsn::gpid &pid) const
Expand Down
Loading

0 comments on commit 31ad7e9

Please sign in to comment.