Skip to content

Commit

Permalink
fmt 4
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Mar 4, 2024
1 parent 01b32a9 commit ad2362a
Showing 1 changed file with 36 additions and 39 deletions.
75 changes: 36 additions & 39 deletions src/shell/commands/local_partition_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,31 +199,28 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
// db_opts.pegasus_data = true;
// db_opts.pegasus_data_version = pegasus::PEGASUS_DATA_VERSION_MAX;

rocksdb::ColumnFamilyOptions _data_cf_opts;
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ColumnFamilyHandle *_data_cf;
rocksdb::ColumnFamilyHandle *_meta_cf;
rocksdb::ColumnFamilyHandle *data_cf = nullptr;
rocksdb::ColumnFamilyHandle *meta_cf = nullptr;
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, _data_cf_opts},
{pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
{{pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, {}},
{pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, {}}});
std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
rocksdb::DB *_db;
rocksdb::DB *db = nullptr;
auto s = rocksdb::DB::OpenForReadOnly(
db_opts, rdb_dir, column_families, &handles_opened, &_db);
db_opts, rdb_dir, column_families, &handles_opened, &db);
if (!s.ok()) {
fmt::print(stderr, "{}: open rocksdb in {} failed\n", s.ToString(), rdb_dir);
continue;
fmt::print(stderr, "{}: open rocksdb in '{}' failed\n", s.ToString(), rdb_dir);
return false
}
CHECK_EQ(2, handles_opened.size());
CHECK_EQ(handles_opened[0]->GetName(),
pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME);
CHECK_EQ(handles_opened[1]->GetName(),
pegasus::server::meta_store::META_COLUMN_FAMILY_NAME);
_data_cf = handles_opened[0];
_meta_cf = handles_opened[1];
CHECK_EQ(pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, handles_opened[0]->GetName());
CHECK_EQ(pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, handles_opened[1]->GetName());
data_cf = handles_opened[0];
meta_cf = handles_opened[1];

// Get metadata in rocksdb.
auto ms = std::make_unique<pegasus::server::meta_store>(rdb_dir.c_str(), _db, _meta_cf);
// Since Pegasus 2.x, the metadata is stored in the meta column family.
auto ms = std::make_unique<pegasus::server::meta_store>(rdb_dir.c_str(), db, meta_cf);
uint64_t last_committed_decree;
auto err = ms->get_last_flushed_decree(&last_committed_decree);
if (err != dsn::ERR_OK) {
Expand All @@ -250,15 +247,15 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg

// Get all sst files.
std::vector<rocksdb::LiveFileMetaData> files;
_db->GetLiveFilesMetaData(&files);
db->GetLiveFilesMetaData(&files);

// Close rdb.
_db->DestroyColumnFamilyHandle(_data_cf);
_data_cf = nullptr;
_db->DestroyColumnFamilyHandle(_meta_cf);
_meta_cf = nullptr;
delete _db;
_db = nullptr;
db->DestroyColumnFamilyHandle(data_cf);
data_cf = nullptr;
db->DestroyColumnFamilyHandle(meta_cf);
meta_cf = nullptr;
delete db;
db = nullptr;

for (const auto &file : files) {
// 'replica_dir' has a duplicate '/' when contact 'file.db_path' and 'file.name', it
Expand Down Expand Up @@ -408,7 +405,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
db_opts.create_missing_column_families =
true; // Create the 'pegasus_meta_cf' column family.
auto os =
rocksdb::DB::Open(db_opts, new_rdb_dir, column_families, &handles_opened, &_db);
rocksdb::DB::Open(db_opts, new_rdb_dir, column_families, &handles_opened, &db);
if (!os.ok()) {
fmt::print(
stderr, "{}: open rocksdb in {} failed\n", os.ToString(), new_rdb_dir);
Expand All @@ -419,8 +416,8 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME);
CHECK_EQ(handles_opened[1]->GetName(),
pegasus::server::meta_store::META_COLUMN_FAMILY_NAME);
_data_cf = handles_opened[0];
_meta_cf = handles_opened[1];
data_cf = handles_opened[0];
meta_cf = handles_opened[1];

const auto dst_tmp_rdb_dir = fmt::format("{}/{}.{}.pegasus",
kSplitDataDirReplicaDirs,
Expand All @@ -429,7 +426,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
if (dsn::utils::filesystem::directory_exists(dst_tmp_rdb_dir)) {
// Gather all files.
rocksdb::IngestExternalFileArg arg;
arg.column_family = _data_cf;
arg.column_family = data_cf;
if (!dsn::utils::filesystem::get_subfiles(
dst_tmp_rdb_dir, arg.external_files, false)) {
fmt::print(stderr, "get sub-files from '{}' failed\n", dst_tmp_rdb_dir);
Expand All @@ -444,7 +441,7 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

// Ingest files.
auto iefs = _db->IngestExternalFiles({arg});
auto iefs = db->IngestExternalFiles({arg});
if (!iefs.ok()) {
fmt::print(stderr,
"{}: open rocksdb in {} failed\n",
Expand All @@ -454,15 +451,15 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
}

std::unique_ptr<rocksdb::Iterator> iter(
_db->NewIterator(rocksdb::ReadOptions()));
db->NewIterator(rocksdb::ReadOptions()));
int new_total_count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
new_total_count++;
}
fmt::print(stdout, "new_total_count: {}\n", new_total_count);

// Full compact.
auto crs = _db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
auto crs = db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
if (!crs.ok()) {
fmt::print(stderr,
"{}: compact rocksdb in {} failed\n",
Expand All @@ -478,27 +475,27 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg
// the manifest file. Should set
// [pegasus.server]get_meta_store_type = "metacf" when start replica server.
auto new_ms = std::make_unique<pegasus::server::meta_store>(
new_rdb_dir.c_str(), _db, _meta_cf);
new_rdb_dir.c_str(), db, meta_cf);
new_ms->set_data_version(pegasus_data_version);
new_ms->set_last_flushed_decree(last_committed_decree);
new_ms->set_last_manual_compact_finish_time(last_manual_compact_finish_time);
// flush and wait for response
rocksdb::FlushOptions options;
options.wait = true;
auto fs = _db->Flush(options, {_meta_cf, _data_cf});
auto fs = db->Flush(options, {meta_cf, data_cf});
if (!fs.ok()) {
fmt::print(
stderr, "{}: flush rocksdb in {} failed\n", fs.ToString(), new_rdb_dir);
return false;
}

// Close rdb.
_db->DestroyColumnFamilyHandle(_data_cf);
_data_cf = nullptr;
_db->DestroyColumnFamilyHandle(_meta_cf);
_meta_cf = nullptr;
delete _db;
_db = nullptr;
db->DestroyColumnFamilyHandle(data_cf);
data_cf = nullptr;
db->DestroyColumnFamilyHandle(meta_cf);
meta_cf = nullptr;
delete db;
db = nullptr;

// Generate new ".app-info".
dsn::app_info new_ai(tsp.ai);
Expand Down

0 comments on commit ad2362a

Please sign in to comment.