Skip to content

Commit

Permalink
feat(remote_command): provide the query for the progress of decrees i…
Browse files Browse the repository at this point in the history
…ncluding both local writes and duplications (apache#2045)

There are many kinds of decrees while writing locally and duplicating to remote
clusters, for example, the max decree in prepare list, the last decree that has ever
been committed, the last decree that has been applied into rocksdb memtable,
the last decree that has been flushed into rocksdb sst files, the max decree that
has been confirmed by remote cluster for duplication, etc..

These decrees are very useful while we want to watch the progress of all the local
writes and duplications. These decrees might also help us diagnose the problems.
Therefore, we provide a tool in the way of `remote_command` to show the decrees
for each replica.
  • Loading branch information
empiredan authored Jun 21, 2024
1 parent 6a30e9b commit 7e95f31
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 90 deletions.
6 changes: 6 additions & 0 deletions src/common/json_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@
JSON_DECODE_ENTRIES(input, t, __VA_ARGS__); \
}

#define JSON_ENCODE_OBJ(writer, name, ...) \
do { \
writer.Key(#name); \
dsn::json::json_encode(writer, __VA_ARGS__); \
} while (0)

namespace dsn {
namespace json {

Expand Down
20 changes: 20 additions & 0 deletions src/replica/duplication/replica_duplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <string>

#include "common//duplication_common.h"
#include "common/json_helper.h"
#include "common/replication_other_types.h"
#include "duplication_types.h"
#include "replica/replica_base.h"
Expand Down Expand Up @@ -143,6 +144,25 @@ class replica_duplicator : public replica_base, public pipeline::base

void set_duplication_plog_checking(bool checking);

// Encode current progress of this duplication into json.
template <typename TWriter>
void encode_progress(TWriter &writer) const
{
writer.StartObject();

JSON_ENCODE_OBJ(writer, dupid, _id);
JSON_ENCODE_OBJ(writer, remote_cluster_name, _remote_cluster_name);
JSON_ENCODE_OBJ(writer, remote_app_name, _remote_app_name);

{
zauto_read_lock l(_lock);
JSON_ENCODE_OBJ(writer, confirmed_decree, _progress.last_decree);
JSON_ENCODE_OBJ(writer, persisted_decree, _progress.confirmed_decree);
}

writer.EndObject();
}

private:
friend class duplication_test_base;
friend class replica_duplicator_test;
Expand Down
75 changes: 59 additions & 16 deletions src/replica/duplication/replica_duplicator_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

#include "common//duplication_common.h"
#include "common/gpid.h"
#include "common/replication_enums.h"
#include "metadata_types.h"
#include "replica/duplication/replica_duplicator.h"
#include "replica/duplication/replica_duplicator_manager.h"
#include "replica/replica.h"
#include "replica_duplicator_manager.h"
#include "utils/autoref_ptr.h"
#include "utils/errors.h"
Expand All @@ -41,29 +45,56 @@ replica_duplicator_manager::replica_duplicator_manager(replica *r)
{
}

void replica_duplicator_manager::update_duplication_map(
const std::map<int32_t, duplication_entry> &new_dup_map)
{
if (new_dup_map.empty() || _replica->status() != partition_status::PS_PRIMARY) {
remove_all_duplications();
return;
}

remove_non_existed_duplications(new_dup_map);

for (const auto &kv2 : new_dup_map) {
sync_duplication(kv2.second);
}
}

std::vector<duplication_confirm_entry>
replica_duplicator_manager::get_duplication_confirms_to_update() const
{
zauto_lock l(_lock);

std::vector<duplication_confirm_entry> updates;
for (const auto &kv : _duplications) {
replica_duplicator *duplicator = kv.second.get();
duplication_progress p = duplicator->progress();
if (p.last_decree != p.confirmed_decree ||
(kv.second->status() == duplication_status::DS_PREPARE && p.checkpoint_has_prepared)) {
if (p.last_decree < p.confirmed_decree) {
LOG_ERROR_PREFIX("invalid decree state: p.last_decree({}) < p.confirmed_decree({})",
p.last_decree,
p.confirmed_decree);
continue;
}
duplication_confirm_entry entry;
entry.dupid = duplicator->id();
entry.confirmed_decree = p.last_decree;
entry.__set_checkpoint_prepared(p.checkpoint_has_prepared);
updates.emplace_back(entry);
for (const auto & [ _, dup ] : _duplications) {
// There are two conditions when we should send confirmed decrees to meta server to update
// the progress:
//
// 1. the acknowledged decree from remote cluster has changed, making it different from
// the one that is persisted in zk by meta server; otherwise,
//
// 2. the duplication has been in the stage of synchronizing checkpoint to the remote
// cluster, and the synchronized checkpoint has been ready.
const auto &progress = dup->progress();
if (progress.last_decree == progress.confirmed_decree &&
(dup->status() != duplication_status::DS_PREPARE ||
!progress.checkpoint_has_prepared)) {
continue;
}

if (progress.last_decree < progress.confirmed_decree) {
LOG_ERROR_PREFIX(
"invalid decree state: progress.last_decree({}) < progress.confirmed_decree({})",
progress.last_decree,
progress.confirmed_decree);
continue;
}

duplication_confirm_entry entry;
entry.dupid = dup->id();
entry.confirmed_decree = progress.last_decree;
entry.__set_checkpoint_prepared(progress.checkpoint_has_prepared);
updates.emplace_back(entry);
}
return updates;
}
Expand Down Expand Up @@ -191,5 +222,17 @@ replica_duplicator_manager::get_dup_states() const
return ret;
}

void replica_duplicator_manager::remove_all_duplications()
{
// fast path
if (_duplications.empty()) {
return;
}

LOG_WARNING_PREFIX("remove all duplication, replica status = {}",
enum_to_string(_replica->status()));
_duplications.clear();
}

} // namespace replication
} // namespace dsn
48 changes: 21 additions & 27 deletions src/replica/duplication/replica_duplicator_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@
#include <vector>

#include "common//duplication_common.h"
#include "common/replication_enums.h"
#include "common/replication_other_types.h"
#include "duplication_types.h"
#include "metadata_types.h"
#include "replica/replica.h"
#include "replica/replica_base.h"
#include "replica_duplicator.h"
#include "utils/fmt_logging.h"
#include "utils/metrics.h"
#include "utils/zlocks.h"

namespace dsn {
namespace replication {
class replica;

/// replica_duplicator_manager manages the set of duplications on this replica.
/// \see duplication_sync_timer
Expand All @@ -51,19 +48,7 @@ class replica_duplicator_manager : public replica_base
// - replica is not primary on replica-server perspective (status != PRIMARY)
// - replica is not primary on meta-server perspective (progress.find(partition_id) == end())
// - the app is not assigned with duplication (dup_map.empty())
void update_duplication_map(const std::map<int32_t, duplication_entry> &new_dup_map)
{
if (new_dup_map.empty() || _replica->status() != partition_status::PS_PRIMARY) {
remove_all_duplications();
return;
}

remove_non_existed_duplications(new_dup_map);

for (const auto &kv2 : new_dup_map) {
sync_duplication(kv2.second);
}
}
void update_duplication_map(const std::map<int32_t, duplication_entry> &new_dup_map);

/// collect updated duplication confirm points from this replica.
std::vector<duplication_confirm_entry> get_duplication_confirms_to_update() const;
Expand Down Expand Up @@ -93,21 +78,30 @@ class replica_duplicator_manager : public replica_base
};
std::vector<dup_state> get_dup_states() const;

// Encode current progress of all duplication into json.
template <typename TWriter>
void encode_progress(TWriter &writer) const
{
zauto_lock l(_lock);

if (_duplications.empty()) {
return;
}

writer.Key("duplications");
writer.StartArray();
for (const auto & [ _, dup ] : _duplications) {
dup->encode_progress(writer);
}
writer.EndArray();
}

private:
void sync_duplication(const duplication_entry &ent);

void remove_non_existed_duplications(const std::map<dupid_t, duplication_entry> &);

void remove_all_duplications()
{
// fast path
if (_duplications.empty())
return;

LOG_WARNING_PREFIX("remove all duplication, replica status = {}",
enum_to_string(_replica->status()));
_duplications.clear();
}
void remove_all_duplications();

private:
friend class duplication_sync_timer_test;
Expand Down
6 changes: 5 additions & 1 deletion src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@
#include "common/replication_common.h"
#include "common/replication_enums.h"
#include "consensus_types.h"
#include "duplication/replica_duplicator_manager.h"
#include "duplication/replica_follower.h"
#include "mutation.h"
#include "mutation_log.h"
#include "replica/duplication/replica_duplicator_manager.h"
#include "replica/prepare_list.h"
#include "replica/replica_context.h"
#include "replica/replication_app_base.h"
Expand Down Expand Up @@ -578,6 +578,10 @@ mutation_ptr replica::new_mutation(decree decree)
return mu;
}

decree replica::last_applied_decree() const { return _app->last_committed_decree(); }

decree replica::last_flushed_decree() const { return _app->last_flushed_decree(); }

decree replica::last_durable_decree() const { return _app->last_durable_decree(); }

decree replica::last_prepared_decree() const
Expand Down
39 changes: 31 additions & 8 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
#include <string>
#include <utility>

#include "common/json_helper.h"
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
#include "duplication/replica_duplicator_manager.h" // IWYU pragma: keep
#include "meta_admin_types.h"
#include "metadata_types.h"
#include "mutation.h"
Expand Down Expand Up @@ -96,7 +98,6 @@ class replica;
class replica_backup_manager;
class replica_bulk_loader;
class replica_disk_migrator;
class replica_duplicator_manager;
class replica_follower;
class replica_split_manager;
class replica_stub;
Expand Down Expand Up @@ -223,8 +224,37 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
const app_info *get_app_info() const { return &_app_info; }
decree max_prepared_decree() const { return _prepare_list->max_decree(); }
decree last_committed_decree() const { return _prepare_list->last_committed_decree(); }

// The last decree that has been applied into rocksdb memtable.
decree last_applied_decree() const;

// The last decree that has been flushed into rocksdb sst.
decree last_flushed_decree() const;

decree last_prepared_decree() const;
decree last_durable_decree() const;

// Encode current progress of decrees into json, including both local writes and duplications
// of this replica.
template <typename TWriter>
void encode_progress(TWriter &writer) const
{
writer.StartObject();

JSON_ENCODE_OBJ(writer, max_prepared_decree, max_prepared_decree());
JSON_ENCODE_OBJ(writer, max_plog_decree, _private_log->max_decree(get_gpid()));
JSON_ENCODE_OBJ(writer, max_plog_commit_on_disk, _private_log->max_commit_on_disk());
JSON_ENCODE_OBJ(writer, last_committed_decree, last_committed_decree());
JSON_ENCODE_OBJ(writer, last_applied_decree, last_applied_decree());
JSON_ENCODE_OBJ(writer, last_flushed_decree, last_flushed_decree());
JSON_ENCODE_OBJ(writer, last_durable_decree, last_durable_decree());
JSON_ENCODE_OBJ(writer, max_gc_decree, _private_log->max_gced_decree(get_gpid()));

_duplication_mgr->encode_progress(writer);

writer.EndObject();
}

const std::string &dir() const { return _dir; }
uint64_t create_time_milliseconds() const { return _create_time_ms; }
const char *name() const { return replica_name(); }
Expand Down Expand Up @@ -429,13 +459,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
error_code background_sync_checkpoint();
void catch_up_with_private_logs(partition_status::type s);
void on_checkpoint_completed(error_code err);
void on_copy_checkpoint_ack(error_code err,
const std::shared_ptr<replica_configuration> &req,
const std::shared_ptr<learn_response> &resp);
void on_copy_checkpoint_file_completed(error_code err,
size_t sz,
std::shared_ptr<learn_response> resp,
const std::string &chk_dir);

// Enable/Disable plog garbage collection to be executed. For example, to duplicate data
// to target cluster, we could firstly disable plog garbage collection, then do copy_data.
Expand Down
20 changes: 19 additions & 1 deletion src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
#include <fmt/format.h>
#include <rapidjson/ostreamwrapper.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
Expand All @@ -37,8 +38,8 @@
#include <deque>
#include <iterator>
#include <mutex>
#include <ostream>
#include <set>
#include <sstream>
#include <type_traits>
#include <vector>

Expand All @@ -47,6 +48,7 @@
#include "bulk_load/replica_bulk_loader.h"
#include "common/backup_common.h"
#include "common/duplication_common.h"
#include "common/json_helper.h"
#include "common/replication.codes.h"
#include "common/replication_enums.h"
#include "disk_cleaner.h"
Expand Down Expand Up @@ -2335,6 +2337,22 @@ void replica_stub::register_ctrl_command()
});
}));

