From 9d0f5865f51b006999aa77ff50e623a1716e7339 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Fri, 1 Mar 2024 00:03:16 +0800 Subject: [PATCH] feat(local_partition_split): init --- src/{server => base}/meta_store.cpp | 14 +- src/{server => base}/meta_store.h | 15 +- src/client/partition_resolver.cpp | 5 + src/client/partition_resolver.h | 20 +- src/client/partition_resolver_simple.cpp | 4 - src/client/partition_resolver_simple.h | 2 - src/server/pegasus_server_impl.cpp | 4 +- src/server/pegasus_server_impl_init.cpp | 3 + src/server/pegasus_write_service_impl.h | 1 + src/server/test/CMakeLists.txt | 2 +- src/shell/commands.h | 5 + src/shell/commands/local_partition_split.cpp | 495 +++++++++++++++++++ src/shell/main.cpp | 8 + 13 files changed, 553 insertions(+), 25 deletions(-) rename src/{server => base}/meta_store.cpp (94%) rename src/{server => base}/meta_store.h (91%) create mode 100644 src/shell/commands/local_partition_split.cpp diff --git a/src/server/meta_store.cpp b/src/base/meta_store.cpp similarity index 94% rename from src/server/meta_store.cpp rename to src/base/meta_store.cpp index db0b9a0a47..3c84793ebc 100644 --- a/src/server/meta_store.cpp +++ b/src/base/meta_store.cpp @@ -22,14 +22,22 @@ #include #include +#include "base/pegasus_const.h" #include "common/replica_envs.h" #include "server/pegasus_server_impl.h" #include "utils/fmt_logging.h" #include "utils/string_conv.h" +DSN_DEFINE_string("pegasus.server", + get_meta_store_type, + "manifest", + "Where to get meta data, now support 'manifest' and 'metacf'"); +DSN_DEFINE_validator(get_meta_store_type, [](const char *type) { + return strcmp(type, "manifest") == 0 || strcmp(type, "metacf") == 0; +}); + namespace pegasus { namespace server { - const std::string meta_store::DATA_VERSION = "pegasus_data_version"; const std::string meta_store::LAST_FLUSHED_DECREE = "pegasus_last_flushed_decree"; const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME = @@ -38,10 +46,10 @@ const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL = "normal"; const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE = "prefer_write"; const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD = "bulk_load"; -meta_store::meta_store(pegasus_server_impl *server, +meta_store::meta_store(const char *log_prefix, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf) - : replica_base(server), _db(db), _meta_cf(meta_cf) + : _name(log_prefix), _db(db), _meta_cf(meta_cf) { // disable write ahead logging as replication handles logging instead now _wt_opts.disableWAL = true; diff --git a/src/server/meta_store.h b/src/base/meta_store.h similarity index 91% rename from src/server/meta_store.h rename to src/base/meta_store.h index d8744a1ccd..bccfde70de 100644 --- a/src/server/meta_store.h +++ b/src/base/meta_store.h @@ -41,10 +41,17 @@ class pegasus_server_impl; // - pegasus_data_version // - pegasus_last_flushed_decree // - pegasus_last_manual_compact_finish_time -class meta_store : public dsn::replication::replica_base +class meta_store { public: - meta_store(pegasus_server_impl *server, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf); + enum class meta_store_type + { + kManifestOnly = 0, + kMetaCFOnly, + kBothManifestAndMetaCF, + }; + + meta_store(const char *log_prefix, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf); dsn::error_code get_last_flushed_decree(uint64_t *decree) const; uint64_t get_decree_from_readonly_db(rocksdb::DB *db, @@ -85,6 +92,9 @@ class meta_store : public dsn::replication::replica_base FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options); FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs); + // tricky: the 'replica_name()' is used for logging. + const char *replica_name() const { return _name.c_str(); } + // Keys of meta data wrote into meta column family. static const std::string DATA_VERSION; static const std::string LAST_FLUSHED_DECREE; @@ -93,6 +103,7 @@ class meta_store : public dsn::replication::replica_base static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE; static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD; + const std::string _name; rocksdb::DB *_db; rocksdb::ColumnFamilyHandle *_meta_cf; rocksdb::WriteOptions _wt_opts; diff --git a/src/client/partition_resolver.cpp b/src/client/partition_resolver.cpp index 1b8baa7a18..993e487741 100644 --- a/src/client/partition_resolver.cpp +++ b/src/client/partition_resolver.cpp @@ -45,6 +45,11 @@ partition_resolver_ptr partition_resolver::get_resolver(const char *cluster_name return partition_resolver_manager::instance().find_or_create(cluster_name, meta_list, app_name); } +int partition_resolver::get_partition_index(int partition_count, uint64_t partition_hash) +{ + return partition_hash % static_cast(partition_count); +} + DEFINE_TASK_CODE(LPC_RPC_DELAY_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT) static inline bool error_retry(error_code err) diff --git a/src/client/partition_resolver.h b/src/client/partition_resolver.h index b870b2c7a2..b47671c94e 100644 --- a/src/client/partition_resolver.h +++ b/src/client/partition_resolver.h @@ -55,6 +55,15 @@ class partition_resolver : public ref_counter get_resolver(const char *cluster_name, const std::vector &meta_list, const char *app_name); + /** + * get zero-based partition index + * + * \param partition_count number of partitions. + * \param partition_hash the partition hash. + * + * \return zero-based partition index. + */ + static int get_partition_index(int partition_count, uint64_t partition_hash); template dsn::rpc_response_task_ptr call_op(dsn::task_code code, @@ -131,17 +140,6 @@ class partition_resolver : public ref_counter */ virtual void on_access_failure(int partition_index, error_code err) = 0; - /** - * get zero-based partition index - * - * \param partition_count number of partitions. - * \param partition_hash the partition hash. - * - * \return zero-based partition index. - */ - - virtual int get_partition_index(int partition_count, uint64_t partition_hash) = 0; - std::string _cluster_name; std::string _app_name; rpc_address _meta_server; diff --git a/src/client/partition_resolver_simple.cpp b/src/client/partition_resolver_simple.cpp index a0fd30224d..ed2ef12489 100644 --- a/src/client/partition_resolver_simple.cpp +++ b/src/client/partition_resolver_simple.cpp @@ -448,9 +448,5 @@ error_code partition_resolver_simple::get_address(int partition_index, /*out*/ r } } -int partition_resolver_simple::get_partition_index(int partition_count, uint64_t partition_hash) -{ - return partition_hash % static_cast(partition_count); -} } // namespace replication } // namespace dsn diff --git a/src/client/partition_resolver_simple.h b/src/client/partition_resolver_simple.h index 2ab1fb70c2..2cfb97d832 100644 --- a/src/client/partition_resolver_simple.h +++ b/src/client/partition_resolver_simple.h @@ -59,8 +59,6 @@ class partition_resolver_simple : public partition_resolver virtual void on_access_failure(int partition_index, error_code err) override; - virtual int get_partition_index(int partition_count, uint64_t partition_hash) override; - int get_partition_count() const { return _app_partition_count; } private: diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 5c62455cca..1b5c91d495 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -46,6 +46,7 @@ #include "absl/strings/string_view.h" #include "base/idl_utils.h" // IWYU pragma: keep +#include "base/meta_store.h" #include "base/pegasus_key_schema.h" #include "base/pegasus_utils.h" #include "base/pegasus_value_schema.h" @@ -56,7 +57,6 @@ #include "consensus_types.h" #include "dsn.layer2_types.h" #include "hotkey_collector.h" -#include "meta_store.h" #include "pegasus_rpc_types.h" #include "pegasus_server_write.h" #include "replica_admin_types.h" @@ -1736,7 +1736,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv) _meta_cf = handles_opened[1]; // Create _meta_store which provide Pegasus meta data read and write. - _meta_store = std::make_unique(this, _db, _meta_cf); + _meta_store = dsn::make_unique(replica_name(), _db, _meta_cf); if (db_exist) { auto cleanup = dsn::defer([this]() { release_db(); }); diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp index 4d2e3f1aa6..5b5670fcde 100644 --- a/src/server/pegasus_server_impl_init.cpp +++ b/src/server/pegasus_server_impl_init.cpp @@ -37,8 +37,11 @@ #include #include +#include "base/meta_store.h" #include "common/gpid.h" #include "hashkey_transform.h" +#include "pegasus_event_listener.h" +#include "pegasus_server_write.h" #include "hotkey_collector.h" #include "pegasus_event_listener.h" #include "pegasus_server_impl.h" diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 3ac6bfe952..e0dfcbbe1e 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -22,6 +22,7 @@ #include #include "base/idl_utils.h" +#include "base/meta_store.h" #include "base/pegasus_key_schema.h" #include "logging_utils.h" #include "meta_store.h" diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt index 5907476d30..082a98c1e4 100644 --- a/src/server/test/CMakeLists.txt +++ b/src/server/test/CMakeLists.txt @@ -26,11 +26,11 @@ set(MY_PROJ_SRC "../capacity_unit_calculator.cpp" "../pegasus_mutation_duplicator.cpp" "../hotspot_partition_calculator.cpp" - "../meta_store.cpp" "../hotkey_collector.cpp" "../rocksdb_wrapper.cpp" "../compaction_filter_rule.cpp" "../compaction_operation.cpp") + set(MY_SRC_SEARCH_MODE "GLOB") set(MY_PROJ_LIBS dsn_replica_server diff --git a/src/shell/commands.h b/src/shell/commands.h index a5faa1b713..78541213fe 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -284,3 +284,8 @@ bool clear_bulk_load(command_executor *e, shell_context *sc, arguments args); // == detect hotkey (see 'commands/detect_hotkey.cpp') == // bool detect_hotkey(command_executor *e, shell_context *sc, arguments args); + +// == local partition split (see 'commands/local_partition_split.cpp') == // +extern const std::string local_partition_split_help; +bool local_partition_split(command_executor *e, shell_context *sc, arguments args); + diff --git a/src/shell/commands/local_partition_split.cpp b/src/shell/commands/local_partition_split.cpp new file mode 100644 index 0000000000..67361f2a03 --- /dev/null +++ b/src/shell/commands/local_partition_split.cpp @@ -0,0 +1,495 @@ +// Copyright (c) 2019, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "shell/commands.h" +#include "shell/argh.h" +#include "base/idl_utils.h" +#include "rocksdb/sst_file_reader.h" +#include "rocksdb/sst_file_writer.h" +#include +#include +#include +#include +#include +#include +#include "base/meta_store.h" +#include "base/pegasus_key_schema.h" + +DSN_DECLARE_string(get_meta_store_type); + +// TODO(yingchun): improve the parameters, use flags (i.e. --src_data_dirs=xxx) would be more friendly. +const std::string local_partition_split_help = " "; +bool local_partition_split(command_executor *e, shell_context *sc, arguments args) +{ + // Parse parameters. + argh::parser cmd(args.argc, args.argv); + if (cmd.pos_args().size() != 8) { + fmt::print(stderr, "invalid command, should be in the form of {}\n", local_partition_split_help); + return false; + } + int index = 1; + +#define PARSE_STRS(container) \ + do { \ + ::dsn::utils::split_args(cmd(index++).str().c_str(), container, ','); \ + if (container.empty()) { \ + fmt::print(stderr, "invalid command, <" #container "> should be in the form of 'val1,val2,val3' and should not be empty\n"); \ + return false; \ + } \ + std::set str_set(container.begin(), container.end()); \ + if (str_set.size() != container.size()) { \ + fmt::print(stderr, "invalid command, <" #container "> has duplicate values\n"); \ + return false; \ + } \ + } while (false) + +#define PARSE_UINT(value) \ + do { \ + if (!::dsn::buf2uint32(cmd(index++).str(), value)) { \ + fmt::print(stderr, "invalid command, <" #value "> should be an unsigned integer\n"); \ + return false; \ + } \ + } while (false) + +#define PARSE_UINTS(container) \ + do { \ + std::vector strs; \ + PARSE_STRS(strs); \ + container.clear(); \ + for (const auto &str : strs) { \ + uint32_t v; \ + if (!::dsn::buf2uint32(str, v)) { \ + fmt::print(stderr, "invalid command, '{}' in <" #container "> should be an unsigned integer\n", str); \ + return false; \ + } \ + container.push_back(v); \ + } \ + } while (false) + + std::vector src_data_dirs; + PARSE_STRS(src_data_dirs); + + std::vector dst_data_dirs; + PARSE_STRS(dst_data_dirs); + + if (src_data_dirs.size() != dst_data_dirs.size()) { + fmt::print(stderr, "invalid command, the list size of and must be equal\n"); + return false; + } + + uint32_t src_app_id; + PARSE_UINT(src_app_id); + + uint32_t dst_app_id; + PARSE_UINT(dst_app_id); + + std::vector partition_ids; + PARSE_UINTS(partition_ids); + std::set src_partition_ids(partition_ids.begin(), partition_ids.end()); + + uint32_t src_partition_count; + PARSE_UINT(src_partition_count); + + uint32_t dst_partition_count; + PARSE_UINT(dst_partition_count); + +#undef PARSE_STRS +#undef PARSE_UINT +#undef PARSE_UINTS + + // Check parameters. + // TODO(yingchun): 1. check disk space. + // 2. check app id + // TODO(yingchun): check 'dst_app_id' is not exist. + if (src_app_id == dst_app_id) { + fmt::print(stderr, "invalid command, and should be equal ({} vs. {})\n", src_app_id, dst_app_id); + return false; + } + // 3. check 'partition_ids'. + for (const auto partition_id : partition_ids) { + if (partition_id >= src_partition_count) { + fmt::print(stderr, "invalid command, should be in range [0, {})\n", src_partition_count); + return false; + } + } + + if (dst_partition_count <= src_partition_count) { + fmt::print(stderr, "invalid command, should be larger than ({} vs. {})\n", dst_partition_count, src_partition_count); + return false; + } + + // TODO(yingchun): check the correction. + int split_count = dst_partition_count / src_partition_count; + dcheck_gt(split_count, 0); + int log2n = log2(split_count); + if (pow(2, log2n) != split_count) { + fmt::print(stderr, "invalid command, should be 2^n times of ({} vs. {})\n", dst_partition_count, src_partition_count); + return false; + } + + static const std::string data_dir_postfix("/replica/reps"); + static const std::string split_dir_postfix("/split"); + for (const auto &src_data_dir : src_data_dirs) { + std::vector replica_dirs; + const auto data_dir_replica_dirs = src_data_dir + data_dir_postfix; + const auto split_data_dir_replica_dirs = src_data_dir + split_dir_postfix; + if (!dsn::utils::filesystem::get_subdirectories(data_dir_replica_dirs, replica_dirs, false)) { + fmt::print(stderr, "invalid command, get sub-directories from '{}' failed\n", data_dir_replica_dirs); + continue; + } + + // Create split directory. + if (!dsn::utils::filesystem::create_directory(split_data_dir_replica_dirs)) { + fmt::print(stderr, "create split directory {} failed\n", split_data_dir_replica_dirs); + continue; + } + + // Gather replicas to split. + struct tsp { + std::string replica_dir; + dsn::app_info ai; + int32_t pidx; + }; + std::vector to_split_partitions; + std::set exist_app_ids; + // "1.0.pegasus" + static const std::string kAppInfo = ".app-info"; + for (const auto &replica_dir : replica_dirs) { + auto fname = dsn::utils::filesystem::get_file_name(replica_dir); + // TODO(yingchun): duplicate with code in replica_stub.cpp, refactor it! + if (fname.length() >= 4 && + (fname.substr(fname.length() - 4) == ".err" || fname.substr(fname.length() - 4) == ".gar" || + fname.substr(fname.length() - 4) == ".bak")) { + fmt::print(stdout, "ignore fname {}\n", fname); + continue; + } + + // TODO(yingchun): duplicate with code in replica_init.cpp, refactor it! + char splitters[] = {'\\', '/', 0}; + std::string name = dsn::utils::get_last_component(replica_dir, splitters); + if (name.empty()) { + fmt::print(stderr, "invalid replica dir {}\n", replica_dir); + continue; + } + + char app_type[128]; + int32_t app_id, pidx; + if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) { + fmt::print(stderr, "invalid replica dir {}\n", replica_dir); + continue; + } + + // Check parameters. + if (app_id != src_app_id) { + continue; + } + + exist_app_ids.insert(app_id); + if (exist_app_ids.count(dst_app_id) > 0) { + fmt::print(stderr, "invalid command, {} is already exist\n", dst_app_id); + return false; + } + + dsn::gpid pid(app_id, pidx); + if (!dsn::utils::filesystem::directory_exists(replica_dir)) { + fmt::print(stderr, "invalid replica dir {}\n", replica_dir); + continue; + } + + dsn::app_info info; + dsn::replication::replica_app_info info2(&info); + std::string path = dsn::utils::filesystem::path_combine(replica_dir, kAppInfo); + auto err = info2.load(path); + if (dsn::ERR_OK != err) { + fmt::print(stderr, "load app-info from {} failed, err = {}\n", path, err); + continue; + } + + if (info.partition_count != src_partition_count) { + fmt::print(stderr, "unmatched partition count ({} vs {})\n", info.partition_count, src_partition_count); + return false; + + } + + if (info.app_type != app_type) { + fmt::print(stderr, "unmatched app type {} for {}\n", info.app_type, path); + continue; + } + + // All checks passed, do the split. + if (src_partition_ids.count(pidx) > 0) { + to_split_partitions.push_back({replica_dir, info, pidx}); + } + } + + // Split the partition. + static const std::string rdb_dir_postfix("/data/rdb/"); + for (const auto &tsp : to_split_partitions) { + const auto &replica_dir = tsp.replica_dir; + const auto rdb_dir = replica_dir + rdb_dir_postfix; + fmt::print(stdout, "start to split '{}'\n", rdb_dir); + + // Open rdb. + rocksdb::DBOptions _db_opts; +// _db_opts.pegasus_data = true; +// _db_opts.pegasus_data_version = pegasus::PEGASUS_DATA_VERSION_MAX; + _db_opts.create_if_missing = false; + + // TODO(yingchun): how to open in read-only mode? + // TODO(yingchun): can use the variables in pegasus_server_impl.h, because can't link it. + const std::string DATA_COLUMN_FAMILY_NAME = "default"; + const std::string META_COLUMN_FAMILY_NAME = "pegasus_meta_cf"; + rocksdb::ColumnFamilyOptions _data_cf_opts; + rocksdb::ColumnFamilyOptions _meta_cf_opts; + rocksdb::ColumnFamilyHandle *_data_cf; + rocksdb::ColumnFamilyHandle *_meta_cf; + std::vector column_families( + {{DATA_COLUMN_FAMILY_NAME, _data_cf_opts}, + {META_COLUMN_FAMILY_NAME, _meta_cf_opts}}); + std::vector handles_opened; + rocksdb::DB *_db; + auto s = rocksdb::DB::Open(_db_opts, rdb_dir, column_families, &handles_opened, &_db); + if (!s.ok()) { + fmt::print(stderr, "{}: open rocksdb in {} failed\n", s.ToString(), rdb_dir); + continue; + } + dassert_f(2 == handles_opened.size(), ""); + dassert_f(handles_opened[0]->GetName() == DATA_COLUMN_FAMILY_NAME, ""); + dassert_f(handles_opened[1]->GetName() == META_COLUMN_FAMILY_NAME, ""); + _data_cf = handles_opened[0]; + _meta_cf = handles_opened[1]; + + // Get metadata in rocksdb. + FLAGS_get_meta_store_type = "manifest"; + auto ms = dsn::make_unique(rdb_dir.c_str(), _db, _meta_cf); + uint64_t last_committed_decree; + auto err = ms->get_last_flushed_decree(&last_committed_decree); + if (err != dsn::ERR_OK) { + fmt::print(stderr, "{}: get_last_flushed_decree from '{}' failed\n", err, rdb_dir); + return false; + } + + uint32_t pegasus_data_version; + err = ms->get_data_version(&pegasus_data_version); + if (err != dsn::ERR_OK) { + fmt::print(stderr, "{}: get_data_version from '{}' failed\n", err, rdb_dir); + return false; + } + + uint64_t last_manual_compact_finish_time; + err = ms->get_last_manual_compact_finish_time(&last_manual_compact_finish_time); + if (err != dsn::ERR_OK) { + fmt::print(stderr, "{}: get_last_manual_compact_finish_time from '{}' failed\n", err, rdb_dir); + return false; + } + + // Get all sst files. + std::vector files; + _db->GetLiveFilesMetaData(&files); + + // Close rdb. + _db->DestroyColumnFamilyHandle(_data_cf); + _data_cf = nullptr; + _db->DestroyColumnFamilyHandle(_meta_cf); + _meta_cf = nullptr; + delete _db; + _db = nullptr; + + for (const auto &file : files) { + // 'fname' has a duplicate '/' when contact 'file.db_path' and 'file.name', it doesn't matter. + const auto fname = file.db_path + file.name; + fmt::print(stdout, "column family: {}, file: {}\n", file.column_family_name, fname); + // Open reader. + // TODO(yingchun): options? + auto reader = dsn::make_unique(rocksdb::Options()); + auto rs = reader->Open(fname); + if (!rs.ok()) { + fmt::print(stderr, "{}: open reader file {} failed\n", rs.ToString(), fname); + return false; + } + rs = reader->VerifyChecksum(); + if (!rs.ok()) { + fmt::print(stderr, "{}: verify reader file {} failed\n", rs.ToString(), fname); + return false; + } + + const auto tbl_ppts = reader->GetTableProperties(); + if (tbl_ppts->column_family_name == META_COLUMN_FAMILY_NAME) { + // Skip meta column family files, we will write metadata manually. + fmt::print(stdout, "skipped\n"); + continue; + } + + // Open writers. + std::shared_ptr writers[split_count]; + for (int i = 0; i < split_count; i++) { + // Split temporary dir. + auto dst_tmp_rdb_dir = fmt::format("{}/{}.{}.pegasus", split_data_dir_replica_dirs, dst_app_id, + tsp.pidx + i * tsp.ai.partition_count); + + // Create dst rdb directory. + if (!dsn::utils::filesystem::create_directory(dst_tmp_rdb_dir)) { + fmt::print(stderr, "create directory '{}' failed\n", dst_tmp_rdb_dir); + return false; + } + + // TODO(yingchun): options? + auto dst_tmp_rdb_file = fmt::format("{}{}", dst_tmp_rdb_dir, file.name); + writers[i] = std::make_shared(rocksdb::EnvOptions(), rocksdb::Options()); + auto ws = writers[i]->Open(dst_tmp_rdb_file); + if (!ws.ok()) { + fmt::print(stderr, "{}: open writer file '{}' failed\n", ws.ToString(), dst_tmp_rdb_file); + return false; + } + fmt::print(stdout, "open writer file '{}'\n", dst_tmp_rdb_file); + } + + // TODO(yingchun): 如果是一个 delete 操作,应该迭代不出来(?),那如果 level 底层(旧数据)有一个 put,level 高层(新数据)有一个 delete, + // 如果 put 可以迭代出来而 delete 迭代不出来的话,则可能出现已删除数据重现的问题? + std::unique_ptr iter(reader->NewIterator(rocksdb::ReadOptions())); + iter->SeekToFirst(); + while (iter->Valid()) { + // Calc the hash value and corresponding new partition index and sst writer. + const auto &skey = iter->key(); + dsn::blob bb_key(skey.data(), 0, skey.size()); + uint64_t hash_value = pegasus::pegasus_key_hash(bb_key); + int new_pidx = dsn::replication::partition_resolver::get_partition_index(dst_partition_count, hash_value); + dcheck_le(0, new_pidx); + dcheck_lt(new_pidx, dst_partition_count); + int writer_idx = new_pidx / dst_partition_count; + dcheck_le(0, writer_idx); + dcheck_lt(writer_idx, split_count); + + // TODO(yingchun): improve to check expired data. + + // Write the data to the new partition sst file. + auto ws = writers[writer_idx]->Put(skey, iter->value()); + if (!ws.ok()) { + fmt::print(stderr, "{}: from {} write failed\n", ws.ToString(), fname); + return false; + } + iter->Next(); + } + + // Release reader and writers. + for (int i = 0; i < split_count; i++) { + auto ws = writers[i]->Finish(nullptr); + if (!ws.ok()) { + fmt::print(stderr, "{}: finish writer split from file {} failed\n", ws.ToString(), fname); + return false; + } + } + reader.release(); + } + + // Create new partitions. + for (int i = 0; i < split_count; i++) { + const auto new_data_dir_replica_dirs = fmt::format("{}/{}.{}.pegasus", data_dir_replica_dirs, dst_app_id, tsp.pidx + i * tsp.ai.partition_count); + // Create the new directory. + // TODO(yingchun): mkdir -p is not supported ? + if (!dsn::utils::filesystem::create_directory(new_data_dir_replica_dirs)) { + fmt::print(stderr, "create directory {} failed\n", new_data_dir_replica_dirs); + return false; + } + const auto new_rdb_dir = new_data_dir_replica_dirs + rdb_dir_postfix; + if (!dsn::utils::filesystem::create_directory(new_rdb_dir)) { + fmt::print(stderr, "create directory {} failed\n", new_rdb_dir); + return false; + } + + // Open rdb. + _db_opts.create_if_missing = true; + _db_opts.create_missing_column_families = true; // Create the 'pegasus_meta_cf' column family. + auto s = rocksdb::DB::Open(_db_opts, new_rdb_dir, column_families, &handles_opened, &_db); + if (!s.ok()) { + fmt::print(stderr, "{}: open rocksdb in {} failed\n", s.ToString(), new_rdb_dir); + return false; + } + dassert_f(2 == handles_opened.size(), ""); + dassert_f(handles_opened[0]->GetName() == DATA_COLUMN_FAMILY_NAME, ""); + dassert_f(handles_opened[1]->GetName() == META_COLUMN_FAMILY_NAME, ""); + _data_cf = handles_opened[0]; + _meta_cf = handles_opened[1]; + + const auto dst_tmp_rdb_dir = fmt::format("{}/{}.{}.pegasus", split_data_dir_replica_dirs, + dst_app_id, + tsp.pidx + i * tsp.ai.partition_count); + if (dsn::utils::filesystem::directory_exists(dst_tmp_rdb_dir)) { + // Gather all files. + rocksdb::IngestExternalFileArg arg; + if (!dsn::utils::filesystem::get_subdirectories(dst_tmp_rdb_dir, arg.external_files, false)) { + fmt::print(stderr, "get sub-directories from '{}' failed\n", dst_tmp_rdb_dir); + return false; + } + + // Ingest files. + // TODO(yingchun): any problem if they are in 2 CFs ? + auto iefs = _db->IngestExternalFiles({arg}); + if (!iefs.ok()) { + fmt::print(stderr, "{}: open rocksdb in {} failed\n", iefs.ToString(), new_rdb_dir); + return false; + } + + // Full compact. + auto crs = _db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr); + if (!crs.ok()) { + fmt::print(stderr, "{}: compact rocksdb in {} failed\n", crs.ToString(), new_rdb_dir); + return false; + } + } else { + fmt::print(stdout, "'{}' is empty\n", dst_tmp_rdb_dir); + } + + // TODO(yingchun): these metadata are only written to the meta column family, not the manifest file. Should set + // [pegasus.server]get_meta_store_type = "metacf" when start replica server. + auto new_ms = dsn::make_unique(new_rdb_dir.c_str(), _db, _meta_cf); + new_ms->set_data_version(pegasus_data_version); + new_ms->set_last_flushed_decree(last_committed_decree); + new_ms->set_last_manual_compact_finish_time(last_manual_compact_finish_time); + // flush and wait for response + rocksdb::FlushOptions options; + options.wait = true; + auto fs = _db->Flush(options, {_meta_cf, _data_cf}); + if (!fs.ok()) { + fmt::print(stderr, "{}: flush rocksdb in {} failed\n", fs.ToString(), new_rdb_dir); + return false; + } + + // Close rdb. + _db->DestroyColumnFamilyHandle(_data_cf); + _data_cf = nullptr; + _db->DestroyColumnFamilyHandle(_meta_cf); + _meta_cf = nullptr; + delete _db; + _db = nullptr; + + // Generate new ".app-info". + dsn::app_info new_ai(tsp.ai); + new_ai.app_name += "_new"; // TODO(yingchun): customize it. + new_ai.app_id = dst_app_id; + new_ai.partition_count = dst_partition_count; + new_ai.create_second = dsn_now_s(); + dsn::replication::replica_app_info rai(&new_ai); + const auto rai_path = dsn::utils::filesystem::path_combine(new_data_dir_replica_dirs, kAppInfo); + auto err = rai.store(rai_path.c_str()); + if (err != dsn::ERR_OK) { + fmt::print(stderr, "{}: write {} failed\n", err, rai_path); + return false; + } + + // Generate new ".init-info". + // TODO(yingchun): check the correctness. + dsn::replication::replica_init_info rii; + rii.init_durable_decree = last_committed_decree; + err = rii.store(new_data_dir_replica_dirs.c_str()); + if (err != dsn::ERR_OK) { + fmt::print(stderr, "{}: write {} failed\n", err, new_data_dir_replica_dirs); + return false; + } + } + } + } + + return true; +} diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 66ab8da4f3..ef5313a3d1 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -29,8 +29,10 @@ #include #include #include +#include #include "args.h" +#include "base/pegasus_const.h" #include "client/replication_ddl_client.h" #include "command_executor.h" #include "commands.h" @@ -532,6 +534,12 @@ static command_executor commands[] = { " ", set_max_replica_count, }, + { + "local_partition_split", + "split the local partitions", + fmt::format("{}", local_partition_split_help).c_str(), + local_partition_split, + }, { "exit", "exit shell", "", exit_shell, },