Skip to content

Commit

Permalink
fmt 18
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Mar 5, 2024
1 parent 32bef06 commit abde84d
Showing 1 changed file with 84 additions and 53 deletions.
137 changes: 84 additions & 53 deletions src/shell/commands/local_partition_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,32 +250,64 @@ bool split_file(const LocalPartitionSplitContext &lpsc,
return true;
}

// Open the rocksdb instance located in 'rdb_dir' (read-only when 'read_only' is true)
// with the given column family descriptors 'cf_dscs'.
// On success, fills '*cf_hdls' with the opened column family handles and '*db' with
// the database pointer, then returns true; returns false if the open fails.
// The caller owns the returned handles and database (release them via release_db()).
bool open_rocksdb(const rocksdb::DBOptions &db_opts,
                  const std::string &rdb_dir,
                  bool read_only,
                  const std::vector<rocksdb::ColumnFamilyDescriptor> &cf_dscs,
                  std::vector<rocksdb::ColumnFamilyHandle *> *cf_hdls,
                  rocksdb::DB **db)
{
    // Output parameters must be provided.
    CHECK_NOTNULL(db, "");
    CHECK_NOTNULL(cf_hdls, "");
    if (!read_only) {
        RETURN_FALSE_IF_NON_RDB_OK(rocksdb::DB::Open(db_opts, rdb_dir, cf_dscs, cf_hdls, db),
                                   "open rocksdb in '{}' failed",
                                   rdb_dir);
    } else {
        RETURN_FALSE_IF_NON_RDB_OK(
            rocksdb::DB::OpenForReadOnly(db_opts, rdb_dir, cf_dscs, cf_hdls, db),
            "open rocksdb in '{}' failed",
            rdb_dir);
    }

    // A Pegasus replica rocksdb always has exactly the 'data' and 'meta' column
    // families, in that order.
    CHECK_EQ(2, cf_hdls->size());
    CHECK_EQ(pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, (*cf_hdls)[0]->GetName());
    CHECK_EQ(pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, (*cf_hdls)[1]->GetName());
    return true;
}

// Release the column family handles in '*cf_hdls' and close '*db'.
// Counterpart of open_rocksdb(): empties '*cf_hdls' and resets '*db' to nullptr
// so the same variables can be reused for a subsequent open.
void release_db(std::vector<rocksdb::ColumnFamilyHandle *> *cf_hdls, rocksdb::DB **db)
{
    CHECK_NOTNULL(db, "");
    CHECK_NOTNULL(cf_hdls, "");
    // Column family handles must be destroyed before the DB itself.
    while (!cf_hdls->empty()) {
        delete cf_hdls->back();
        cf_hdls->pop_back();
    }
    delete *db;
    *db = nullptr;
}