_cmds.emplace_back(::dsn::command_manager::instance().register_single_command(
"replica.query-progress",
"Query the progress of decrees, including both local writes and duplications for "
"replicas specified by comma-separated list of 'app_id' or 'app_id.partition_id', "
"or all replicas for empty",
"[id1,id2,...]",
[this](const std::vector<std::string> &args) {
return exec_command_on_replica(args, true, [](const replica_ptr &rep) {
std::ostringstream out;
rapidjson::OStreamWrapper wrapper(out);
dsn::json::PrettyJsonWriter writer(wrapper);
rep->encode_progress(writer);
return out.str();
});
}));

#ifdef DSN_ENABLE_GPERF
_cmds.emplace_back(::dsn::command_manager::instance().register_bool_command(
_release_tcmalloc_memory,
Expand Down
6 changes: 6 additions & 0 deletions src/replica/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,13 @@ class replication_app_base : public replica_base
//
// Query methods.
//

// Get the decree of the last flushed mutation. -1 means failed to get.
virtual replication::decree last_flushed_decree() const = 0;

// Get the decree of the last created checkpoint.
virtual replication::decree last_durable_decree() const = 0;

// The return type is generated by storage engine, e.g. rocksdb::Status::Code, 0 always mean OK.
virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0;

Expand Down
Loading

0 comments on commit 7e95f31

Please sign in to comment.