Skip to content

Commit

Permalink
run
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Mar 1, 2024
1 parent 1c3b3c1 commit 9aa90d4
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions src/shell/commands/local_partition_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
PARSE_STRS(dst_data_dirs);

if (src_data_dirs.size() != dst_data_dirs.size()) {
fmt::print(stderr, "invalid command, the list size of <src_data_dirs> and <dst_data_dirs> "
"must be equal\n");
fmt::print(stderr,
"invalid command, the list size of <src_data_dirs> and <dst_data_dirs> "
"must be equal\n");
return false;
}

Expand Down Expand Up @@ -389,10 +390,12 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
// 如果 put 可以迭代出来而 delete 迭代不出来的话,则可能出现已删除数据重现的问题?
std::unique_ptr<rocksdb::Iterator> iter(
reader->NewIterator(rocksdb::ReadOptions()));
iter->SeekToFirst();
while (iter->Valid()) {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
// Calc the hash value and corresponding new partition index and sst writer.
const auto &skey = iter->key();
if (skey.empty()) {
continue;
}
dsn::blob bb_key(skey.data(), 0, skey.size());
uint64_t hash_value = pegasus::pegasus_key_hash(bb_key);
int new_pidx = dsn::replication::partition_resolver::get_partition_index(
Expand All @@ -411,7 +414,6 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
fmt::print(stderr, "{}: from {} write failed\n", ws.ToString(), fname);
return false;
}
iter->Next();
}

// Release reader and writers.
Expand All @@ -422,10 +424,9 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
"{}: finish writer split from file {} failed\n",
ws.ToString(),
fname);
return false;
continue;
}
}
reader.release();
}

// Create new partitions.
Expand Down Expand Up @@ -471,13 +472,20 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
if (dsn::utils::filesystem::directory_exists(dst_tmp_rdb_dir)) {
// Gather all files.
rocksdb::IngestExternalFileArg arg;
arg.column_family = _data_cf;
if (!dsn::utils::filesystem::get_subdirectories(
dst_tmp_rdb_dir, arg.external_files, false)) {
fmt::print(
stderr, "get sub-directories from '{}' failed\n", dst_tmp_rdb_dir);
return false;
}

if (arg.external_files.empty() || arg.external_files[0].empty()) {
fmt::print(
stdout, "empty sub-directories '{}', skipped\n", dst_tmp_rdb_dir);
continue;
}

// Ingest files.
// TODO(yingchun): any problem if they are in 2 CFs ?
auto iefs = _db->IngestExternalFiles({arg});
Expand Down

0 comments on commit 9aa90d4

Please sign in to comment.