Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: refactor replication_options initialize function #831

Merged
merged 7 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 133 additions & 92 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
* THE SOFTWARE.
*/

#include "replication_common.h"

#include <fstream>

#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replica_envs.h>
#include <dsn/utility/flags.h>
#include <dsn/utility/filesystem.h>

#include "replication_common.h"

namespace dsn {
namespace replication {

Expand Down Expand Up @@ -135,106 +136,34 @@ void replication_options::initialize()
}
slog_dir = utils::filesystem::path_combine(slog_dir, "slog");

// data_dirs
// - if config[data_dirs] is empty: "app_dir/reps"
// - else:
// config[data_dirs] = "tag1:dir1,tag2:dir2:tag3:dir3"
// data_dir = "config[data_dirs]/app_name/reps"
std::string dirs_str =
// get config_data_dirs and config_data_dir_tags from config
const std::string &dirs_str =
dsn_config_get_value_string("replication", "data_dirs", "", "replica directory list");
std::vector<std::string> dirs;
std::vector<std::string> dir_tags;
::dsn::utils::split_args(dirs_str.c_str(), dirs, ',');
if (dirs.empty()) {
dirs.push_back(app_dir);
dir_tags.push_back("default");
} else {
for (auto &dir : dirs) {
std::vector<std::string> tag_and_dir;
::dsn::utils::split_args(dir.c_str(), tag_and_dir, ':');
if (tag_and_dir.size() != 2) {
dassert(false, "invalid data_dir item(%s) in config", dir.c_str());
} else {
dassert(!tag_and_dir[0].empty() && !tag_and_dir[1].empty(),
"invalid data_dir item(%s) in config",
dir.c_str());
dir = utils::filesystem::path_combine(tag_and_dir[1], app_name);
for (unsigned i = 0; i < dir_tags.size(); ++i) {
dassert(dirs[i] != dir,
"dir(%s) and dir(%s) conflict",
dirs[i].c_str(),
dir.c_str());
}
for (unsigned i = 0; i < dir_tags.size(); ++i) {
dassert(dir_tags[i] != tag_and_dir[0],
"dir(%s) and dir(%s) have same tag(%s)",
dirs[i].c_str(),
dir.c_str(),
tag_and_dir[0].c_str());
}
dir_tags.push_back(tag_and_dir[0]);
}
}
}

std::vector<std::string> config_data_dirs;
std::vector<std::string> config_data_dir_tags;
std::string error_msg = "";
bool flag = get_data_dir_and_tag(
dirs_str, app_dir, app_name, config_data_dirs, config_data_dir_tags, error_msg);
dassert_f(flag, error_msg);

// check if data_dir in black list, data_dirs doesn't contain dir in black list
std::string black_list_file =
dsn_config_get_value_string("replication",
"data_dirs_black_list_file",
"/home/work/.pegasus_data_dirs_black_list",
"replica directory black list file");
std::vector<std::string> black_list;
if (!black_list_file.empty() && dsn::utils::filesystem::file_exists(black_list_file)) {
ddebug("data_dirs_black_list_file[%s] found, apply it", black_list_file.c_str());

std::ifstream file(black_list_file);
if (!file) {
dassert(false, "open data_dirs_black_list_file failed: %s", black_list_file.c_str());
}

std::string str;
int count = 0;
while (std::getline(file, str)) {
std::string str2 = dsn::utils::trim_string((char *)str.c_str());
if (str2.empty())
continue;
if (str2.back() != '/')
str2.append("/");
black_list.push_back(str2);
count++;
ddebug("black_list[%d] = [%s]", count, str2.c_str());
str.clear();
}
} else {
ddebug("data_dirs_black_list_file[%s] not found, ignore it", black_list_file.c_str());
}

int dir_count = 0;
for (unsigned i = 0; i < dirs.size(); ++i) {
std::string &dir = dirs[i];
bool in_black_list = false;
if (!black_list.empty()) {
std::string dir2 = dir;
if (dir2.back() != '/')
dir2.append("/");
for (std::string &black : black_list) {
if (dir2.find(black) == 0) {
in_black_list = true;
break;
}
}
}

if (in_black_list) {
dwarn("replica data dir %s is in black list, ignore it", dir.c_str());
} else {
ddebug("data_dirs[%d] = %s, tag = %s", dir_count++, dir.c_str(), dir_tags[i].c_str());
data_dirs.push_back(utils::filesystem::path_combine(dir, "reps"));
data_dir_tags.push_back(dir_tags[i]);
std::vector<std::string> black_list_dirs;
get_data_dirs_in_black_list(black_list_file, black_list_dirs);
for (auto i = 0; i < config_data_dirs.size(); ++i) {
if (check_if_in_black_list(black_list_dirs, config_data_dirs[i])) {
continue;
}
data_dirs.emplace_back(config_data_dirs[i]);
data_dir_tags.emplace_back(config_data_dir_tags[i]);
}

if (data_dirs.empty()) {
dassert(false, "no replica data dir found, maybe not set or excluded by black list");
dassert_f(false, "no replica data dir found, maybe not set or excluded by black list");
}

deny_client_on_start = dsn_config_get_value_bool("replication",
Expand Down Expand Up @@ -569,6 +498,118 @@ void replica_helper::load_meta_servers(/*out*/ std::vector<dsn::rpc_address> &se
dassert(servers.size() > 0, "no meta server specified in config [%s].%s", section, key);
}

/*static*/ bool
replication_options::get_data_dir_and_tag(const std::string &config_dirs_str,
const std::string &default_dir,
const std::string &app_name,
/*out*/ std::vector<std::string> &data_dirs,
/*out*/ std::vector<std::string> &data_dir_tags,
/*out*/ std::string &err_msg)
{
// - if {config_dirs_str} is empty (return true):
hycdong marked this conversation as resolved.
Show resolved Hide resolved
// - dir = {default_dir}
// - dir_tag/data_dir_tag = "default"
// - data_dir = {default_dir}/"reps"
// - else if {config_dirs_str} = "tag1:dir1,tag2:dir2:tag3:dir3" (return true):
// - dir1 = "dir1"/{app_name}
// - dir_tag1/data_dir_tag1 = "tag1"
// - data_dir1 = "dir1"/{app_name}/"reps"
// - else (return false):
// - invalid format and set {err_msg}
std::vector<std::string> dirs;
std::vector<std::string> dir_tags;
utils::split_args(config_dirs_str.c_str(), dirs, ',');
if (dirs.empty()) {
dirs.push_back(default_dir);
dir_tags.push_back("default");
} else {
for (auto &dir : dirs) {
std::vector<std::string> tag_and_dir;
utils::split_args(dir.c_str(), tag_and_dir, ':');
if (tag_and_dir.size() != 2) {
err_msg = fmt::format("invalid data_dir item({}) in config", dir);
return false;
}
if (tag_and_dir[0].empty() || tag_and_dir[1].empty()) {
err_msg = fmt::format("invalid data_dir item({}) in config", dir);
return false;
}
dir = utils::filesystem::path_combine(tag_and_dir[1], app_name);
for (unsigned i = 0; i < dir_tags.size(); ++i) {
if (dirs[i] == dir) {
err_msg = fmt::format("dir({}) and dir({}) conflict", dirs[i], dir);
return false;
}
}
for (unsigned i = 0; i < dir_tags.size(); ++i) {
if (dir_tags[i] == tag_and_dir[0]) {
err_msg = fmt::format(
"dir({}) and dir({}) have same tag({})", dirs[i], dir, tag_and_dir[0]);
return false;
}
}
dir_tags.push_back(tag_and_dir[0]);
}
}

for (unsigned i = 0; i < dirs.size(); ++i) {
const std::string &dir = dirs[i];
ddebug_f("data_dirs[{}] = {}, tag = {}", i + 1, dir, dir_tags[i]);
data_dirs.push_back(utils::filesystem::path_combine(dir, "reps"));
data_dir_tags.push_back(dir_tags[i]);
}
return true;
}

/*static*/ void
replication_options::get_data_dirs_in_black_list(const std::string &fname,
/*out*/ std::vector<std::string> &dirs)
{
if (fname.empty() || (!fname.empty() && !utils::filesystem::file_exists(fname))) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
ddebug_f("data_dirs_black_list_file[{}] not found, ignore it", fname);
return;
}

ddebug_f("data_dirs_black_list_file[{}] found, apply it", fname);
std::ifstream file(fname);
if (!file) {
dassert_f(false, "open data_dirs_black_list_file failed: {}", fname);
}

std::string str;
int count = 0;
while (std::getline(file, str)) {
std::string str2 = utils::trim_string(const_cast<char *>(str.c_str()));
if (str2.empty())
continue;
if (str2.back() != '/')
str2.append("/");
hycdong marked this conversation as resolved.
Show resolved Hide resolved
dirs.push_back(str2);
count++;
ddebug_f("black_list[{}] = [{}]", count, str2);
str.clear();
hycdong marked this conversation as resolved.
Show resolved Hide resolved
}
}

/*static*/ bool
replication_options::check_if_in_black_list(const std::vector<std::string> &black_list_dir,
const std::string &dir)
{
bool in_black_list = false;
std::string dir_str = dir;
if (!black_list_dir.empty()) {
if (dir_str.back() != '/')
dir_str.append("/");
hycdong marked this conversation as resolved.
Show resolved Hide resolved
for (const std::string &black : black_list_dir) {
if (dir_str.find(black) == 0) {
in_black_list = true;
break;
}
}
}
return in_black_list;
hycdong marked this conversation as resolved.
Show resolved Hide resolved
}

const std::string backup_restore_constant::FORCE_RESTORE("restore.force_restore");
const std::string backup_restore_constant::BLOCK_SERVICE_PROVIDER("restore.block_service_provider");
const std::string backup_restore_constant::CLUSTER_NAME("restore.cluster_name");
Expand Down
13 changes: 12 additions & 1 deletion src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,20 @@ class replication_options

public:
replication_options();
void initialize();
~replication_options();

void initialize();
static bool get_data_dir_and_tag(const std::string &config_dirs_str,
const std::string &default_dir,
const std::string &app_name,
/*out*/ std::vector<std::string> &data_dirs,
/*out*/ std::vector<std::string> &data_dir_tags,
/*out*/ std::string &err_msg);
static void get_data_dirs_in_black_list(const std::string &fname,
/*out*/ std::vector<std::string> &dirs);
static bool check_if_in_black_list(const std::vector<std::string> &black_list_dir,
const std::string &dir);

private:
void sanity_check();
};
Expand Down
Loading