Skip to content

Commit

Permalink
run ok
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Mar 1, 2024
1 parent 9aa90d4 commit f98eb68
Showing 1 changed file with 49 additions and 30 deletions.
79 changes: 49 additions & 30 deletions src/shell/commands/local_partition_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

// TODO(yingchun): check the correctness.
int split_count = dst_partition_count / src_partition_count;
const int split_count = dst_partition_count / src_partition_count;
CHECK_GT(split_count, 0);
int log2n = log2(split_count);
if (pow(2, log2n) != split_count) {
Expand All @@ -152,7 +152,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

static const std::string data_dir_postfix("/replica/reps");
static const std::string split_dir_postfix("/split");
static const std::string split_dir_postfix("split");
for (const auto &src_data_dir : src_data_dirs) {
std::vector<std::string> replica_dirs;
const auto data_dir_replica_dirs = src_data_dir + data_dir_postfix;
Expand Down Expand Up @@ -356,7 +356,8 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

// Open writers.
std::shared_ptr<rocksdb::SstFileWriter> writers[split_count];
std::vector<std::string> dst_tmp_rdb_dirs;
dst_tmp_rdb_dirs.resize(split_count);
for (int i = 0; i < split_count; i++) {
// Split temporary dir.
auto dst_tmp_rdb_dir = fmt::format("{}/{}.{}.pegasus",
Expand All @@ -370,30 +371,25 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
return false;
}

// TODO(yingchun): options?
auto dst_tmp_rdb_file = fmt::format("{}{}", dst_tmp_rdb_dir, file.name);
writers[i] = std::make_shared<rocksdb::SstFileWriter>(rocksdb::EnvOptions(),
rocksdb::Options());
auto ws = writers[i]->Open(dst_tmp_rdb_file);
if (!ws.ok()) {
fmt::print(stderr,
"{}: open writer file '{}' failed\n",
ws.ToString(),
dst_tmp_rdb_file);
return false;
}
fmt::print(stdout, "open writer file '{}'\n", dst_tmp_rdb_file);
dst_tmp_rdb_dirs[i] = dst_tmp_rdb_dir;
}

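// TODO(yingchun): a delete operation presumably is not surfaced by the iterator (?). If so,
// when a lower level (older data) holds a put and a higher level (newer data) holds a delete,
// and the put is iterated while the delete is not, could already-deleted data reappear?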
std::unique_ptr<rocksdb::Iterator> iter(
reader->NewIterator(rocksdb::ReadOptions()));
int total_count = 0;
int empty_count = 0;
std::vector<int> split_counts;
split_counts.resize(split_count);
std::shared_ptr<rocksdb::SstFileWriter> writers[split_count];
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
total_count++;
// Calc the hash value and corresponding new partition index and sst writer.
const auto &skey = iter->key();
if (skey.empty()) {
empty_count++;
continue;
}
dsn::blob bb_key(skey.data(), 0, skey.size());
Expand All @@ -402,13 +398,31 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
dst_partition_count, hash_value);
CHECK_LE(0, new_pidx);
CHECK_LT(new_pidx, dst_partition_count);
int writer_idx = new_pidx / dst_partition_count;
int writer_idx = new_pidx / src_partition_count;
CHECK_LE(0, writer_idx);
CHECK_LT(writer_idx, split_count);

// TODO(yingchun): improve to check expired data.

// Create the writer if not exist.
if (!writers[writer_idx]) {
// TODO(yingchun): options?
auto dst_tmp_rdb_file =
fmt::format("{}{}", dst_tmp_rdb_dirs[writer_idx], file.name);
writers[writer_idx] = std::make_shared<rocksdb::SstFileWriter>(
rocksdb::EnvOptions(), rocksdb::Options());
auto ws = writers[writer_idx]->Open(dst_tmp_rdb_file);
if (!ws.ok()) {
fmt::print(stderr,
"{}: open writer file '{}' failed\n",
ws.ToString(),
dst_tmp_rdb_file);
return false;
}
}

// Write the data to the new partition sst file.
split_counts[writer_idx]++;
auto ws = writers[writer_idx]->Put(skey, iter->value());
if (!ws.ok()) {
fmt::print(stderr, "{}: from {} write failed\n", ws.ToString(), fname);
Expand All @@ -417,7 +431,12 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

// Release reader and writers.
fmt::print(stderr, "total_count: {}, empty_count: {}\n", total_count, empty_count);
for (int i = 0; i < split_count; i++) {
fmt::print(stderr, "[{}] count: {}\n", i, split_counts[i]);
if (split_counts[i] == 0) {
continue;
}
auto ws = writers[i]->Finish(nullptr);
if (!ws.ok()) {
fmt::print(stderr,
Expand Down Expand Up @@ -452,11 +471,11 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
_db_opts.create_if_missing = true;
_db_opts.create_missing_column_families =
true; // Create the 'pegasus_meta_cf' column family.
auto s = rocksdb::DB::Open(
auto os = rocksdb::DB::Open(
_db_opts, new_rdb_dir, column_families, &handles_opened, &_db);
if (!s.ok()) {
if (!os.ok()) {
fmt::print(
stderr, "{}: open rocksdb in {} failed\n", s.ToString(), new_rdb_dir);
stderr, "{}: open rocksdb in {} failed\n", os.ToString(), new_rdb_dir);
return false;
}
CHECK_EQ(2, handles_opened.size());
Expand All @@ -473,16 +492,14 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
// Gather all files.
rocksdb::IngestExternalFileArg arg;
arg.column_family = _data_cf;
if (!dsn::utils::filesystem::get_subdirectories(
if (!dsn::utils::filesystem::get_subfiles(
dst_tmp_rdb_dir, arg.external_files, false)) {
fmt::print(
stderr, "get sub-directories from '{}' failed\n", dst_tmp_rdb_dir);
fmt::print(stderr, "get sub-files from '{}' failed\n", dst_tmp_rdb_dir);
return false;
}

if (arg.external_files.empty() || arg.external_files[0].empty()) {
fmt::print(
stdout, "empty sub-directories '{}', skipped\n", dst_tmp_rdb_dir);
if (arg.external_files.empty()) {
fmt::print(stdout, "empty sub-files '{}', skipped\n", dst_tmp_rdb_dir);
continue;
}

Expand Down Expand Up @@ -545,19 +562,21 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
dsn::replication::replica_app_info rai(&new_ai);
const auto rai_path =
dsn::utils::filesystem::path_combine(new_data_dir_replica_dirs, kAppInfo);
auto err = rai.store(rai_path.c_str());
err = rai.store(rai_path.c_str());
if (err != dsn::ERR_OK) {
fmt::print(stderr, "{}: write {} failed\n", err, rai_path);
fmt::print(stderr, "{}: write replica_app_info '{}' failed\n", err, rai_path);
return false;
}

// Generate new ".init-info".
// TODO(yingchun): check the correctness.
dsn::replication::replica_init_info rii;
rii.init_durable_decree = last_committed_decree;
err = dsn::utils::load_rjobj_from_file(new_data_dir_replica_dirs, &rii);
const auto rii_path = dsn::utils::filesystem::path_combine(
new_data_dir_replica_dirs, replica_init_info::kInitInfo);
err = dsn::utils::dump_rjobj_to_file(rii, rii_path);
if (err != dsn::ERR_OK) {
fmt::print(stderr, "{}: write {} failed\n", err, new_data_dir_replica_dirs);
fmt::print(stderr, "{}: write replica_init_info '{}' failed\n", err, rii_path);
return false;
}
}
Expand Down

0 comments on commit f98eb68

Please sign in to comment.