diff --git a/src/shell/commands/local_partition_split.cpp b/src/shell/commands/local_partition_split.cpp index 0c0661e81f..32e88ddbdc 100644 --- a/src/shell/commands/local_partition_split.cpp +++ b/src/shell/commands/local_partition_split.cpp @@ -250,32 +250,64 @@ bool split_file(const LocalPartitionSplitContext &lpsc, return true; } +bool open_rocksdb(const rocksdb::DBOptions &db_opts, + const std::string &rdb_dir, + bool read_only, + const std::vector &cf_dscs, + std::vector *cf_hdls, + rocksdb::DB **db) +{ + CHECK_NOTNULL(cf_hdls, ""); + CHECK_NOTNULL(db, ""); + if (read_only) { + RETURN_FALSE_IF_NON_RDB_OK( + rocksdb::DB::OpenForReadOnly(db_opts, rdb_dir, cf_dscs, cf_hdls, db), + "open rocksdb in '{}' failed", + rdb_dir); + } else { + RETURN_FALSE_IF_NON_RDB_OK(rocksdb::DB::Open(db_opts, rdb_dir, cf_dscs, cf_hdls, db), + "open rocksdb in '{}' failed", + rdb_dir); + } + CHECK_EQ(2, cf_hdls->size()); + CHECK_EQ(pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, (*cf_hdls)[0]->GetName()); + CHECK_EQ(pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, (*cf_hdls)[1]->GetName()); + + return true; +} + +void release_db(std::vector *cf_hdls, rocksdb::DB **db) +{ + CHECK_NOTNULL(cf_hdls, ""); + CHECK_NOTNULL(db, ""); + for (auto cf_hdl : *cf_hdls) { + delete cf_hdl; + } + cf_hdls->clear(); + delete *db; + *db = nullptr; +} + bool split_partition(const LocalPartitionSplitContext &lpsc, const ToSplitPatition &tsp, - const std::string &src_replicas_dir, + const std::string &dst_replicas_dir, const std::string &tmp_split_replicas_dir) { static const std::string kRdbDirPostfix("/data/rdb/"); const auto rdb_dir = tsp.replica_dir + kRdbDirPostfix; fmt::print(stdout, " start to split '{}'\n", rdb_dir); - // 1. Open original rocksdb in read-only mode. + // 1. Open the original rocksdb in read-only mode. rocksdb::DBOptions db_opts; // The following options should be set in Pegasus 2.0 and lower versions. // db_opts.pegasus_data = true; // db_opts.pegasus_data_version = pegasus::PEGASUS_DATA_VERSION_MAX; - std::vector cf_dscs( + const std::vector cf_dscs( {{pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, {}}, {pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, {}}}); std::vector cf_hdls; rocksdb::DB *db = nullptr; - RETURN_FALSE_IF_NON_RDB_OK( - rocksdb::DB::OpenForReadOnly(db_opts, rdb_dir, cf_dscs, &cf_hdls, &db), - "open rocksdb in '{}' failed", - rdb_dir); - CHECK_EQ(2, cf_hdls.size()); - CHECK_EQ(pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME, cf_hdls[0]->GetName()); - CHECK_EQ(pegasus::server::meta_store::META_COLUMN_FAMILY_NAME, cf_hdls[1]->GetName()); + RETURN_FALSE_IF_NOT(open_rocksdb(db_opts, rdb_dir, true, cf_dscs, &cf_hdls, &db), ""); // 2. Get metadata from rocksdb. // - In Pegasus versions lower than 2.0, the metadata is only stored in the MANIFEST @@ -304,15 +336,11 @@ bool split_partition(const LocalPartitionSplitContext &lpsc, db->GetLiveFilesMetaData(&files); // 4. Close rocksdb. - for (auto h : cf_hdls) { - delete h; - } - cf_hdls.clear(); - delete db; - db = nullptr; + release_db(&cf_hdls, &db); // 5. Split the sst files. - auto files_thread_pool = std::unique_ptr(rocksdb::NewThreadPool(8)); + auto files_thread_pool = + std::unique_ptr(rocksdb::NewThreadPool(lpsc.threads_per_partition)); std::atomic total_count = 0; for (const auto &file : files) { // Skip metadata column family files, we will write metadata manually later in @@ -340,11 +368,13 @@ bool split_partition(const LocalPartitionSplitContext &lpsc, // 6. Create new rocksdb instances for the new partitions. // TODO(yingchun): poolize the following operations if necessary. for (int i = 0; i < lpsc.split_count; i++) { - const auto new_data_dir_replica_dir = - construct_split_directory(src_replicas_dir, tsp, lpsc.dst_app_id, i); - const auto new_rdb_dir = new_data_dir_replica_dir + kRdbDirPostfix; + // The new replica is placed on 'dst_replicas_dir'. + const auto new_replica_dir = + construct_split_directory(dst_replicas_dir, tsp, lpsc.dst_app_id, i); + const auto new_rdb_dir = new_replica_dir + kRdbDirPostfix; // i. Create the directory for the split rocksdb. + // TODO(yingchun): make sure it's not exist! RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(new_rdb_dir), "create directory '{}' failed", new_rdb_dir); @@ -354,13 +384,7 @@ bool split_partition(const LocalPartitionSplitContext &lpsc, new_db_opts.create_if_missing = true; // Create the 'pegasus_meta_cf' column family. new_db_opts.create_missing_column_families = true; - RETURN_FALSE_IF_NON_RDB_OK( - rocksdb::DB::Open(new_db_opts, new_rdb_dir, cf_dscs, &cf_hdls, &db), - "open rocksdb in '{}' failed", - new_rdb_dir); - CHECK_EQ(2, cf_hdls.size()); - CHECK_EQ(cf_hdls[0]->GetName(), pegasus::server::meta_store::DATA_COLUMN_FAMILY_NAME); - CHECK_EQ(cf_hdls[1]->GetName(), pegasus::server::meta_store::META_COLUMN_FAMILY_NAME); + RETURN_FALSE_IF_NOT(open_rocksdb(db_opts, new_rdb_dir, false, cf_dscs, &cf_hdls, &db), ""); // iii. Ingest the split sst files to the new rocksdb. do { @@ -432,12 +456,7 @@ bool split_partition(const LocalPartitionSplitContext &lpsc, db->Flush(options, cf_hdls), "flush rocksdb in '{}' failed", new_rdb_dir); // v. Close rocksdb. - for (auto h : cf_hdls) { - delete h; - } - cf_hdls.clear(); - delete db; - db = nullptr; + release_db(&cf_hdls, &db); // vi. Generate new ".app-info". dsn::app_info new_ai(tsp.ai); @@ -449,15 +468,15 @@ bool split_partition(const LocalPartitionSplitContext &lpsc, new_ai.init_partition_count = -1; dsn::replication::replica_app_info rai(&new_ai); const auto rai_path = dsn::utils::filesystem::path_combine( - new_data_dir_replica_dir, dsn::replication::replica_app_info::kAppInfo); + new_replica_dir, dsn::replication::replica_app_info::kAppInfo); RETURN_FALSE_IF_NON_OK(rai.store(rai_path), "write replica_app_info '{}' failed", rai_path); // vii. Generate new ".init-info". dsn::replication::replica_init_info new_rii(tsp.rii); new_rii.init_offset_in_shared_log = 0; new_rii.init_offset_in_private_log = 0; - const auto rii_path = dsn::utils::filesystem::path_combine(new_data_dir_replica_dir, - replica_init_info::kInitInfo); + const auto rii_path = + dsn::utils::filesystem::path_combine(new_replica_dir, replica_init_info::kInitInfo); RETURN_FALSE_IF_NON_OK(dsn::utils::dump_rjobj_to_file(new_rii, rii_path), "write replica_init_info '{}' failed", rii_path); @@ -466,19 +485,23 @@ bool split_partition(const LocalPartitionSplitContext &lpsc, return true; } -bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::string &src_data_dir) +bool split_data_directory(const LocalPartitionSplitContext &lpsc, + const std::string &src_data_dir, + const std::string &dst_data_dir) { - // 1. Find all replica directories in the current data directory. + static const std::string kReplicasDirPostfix("replica/reps"); + + // 1. Collect all replica directories from 'src_data_dir'. const auto src_replicas_dir = - dsn::utils::filesystem::path_combine(src_data_dir, "replica/reps"); + dsn::utils::filesystem::path_combine(src_data_dir, kReplicasDirPostfix); std::vector replica_dirs; RETURN_FALSE_IF_NOT( dsn::utils::filesystem::get_subdirectories(src_replicas_dir, replica_dirs, false), "invalid command, get sub-directories from '{}' failed", src_replicas_dir); - // 2. Create temporary split directory. - const auto tmp_split_replicas_dir = dsn::utils::filesystem::path_combine(src_data_dir, "split"); + // 2. Create temporary split directory on 'dst_data_dir'. + const auto tmp_split_replicas_dir = dsn::utils::filesystem::path_combine(dst_data_dir, "split"); RETURN_FALSE_IF_NOT(dsn::utils::filesystem::create_directory(tmp_split_replicas_dir), "create split directory '{}' failed", tmp_split_replicas_dir); @@ -488,7 +511,7 @@ bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::str std::set exist_app_ids; const std::set ordered_replica_dirs(replica_dirs.begin(), replica_dirs.end()); for (const auto &replica_dir : ordered_replica_dirs) { - // Validate the replica directory. + // i. Validate the replica directory. dsn::app_info ai; dsn::gpid pid; std::string hint_message; @@ -497,13 +520,14 @@ bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::str continue; } - // Skip the non-. + // ii. Skip the non-. CHECK_EQ(pid.get_app_id(), ai.app_id); if (ai.app_id != lpsc.src_app_id) { continue; } - // Skip and warning for the replica with the same app id but not desired partition index. + // iii. Skip and warning for the replica with the same app id but not desired partition + // index. const auto cur_pidx = pid.get_partition_index(); if (lpsc.src_partition_ids.count(cur_pidx) == 0) { fmt::print(stdout, @@ -513,7 +537,7 @@ bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::str continue; } - // Continue and warning if the exist. + // iv. Continue and warning if the exist. exist_app_ids.insert(ai.app_id); if (exist_app_ids.count(lpsc.dst_app_id) != 0) { fmt::print( @@ -523,22 +547,23 @@ bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::str lpsc.dst_app_id); } - // Check if matches. + // v. Check if matches. RETURN_FALSE_IF_NOT(ai.partition_count == lpsc.src_partition_count, "unmatched ({} vs {})", ai.partition_count, lpsc.src_partition_count); - // Check the app status. + // vi. Check the app status. RETURN_FALSE_IF_NOT(ai.status == dsn::app_status::AS_AVAILABLE, "not support to split app '{}' in non-AVAILABLE status", ai.app_name); - // Check if the app is duplicating or bulk loading. + // vii. Check if the app is duplicating or bulk loading. RETURN_FALSE_IF_NOT(!ai.duplicating && !ai.is_bulk_loading, "not support to split app '{}' which is duplicating or bulk loading", ai.app_name); + // viii. Load the replica_init_info. dsn::replication::replica_init_info rii; const auto rii_path = dsn::utils::filesystem::path_combine(replica_dir, replica_init_info::kInitInfo); @@ -546,15 +571,18 @@ bool split_data_directory(const LocalPartitionSplitContext &lpsc, const std::str "load replica_init_info from '{}' failed", rii_path); + // ix. Gather the replica. to_split_partitions.push_back({replica_dir, ai, rii, pid.get_partition_index()}); } // 4. Split the partitions. - // TODO(yingchun): make it optional. - auto partitions_thread_pool = std::unique_ptr(rocksdb::NewThreadPool(8)); + const auto dst_replicas_dir = + dsn::utils::filesystem::path_combine(src_data_dir, kReplicasDirPostfix); + auto partitions_thread_pool = + std::unique_ptr(rocksdb::NewThreadPool(lpsc.threads_per_data_dir)); for (const auto &tsp : to_split_partitions) { partitions_thread_pool->SubmitJob([=]() { - if (!split_partition(lpsc, tsp, src_replicas_dir, tmp_split_replicas_dir)) { + if (!split_partition(lpsc, tsp, dst_replicas_dir, tmp_split_replicas_dir)) { fmt::print(stderr, " split partition '{}' failed\n", tsp.replica_dir); } else { fmt::print(stdout, " split partition '{}' succeed\n", tsp.replica_dir); @@ -596,9 +624,12 @@ bool local_partition_split(command_executor *e, shell_context *sc, arguments arg // 3. Split each data directory. auto data_dirs_thread_pool = std::unique_ptr( rocksdb::NewThreadPool(static_cast(lpsc.src_data_dirs.size()))); - for (const auto &src_data_dir : lpsc.src_data_dirs) { + CHECK_EQ(lpsc.src_data_dirs.size(), lpsc.dst_data_dirs.size()); + for (auto i = 0; i < lpsc.src_data_dirs.size(); i++) { + const auto &src_data_dir = lpsc.src_data_dirs[i]; + const auto &dst_data_dir = lpsc.dst_data_dirs[i]; data_dirs_thread_pool->SubmitJob([=]() { - if (!split_data_directory(lpsc, src_data_dir)) { + if (!split_data_directory(lpsc, src_data_dir, dst_data_dir)) { fmt::print(stderr, "split data directory '{}' failed\n", src_data_dir); } else { fmt::print(stdout, "split data directory '{}' succeed\n", src_data_dir);