bool split_partition(const LocalPartitionSplitContext &lpsc,
const ToSplitPatition &tsp,
const std::string &src_replicas_dir,
const std::string &dst_replicas_dir,
const std::string &tmp_split_replicas_dir)
{
static const std::string kRdbDirPostfix("/data/rdb/");
const auto rdb_dir = tsp.replica_dir + kRdbDirPostfix;
fmt::print(stdout, " start to split '{}'\n", rdb_dir);

// 1. Open original rocksdb in read-only mode.
// 1. Open the original rocksdb in read-only mode.
rocksdb::DBOptions db_opts;
// The following options should be set in Pegasus 2.0 and lower versions.
// db_opts.pegasus_data = true;
// db_opts.pegasus_data_version = pegasus::PEGASUS_DATA_VERSION_MAX;
std::vector<rocksdb::ColumnFamilyDescriptor> cf_dscs(
const std::vector<rocksdb::ColumnFamilyDescriptor> cf_dscs(
{{pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, {}},
{pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, {}}});
std::vector<rocksdb::ColumnFamilyHandle *> cf_hdls;
rocksdb::DB *db = nullptr;
RETURN_FALSE_IF_NON_RDB_OK(
rocksdb::DB::OpenForReadOnly(db_opts, rdb_dir, cf_dscs, &cf_hdls, &db),
"open rocksdb in '{}' failed",
rdb_dir);
CHECK_EQ(2, cf_hdls.size());
CHECK_EQ(pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, cf_hdls[0]->GetName());
CHECK_EQ(pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, cf_hdls[1]->GetName());
RETURN_FALSE_IF_NOT(open_rocksdb(db_opts, rdb_dir, true, cf_dscs, &cf_hdls, &db), "");

// 2. Get metadata from rocksdb.
// - In Pegasus versions lower than 2.0, the metadata is only stored in the MANIFEST
Expand Down Expand Up @@ -304,15 +336,11 @@ bool split_partition(const LocalPartitionSplitContext &lpsc,
db->GetLiveFilesMetaData(&files);

// 4. Close rocksdb.
for (auto h : cf_hdls) {
delete h;
}
cf_hdls.clear();
delete db;
db = nullptr;
release_db(&cf_hdls, &db);

// 5. Split the sst files.
auto files_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(rocksdb::NewThreadPool(8));
auto files_thread_pool =
std::unique_ptr<rocksdb::ThreadPool>(rocksdb::NewThreadPool(lpsc.threads_per_partition));
std::atomic<uint64_t> total_count = 0;
for (const auto &file : files) {
// Skip metadata column family files, we will write metadata manually later in
Expand Down Expand Up @@ -340,11 +368,13 @@ bool split_partition(const LocalPartitionSplitContext &lpsc,
// 6. Create new rocksdb instances for the new partitions.
// TODO(yingchun): poolize the following operations if necessary.
for (int i = 0; i < lpsc.split_count; i++) {
const auto new_data_dir_replica_dir =
construct_split_directory(src_replicas_dir, tsp, lpsc.dst_app_id, i);
const auto new_rdb_dir = new_data_dir_replica_dir + kRdbDirPostfix;
// The new replica is placed on 'dst_replicas_dir'.
const auto new_replica_dir =
construct_split_directory(dst_replicas_dir, tsp, lpsc.dst_app_id, i);
const auto new_rdb_dir = new_replica_dir + kRdbDirPostfix;

// i. Create the directory for the split rocksdb.
// TODO(yingchun): make sure it's not exist!
RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(new_rdb_dir),
"create directory '{}' failed",
new_rdb_dir);
Expand All @@ -354,13 +384,7 @@ bool split_partition(const LocalPartitionSplitContext &lpsc,
new_db_opts.create_if_missing = true;
// Create the 'pegasus_meta_cf' column family.
new_db_opts.create_missing_column_families = true;
RETURN_FALSE_IF_NON_RDB_OK(
rocksdb::DB::Open(new_db_opts, new_rdb_dir, cf_dscs, &cf_hdls, &db),
"open rocksdb in '{}' failed",
new_rdb_dir);
CHECK_EQ(2, cf_hdls.size());
CHECK_EQ(cf_hdls[0]->GetName(), pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME);
CHECK_EQ(cf_hdls[1]->GetName(), pegasus::server::meta_store::META_COLUMN_FAMILY_NAME);
RETURN_FALSE_IF_NOT(open_rocksdb(db_opts, new_rdb_dir, false, cf_dscs, &cf_hdls, &db), "");

// iii. Ingest the split sst files to the new rocksdb.
do {
Expand Down Expand Up @@ -432,12 +456,7 @@ bool split_partition(const LocalPartitionSplitContext &lpsc,
db->Flush(options, cf_hdls), "flush rocksdb in '{}' failed", new_rdb_dir);

// v. Close rocksdb.
for (auto h : cf_hdls) {
delete h;
}
cf_hdls.clear();
delete db;
db = nullptr;
release_db(&cf_hdls, &db);

// vi. Generate new ".app-info".
dsn::app_info new_ai(tsp.ai);
Expand All @@ -449,15 +468,15 @@ bool split_partition(const LocalPartitionSplitContext &lpsc,
new_ai.init_partition_count = -1;
dsn::replication::replica_app_info rai(&new_ai);
const auto rai_path = dsn::utils::filesystem::path_combine(
new_data_dir_replica_dir, dsn::replication::replica_app_info::kAppInfo);
new_replica_dir, dsn::replication::replica_app_info::kAppInfo);
RETURN_FALSE_IF_NON_OK(rai.store(rai_path), "write replica_app_info '{}' failed", rai_path);

// vii. Generate new ".init-info".
dsn::replication::replica_init_info new_rii(tsp.rii);
new_rii.init_offset_in_shared_log = 0;
new_rii.init_offset_in_private_log = 0;
const auto rii_path = dsn::utils::filesystem::path_combine(new_data_dir_replica_dir,
replica_init_info::kInitInfo);
const auto rii_path =
dsn::utils::filesystem::path_combine(new_replica_dir, replica_init_info::kInitInfo);
RETURN_FALSE_IF_NON_OK(dsn::utils::dump_rjobj_to_file(new_rii, rii_path),
"write replica_init_info '{}' failed",
rii_path);
Expand All @@ -466,19 +485,23 @@ bool split_partition(const LocalPartitionSplitContext &lpsc,
return true;
}

bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::string &src_data_dir)
bool split_data_directory(const LocalPartitionSplitContext &lpsc,
const std::string &src_data_dir,
const std::string &dst_data_dir)
{
// 1. Find all replica directories in the current data directory.
static const std::string kReplicasDirPostfix("replica/reps");

// 1. Collect all replica directories from 'src_data_dir'.
const auto src_replicas_dir =
dsn::utils::filesystem::path_combine(src_data_dir, "replica/reps");
dsn::utils::filesystem::path_combine(src_data_dir, kReplicasDirPostfix);
std::vector<std::string> replica_dirs;
RETURN_FALSE_IF_NOT(
dsn::utils::filesystem::get_subdirectories(src_replicas_dir, replica_dirs, false),
"invalid command, get sub-directories from '{}' failed",
src_replicas_dir);

// 2. Create temporary split directory.
const auto tmp_split_replicas_dir = dsn::utils::filesystem::path_combine(src_data_dir, "split");
// 2. Create temporary split directory on 'dst_data_dir'.
const auto tmp_split_replicas_dir = dsn::utils::filesystem::path_combine(dst_data_dir, "split");
RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(tmp_split_replicas_dir),
"create split directory '{}' failed",
tmp_split_replicas_dir);
Expand All @@ -488,7 +511,7 @@ bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::str
std::set<uint32_t> exist_app_ids;
const std::set<std::string> ordered_replica_dirs(replica_dirs.begin(), replica_dirs.end());
for (const auto &replica_dir : ordered_replica_dirs) {
// Validate the replica directory.
// i. Validate the replica directory.
dsn::app_info ai;
dsn::gpid pid;
std::string hint_message;
Expand All @@ -497,13 +520,14 @@ bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::str
continue;
}

// Skip the non-<src_app_id>.
// ii. Skip the non-<src_app_id>.
CHECK_EQ(pid.get_app_id(), ai.app_id);
if (ai.app_id != lpsc.src_app_id) {
continue;
}

// Skip and warning for the replica with the same app id but not desired partition index.
// iii. Skip and warning for the replica with the same app id but not desired partition
// index.
const auto cur_pidx = pid.get_partition_index();
if (lpsc.src_partition_ids.count(cur_pidx) == 0) {
fmt::print(stdout,
Expand All @@ -513,7 +537,7 @@ bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::str
continue;
}

// Continue and warning if the <dst_app_id> exist.
// iv. Continue and warning if the <dst_app_id> exist.
exist_app_ids.insert(ai.app_id);
if (exist_app_ids.count(lpsc.dst_app_id) != 0) {
fmt::print(
Expand All @@ -523,38 +547,42 @@ bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::str
lpsc.dst_app_id);
}

// Check if <src_partition_count> matches.
// v. Check if <src_partition_count> matches.
RETURN_FALSE_IF_NOT(ai.partition_count == lpsc.src_partition_count,
"unmatched <src_partition_count> ({} vs {})",
ai.partition_count,
lpsc.src_partition_count);

// Check the app status.
// vi. Check the app status.
RETURN_FALSE_IF_NOT(ai.status == dsn::app_status::AS_AVAILABLE,
"not support to split app '{}' in non-AVAILABLE status",
ai.app_name);

// Check if the app is duplicating or bulk loading.
// vii. Check if the app is duplicating or bulk loading.
RETURN_FALSE_IF_NOT(!ai.duplicating && !ai.is_bulk_loading,
"not support to split app '{}' which is duplicating or bulk loading",
ai.app_name);

// viii. Load the replica_init_info.
dsn::replication::replica_init_info rii;
const auto rii_path =
dsn::utils::filesystem::path_combine(replica_dir, replica_init_info::kInitInfo);
RETURN_FALSE_IF_NON_OK(dsn::utils::load_rjobj_from_file(rii_path, &rii),
"load replica_init_info from '{}' failed",
rii_path);

// ix. Gather the replica.
to_split_partitions.push_back({replica_dir, ai, rii, pid.get_partition_index()});
}

// 4. Split the partitions.
// TODO(yingchun): make it optional.
auto partitions_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(rocksdb::NewThreadPool(8));
const auto dst_replicas_dir =
dsn::utils::filesystem::path_combine(src_data_dir, kReplicasDirPostfix);
auto partitions_thread_pool =
std::unique_ptr<rocksdb::ThreadPool>(rocksdb::NewThreadPool(lpsc.threads_per_data_dir));
for (const auto &tsp : to_split_partitions) {
partitions_thread_pool->SubmitJob([=]() {
if (!split_partition(lpsc, tsp, src_replicas_dir, tmp_split_replicas_dir)) {
if (!split_partition(lpsc, tsp, dst_replicas_dir, tmp_split_replicas_dir)) {
fmt::print(stderr, " split partition '{}' failed\n", tsp.replica_dir);
} else {
fmt::print(stdout, " split partition '{}' succeed\n", tsp.replica_dir);
Expand Down Expand Up @@ -596,9 +624,12 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
// 3. Split each data directory.
auto data_dirs_thread_pool = std::unique_ptr<rocksdb::ThreadPool>(
rocksdb::NewThreadPool(static_cast<int>(lpsc.src_data_dirs.size())));
for (const auto &src_data_dir : lpsc.src_data_dirs) {
CHECK_EQ(lpsc.src_data_dirs.size(), lpsc.dst_data_dirs.size());
for (auto i = 0; i < lpsc.src_data_dirs.size(); i++) {
const auto &src_data_dir = lpsc.src_data_dirs[i];
const auto &dst_data_dir = lpsc.dst_data_dirs[i];
data_dirs_thread_pool->SubmitJob([=]() {
if (!split_data_directory(lpsc, src_data_dir)) {
if (!split_data_directory(lpsc, src_data_dir, dst_data_dir)) {
fmt::print(stderr, "split data directory '{}' failed\n", src_data_dir);
} else {
fmt::print(stdout, "split data directory '{}' succeed\n", src_data_dir);
Expand Down

0 comments on commit abde84d

Please sign in to comment.