From 7e95f31cc15e11574a42803dbf73b25599fae00b Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Fri, 21 Jun 2024 15:42:43 +0800 Subject: [PATCH] feat(remote_command): provide the query for the progress of decrees including both local writes and duplications (#2045) There are many kinds of decrees while writing locally and duplicating to remote clusters, for example, the max decree in prepare list, the last decree that has ever been committed, the last decree that has been applied into rocksdb memtable, the last decree that has been flushed into rocksdb sst files, the max decree that has been confirmed by remote cluster for duplication, etc. These decrees are very useful when we want to watch the progress of all the local writes and duplications. These decrees might also help us diagnose the problems. Therefore, we provide a tool in the way of `remote_command` to show the decrees for each replica. --- src/common/json_helper.h | 6 ++ src/replica/duplication/replica_duplicator.h | 20 +++++ .../replica_duplicator_manager.cpp | 75 +++++++++++++---- .../duplication/replica_duplicator_manager.h | 48 +++++------ src/replica/replica.cpp | 6 +- src/replica/replica.h | 39 +++++++-- src/replica/replica_stub.cpp | 20 ++++- src/replica/replication_app_base.h | 6 ++ .../storage/simple_kv/simple_kv.server.impl.h | 4 +- .../simple_kv/test/simple_kv.server.impl.h | 4 +- src/replica/test/mock_utils.h | 2 + src/server/pegasus_server_impl.cpp | 83 +++++++++++-------- src/server/pegasus_server_impl.h | 2 + 13 files changed, 225 insertions(+), 90 deletions(-) diff --git a/src/common/json_helper.h b/src/common/json_helper.h index 4329bd2a8a..cbd94af945 100644 --- a/src/common/json_helper.h +++ b/src/common/json_helper.h @@ -237,6 +237,12 @@ JSON_DECODE_ENTRIES(input, t, __VA_ARGS__); \ } +#define JSON_ENCODE_OBJ(writer, name, ...) 
\ + do { \ + writer.Key(#name); \ + dsn::json::json_encode(writer, __VA_ARGS__); \ + } while (0) + namespace dsn { namespace json { diff --git a/src/replica/duplication/replica_duplicator.h b/src/replica/duplication/replica_duplicator.h index ebf4473b99..e9df7d7cb6 100644 --- a/src/replica/duplication/replica_duplicator.h +++ b/src/replica/duplication/replica_duplicator.h @@ -23,6 +23,7 @@ #include #include "common//duplication_common.h" +#include "common/json_helper.h" #include "common/replication_other_types.h" #include "duplication_types.h" #include "replica/replica_base.h" @@ -143,6 +144,25 @@ class replica_duplicator : public replica_base, public pipeline::base void set_duplication_plog_checking(bool checking); + // Encode current progress of this duplication into json. + template + void encode_progress(TWriter &writer) const + { + writer.StartObject(); + + JSON_ENCODE_OBJ(writer, dupid, _id); + JSON_ENCODE_OBJ(writer, remote_cluster_name, _remote_cluster_name); + JSON_ENCODE_OBJ(writer, remote_app_name, _remote_app_name); + + { + zauto_read_lock l(_lock); + JSON_ENCODE_OBJ(writer, confirmed_decree, _progress.last_decree); + JSON_ENCODE_OBJ(writer, persisted_decree, _progress.confirmed_decree); + } + + writer.EndObject(); + } + private: friend class duplication_test_base; friend class replica_duplicator_test; diff --git a/src/replica/duplication/replica_duplicator_manager.cpp b/src/replica/duplication/replica_duplicator_manager.cpp index d60bf57b20..9d2153559a 100644 --- a/src/replica/duplication/replica_duplicator_manager.cpp +++ b/src/replica/duplication/replica_duplicator_manager.cpp @@ -22,7 +22,11 @@ #include "common//duplication_common.h" #include "common/gpid.h" +#include "common/replication_enums.h" +#include "metadata_types.h" #include "replica/duplication/replica_duplicator.h" +#include "replica/duplication/replica_duplicator_manager.h" +#include "replica/replica.h" #include "replica_duplicator_manager.h" #include "utils/autoref_ptr.h" #include 
"utils/errors.h" @@ -41,29 +45,56 @@ replica_duplicator_manager::replica_duplicator_manager(replica *r) { } +void replica_duplicator_manager::update_duplication_map( + const std::map &new_dup_map) +{ + if (new_dup_map.empty() || _replica->status() != partition_status::PS_PRIMARY) { + remove_all_duplications(); + return; + } + + remove_non_existed_duplications(new_dup_map); + + for (const auto &kv2 : new_dup_map) { + sync_duplication(kv2.second); + } +} + std::vector replica_duplicator_manager::get_duplication_confirms_to_update() const { zauto_lock l(_lock); std::vector updates; - for (const auto &kv : _duplications) { - replica_duplicator *duplicator = kv.second.get(); - duplication_progress p = duplicator->progress(); - if (p.last_decree != p.confirmed_decree || - (kv.second->status() == duplication_status::DS_PREPARE && p.checkpoint_has_prepared)) { - if (p.last_decree < p.confirmed_decree) { - LOG_ERROR_PREFIX("invalid decree state: p.last_decree({}) < p.confirmed_decree({})", - p.last_decree, - p.confirmed_decree); - continue; - } - duplication_confirm_entry entry; - entry.dupid = duplicator->id(); - entry.confirmed_decree = p.last_decree; - entry.__set_checkpoint_prepared(p.checkpoint_has_prepared); - updates.emplace_back(entry); + for (const auto & [ _, dup ] : _duplications) { + // There are two conditions when we should send confirmed decrees to meta server to update + // the progress: + // + // 1. the acknowledged decree from remote cluster has changed, making it different from + // the one that is persisted in zk by meta server; otherwise, + // + // 2. the duplication has been in the stage of synchronizing checkpoint to the remote + // cluster, and the synchronized checkpoint has been ready. 
+ const auto &progress = dup->progress(); + if (progress.last_decree == progress.confirmed_decree && + (dup->status() != duplication_status::DS_PREPARE || + !progress.checkpoint_has_prepared)) { + continue; } + + if (progress.last_decree < progress.confirmed_decree) { + LOG_ERROR_PREFIX( + "invalid decree state: progress.last_decree({}) < progress.confirmed_decree({})", + progress.last_decree, + progress.confirmed_decree); + continue; + } + + duplication_confirm_entry entry; + entry.dupid = dup->id(); + entry.confirmed_decree = progress.last_decree; + entry.__set_checkpoint_prepared(progress.checkpoint_has_prepared); + updates.emplace_back(entry); } return updates; } @@ -191,5 +222,17 @@ replica_duplicator_manager::get_dup_states() const return ret; } +void replica_duplicator_manager::remove_all_duplications() +{ + // fast path + if (_duplications.empty()) { + return; + } + + LOG_WARNING_PREFIX("remove all duplication, replica status = {}", + enum_to_string(_replica->status())); + _duplications.clear(); +} + } // namespace replication } // namespace dsn diff --git a/src/replica/duplication/replica_duplicator_manager.h b/src/replica/duplication/replica_duplicator_manager.h index 51bcbd1e1d..413176a16f 100644 --- a/src/replica/duplication/replica_duplicator_manager.h +++ b/src/replica/duplication/replica_duplicator_manager.h @@ -24,19 +24,16 @@ #include #include "common//duplication_common.h" -#include "common/replication_enums.h" #include "common/replication_other_types.h" #include "duplication_types.h" -#include "metadata_types.h" -#include "replica/replica.h" #include "replica/replica_base.h" #include "replica_duplicator.h" -#include "utils/fmt_logging.h" #include "utils/metrics.h" #include "utils/zlocks.h" namespace dsn { namespace replication { +class replica; /// replica_duplicator_manager manages the set of duplications on this replica. 
/// \see duplication_sync_timer @@ -51,19 +48,7 @@ class replica_duplicator_manager : public replica_base // - replica is not primary on replica-server perspective (status != PRIMARY) // - replica is not primary on meta-server perspective (progress.find(partition_id) == end()) // - the app is not assigned with duplication (dup_map.empty()) - void update_duplication_map(const std::map &new_dup_map) - { - if (new_dup_map.empty() || _replica->status() != partition_status::PS_PRIMARY) { - remove_all_duplications(); - return; - } - - remove_non_existed_duplications(new_dup_map); - - for (const auto &kv2 : new_dup_map) { - sync_duplication(kv2.second); - } - } + void update_duplication_map(const std::map &new_dup_map); /// collect updated duplication confirm points from this replica. std::vector get_duplication_confirms_to_update() const; @@ -93,21 +78,30 @@ class replica_duplicator_manager : public replica_base }; std::vector get_dup_states() const; + // Encode current progress of all duplication into json. 
+ template + void encode_progress(TWriter &writer) const + { + zauto_lock l(_lock); + + if (_duplications.empty()) { + return; + } + + writer.Key("duplications"); + writer.StartArray(); + for (const auto & [ _, dup ] : _duplications) { + dup->encode_progress(writer); + } + writer.EndArray(); + } + private: void sync_duplication(const duplication_entry &ent); void remove_non_existed_duplications(const std::map &); - void remove_all_duplications() - { - // fast path - if (_duplications.empty()) - return; - - LOG_WARNING_PREFIX("remove all duplication, replica status = {}", - enum_to_string(_replica->status())); - _duplications.clear(); - } + void remove_all_duplications(); private: friend class duplication_sync_timer_test; diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 5bb1f17b83..be31df14fd 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -41,10 +41,10 @@ #include "common/replication_common.h" #include "common/replication_enums.h" #include "consensus_types.h" -#include "duplication/replica_duplicator_manager.h" #include "duplication/replica_follower.h" #include "mutation.h" #include "mutation_log.h" +#include "replica/duplication/replica_duplicator_manager.h" #include "replica/prepare_list.h" #include "replica/replica_context.h" #include "replica/replication_app_base.h" @@ -578,6 +578,10 @@ mutation_ptr replica::new_mutation(decree decree) return mu; } +decree replica::last_applied_decree() const { return _app->last_committed_decree(); } + +decree replica::last_flushed_decree() const { return _app->last_flushed_decree(); } + decree replica::last_durable_decree() const { return _app->last_durable_decree(); } decree replica::last_prepared_decree() const diff --git a/src/replica/replica.h b/src/replica/replica.h index 3b90641cdd..ae0118dc05 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -35,8 +35,10 @@ #include #include +#include "common/json_helper.h" #include "common/replication_other_types.h" #include 
"dsn.layer2_types.h" +#include "duplication/replica_duplicator_manager.h" // IWYU pragma: keep #include "meta_admin_types.h" #include "metadata_types.h" #include "mutation.h" @@ -96,7 +98,6 @@ class replica; class replica_backup_manager; class replica_bulk_loader; class replica_disk_migrator; -class replica_duplicator_manager; class replica_follower; class replica_split_manager; class replica_stub; @@ -223,8 +224,37 @@ class replica : public serverlet, public ref_counter, public replica_ba const app_info *get_app_info() const { return &_app_info; } decree max_prepared_decree() const { return _prepare_list->max_decree(); } decree last_committed_decree() const { return _prepare_list->last_committed_decree(); } + + // The last decree that has been applied into rocksdb memtable. + decree last_applied_decree() const; + + // The last decree that has been flushed into rocksdb sst. + decree last_flushed_decree() const; + decree last_prepared_decree() const; decree last_durable_decree() const; + + // Encode current progress of decrees into json, including both local writes and duplications + // of this replica. 
+ template + void encode_progress(TWriter &writer) const + { + writer.StartObject(); + + JSON_ENCODE_OBJ(writer, max_prepared_decree, max_prepared_decree()); + JSON_ENCODE_OBJ(writer, max_plog_decree, _private_log->max_decree(get_gpid())); + JSON_ENCODE_OBJ(writer, max_plog_commit_on_disk, _private_log->max_commit_on_disk()); + JSON_ENCODE_OBJ(writer, last_committed_decree, last_committed_decree()); + JSON_ENCODE_OBJ(writer, last_applied_decree, last_applied_decree()); + JSON_ENCODE_OBJ(writer, last_flushed_decree, last_flushed_decree()); + JSON_ENCODE_OBJ(writer, last_durable_decree, last_durable_decree()); + JSON_ENCODE_OBJ(writer, max_gc_decree, _private_log->max_gced_decree(get_gpid())); + + _duplication_mgr->encode_progress(writer); + + writer.EndObject(); + } + const std::string &dir() const { return _dir; } uint64_t create_time_milliseconds() const { return _create_time_ms; } const char *name() const { return replica_name(); } @@ -429,13 +459,6 @@ class replica : public serverlet, public ref_counter, public replica_ba error_code background_sync_checkpoint(); void catch_up_with_private_logs(partition_status::type s); void on_checkpoint_completed(error_code err); - void on_copy_checkpoint_ack(error_code err, - const std::shared_ptr &req, - const std::shared_ptr &resp); - void on_copy_checkpoint_file_completed(error_code err, - size_t sz, - std::shared_ptr resp, - const std::string &chk_dir); // Enable/Disable plog garbage collection to be executed. For example, to duplicate data // to target cluster, we could firstly disable plog garbage collection, then do copy_data. 
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 3558fb7a45..a12c22efb6 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -29,6 +29,7 @@ // IWYU pragma: no_include #include #include +#include #include #include #include @@ -37,8 +38,8 @@ #include #include #include -#include #include +#include #include #include @@ -47,6 +48,7 @@ #include "bulk_load/replica_bulk_loader.h" #include "common/backup_common.h" #include "common/duplication_common.h" +#include "common/json_helper.h" #include "common/replication.codes.h" #include "common/replication_enums.h" #include "disk_cleaner.h" @@ -2335,6 +2337,22 @@ void replica_stub::register_ctrl_command() }); })); + _cmds.emplace_back(::dsn::command_manager::instance().register_single_command( + "replica.query-progress", + "Query the progress of decrees, including both local writes and duplications for " + "replicas specified by comma-separated list of 'app_id' or 'app_id.partition_id', " + "or all replicas for empty", + "[id1,id2,...]", + [this](const std::vector &args) { + return exec_command_on_replica(args, true, [](const replica_ptr &rep) { + std::ostringstream out; + rapidjson::OStreamWrapper wrapper(out); + dsn::json::PrettyJsonWriter writer(wrapper); + rep->encode_progress(writer); + return out.str(); + }); + })); + #ifdef DSN_ENABLE_GPERF _cmds.emplace_back(::dsn::command_manager::instance().register_bool_command( _release_tcmalloc_memory, diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h index c3559c095d..2a88618f64 100644 --- a/src/replica/replication_app_base.h +++ b/src/replica/replication_app_base.h @@ -238,7 +238,13 @@ class replication_app_base : public replica_base // // Query methods. // + + // Get the decree of the last flushed mutation. -1 means failed to get. + virtual replication::decree last_flushed_decree() const = 0; + + // Get the decree of the last created checkpoint. 
virtual replication::decree last_durable_decree() const = 0; + // The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK. virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0; diff --git a/src/replica/storage/simple_kv/simple_kv.server.impl.h b/src/replica/storage/simple_kv/simple_kv.server.impl.h index 240ed87899..b296c1f83d 100644 --- a/src/replica/storage/simple_kv/simple_kv.server.impl.h +++ b/src/replica/storage/simple_kv/simple_kv.server.impl.h @@ -70,7 +70,9 @@ class simple_kv_service_impl : public simple_kv_service virtual ::dsn::error_code stop(bool cleanup = false) override; - virtual int64_t last_durable_decree() const override { return _last_durable_decree; } + int64_t last_flushed_decree() const override { return _last_durable_decree; } + + int64_t last_durable_decree() const override { return _last_durable_decree; } virtual ::dsn::error_code sync_checkpoint() override; diff --git a/src/replica/storage/simple_kv/test/simple_kv.server.impl.h b/src/replica/storage/simple_kv/test/simple_kv.server.impl.h index 1235cdbc68..8b80396a02 100644 --- a/src/replica/storage/simple_kv/test/simple_kv.server.impl.h +++ b/src/replica/storage/simple_kv/test/simple_kv.server.impl.h @@ -82,7 +82,9 @@ class simple_kv_service_impl : public application::simple_kv_service virtual ::dsn::error_code stop(bool cleanup = false) override; - virtual int64_t last_durable_decree() const override { return _last_durable_decree; } + int64_t last_flushed_decree() const override { return _last_durable_decree; } + + int64_t last_durable_decree() const override { return _last_durable_decree; } virtual ::dsn::error_code sync_checkpoint() override; diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h index 6d7725b787..cc631143b0 100644 --- a/src/replica/test/mock_utils.h +++ b/src/replica/test/mock_utils.h @@ -83,6 +83,8 @@ class mock_replication_app_base : public replication_app_base // we mock the followings void 
update_app_envs(const std::map &envs) override { _envs = envs; } void query_app_envs(std::map &out) override { out = _envs; } + + decree last_flushed_decree() const override { return _last_durable_decree; } decree last_durable_decree() const override { return _last_durable_decree; } // TODO(heyuchen): implement this function in further pull request diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 35419d8efb..75c95a8128 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -2168,47 +2168,49 @@ ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char } LOG_INFO_PREFIX("copy checkpoint to dir({}) succeed", checkpoint_dir); - if (checkpoint_decree != nullptr) { - rocksdb::DB *snapshot_db = nullptr; - std::vector handles_opened; - auto cleanup = [&](bool remove_checkpoint) { - if (remove_checkpoint && !::dsn::utils::filesystem::remove_path(checkpoint_dir)) { - LOG_ERROR_PREFIX("remove checkpoint directory {} failed", checkpoint_dir); - } - if (snapshot_db) { - for (auto handle : handles_opened) { - if (handle) { - snapshot_db->DestroyColumnFamilyHandle(handle); - handle = nullptr; - } + if (checkpoint_decree == nullptr) { + return ::dsn::ERR_OK; + } + + rocksdb::DB *snapshot_db = nullptr; + std::vector handles_opened; + auto cleanup = [&](bool remove_checkpoint) { + if (remove_checkpoint && !::dsn::utils::filesystem::remove_path(checkpoint_dir)) { + LOG_ERROR_PREFIX("remove checkpoint directory {} failed", checkpoint_dir); + } + if (snapshot_db) { + for (auto handle : handles_opened) { + if (handle) { + snapshot_db->DestroyColumnFamilyHandle(handle); + handle = nullptr; } - delete snapshot_db; - snapshot_db = nullptr; } - }; - - // Because of RocksDB's restriction, we have to to open default column family even though - // not use it - std::vector column_families( - {{meta_store::DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}, - 
{meta_store::META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}}); - status = rocksdb::DB::OpenForReadOnly( - _db_opts, checkpoint_dir, column_families, &handles_opened, &snapshot_db); - if (!status.ok()) { - LOG_ERROR_PREFIX( - "OpenForReadOnly from {} failed, error = {}", checkpoint_dir, status.ToString()); + delete snapshot_db; snapshot_db = nullptr; - cleanup(true); - return ::dsn::ERR_LOCAL_APP_FAILURE; } - CHECK_EQ_PREFIX(handles_opened.size(), 2); - CHECK_EQ_PREFIX(handles_opened[1]->GetName(), meta_store::META_COLUMN_FAMILY_NAME); - uint64_t last_flushed_decree = - _meta_store->get_decree_from_readonly_db(snapshot_db, handles_opened[1]); - *checkpoint_decree = last_flushed_decree; + }; - cleanup(false); + // Because of RocksDB's restriction, we have to to open default column family even though + // not use it + std::vector column_families( + {{meta_store::DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}, + {meta_store::META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}}); + status = rocksdb::DB::OpenForReadOnly( + _db_opts, checkpoint_dir, column_families, &handles_opened, &snapshot_db); + if (!status.ok()) { + LOG_ERROR_PREFIX( + "OpenForReadOnly from {} failed, error = {}", checkpoint_dir, status.ToString()); + snapshot_db = nullptr; + cleanup(true); + return ::dsn::ERR_LOCAL_APP_FAILURE; } + CHECK_EQ_PREFIX(handles_opened.size(), 2); + CHECK_EQ_PREFIX(handles_opened[1]->GetName(), meta_store::META_COLUMN_FAMILY_NAME); + uint64_t last_flushed_decree = + _meta_store->get_decree_from_readonly_db(snapshot_db, handles_opened[1]); + *checkpoint_decree = last_flushed_decree; + + cleanup(false); return ::dsn::ERR_OK; } @@ -2318,6 +2320,17 @@ pegasus_server_impl::storage_apply_checkpoint(chkpt_apply_mode mode, return ::dsn::ERR_OK; } +int64_t pegasus_server_impl::last_flushed_decree() const +{ + uint64_t decree = 0; + const auto &err = _meta_store->get_last_flushed_decree(&decree); + if (dsn_unlikely(err != dsn::ERR_OK)) { + return -1; + } + + 
return static_cast(decree); +} + bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_type, const ::dsn::blob &filter_pattern, const ::dsn::blob &value) diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 361d9cbbae..d902e647dc 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -223,6 +223,8 @@ class pegasus_server_impl : public pegasus_read_service ::dsn::error_code storage_apply_checkpoint(chkpt_apply_mode mode, const dsn::replication::learn_state &state) override; + int64_t last_flushed_decree() const override; + int64_t last_durable_decree() const override { return _last_durable_decree.load(); } void update_app_envs(const std::map &envs) override;