Skip to content

Commit

Permalink
feat(duplication): support duplication entry for multiple purposes (a…
Browse files Browse the repository at this point in the history
…pache#2163)

`duplication_entry` is an important struct for duplication, including per-duplication
properties and progress info(confirmed decree) for each partition of a table. It
contains all useful info for a duplication. Therefore, it has been used in response to
query and sync:

- The query is requested by client, to the meta server to get the properties of
duplications of single table;
- The sync is requested by replica server, also to the meta server to sync progress.

Soon we would support a new interface to get duplications from multiple tables,
i.e. `list duplications`. It would request duplications of multiple tables, also including
per-duplication properties and progress info(confirmed decree and a new field last
committed decree). It still uses `duplication_entry` as the response to the client.
Thus we add fields to `duplication_entry` to support more purposes.
  • Loading branch information
empiredan authored Dec 4, 2024
1 parent d9f2600 commit 7c49e41
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 54 deletions.
29 changes: 28 additions & 1 deletion idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ namespace cpp dsn.replication
namespace go admin
namespace java org.apache.pegasus.replication

// Indicate which data of a table needs to be duplicated:
// * FULL: all of the data of the table needs to be duplicated.
// * INCREMENTAL: only incremental data of the table would be duplicated.
enum duplication_mode
{
FULL = 0,
INCREMENTAL,
}

// - INIT -> PREPARE
// - PREPARE -> APP
// - APP -> LOG
Expand Down Expand Up @@ -129,14 +138,25 @@ struct duplication_modify_response
2:i32 appid;
}

// The states tracking each partition for duplication.
struct duplication_partition_state
{
// The max decree of this partition that has been confirmed to be received by follower.
1:i64 confirmed_decree;

// The max decree that has been committed by this partition.
2:i64 last_committed_decree;
}

struct duplication_entry
{
1:i32 dupid;
2:duplication_status status;
3:string remote;
4:i64 create_ts;

// partition_index => confirmed decree
// Used for syncing duplications with partition-level progress (replica server -> meta server).
// partition index => confirmed decree.
5:optional map<i32, i64> progress;

7:optional duplication_fail_mode fail_mode;
Expand All @@ -150,6 +170,13 @@ struct duplication_entry
// For versions >= v2.6.0, this could be specified by client.
// For versions < v2.6.0, this must be the same with source replica_count.
9:optional i32 remote_replica_count;

// TODO(wangdan): would be supported later.
10:optional duplication_mode mode;

// Used for listing duplications with partition-level details (client -> meta server).
// partition index => partition states.
11:optional map<i32, duplication_partition_state> partition_states;
}

// This request is sent from client to meta.
Expand Down
23 changes: 19 additions & 4 deletions src/common/duplication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <nlohmann/json.hpp>
#include <cstdint>
#include <map>
#include <type_traits>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -181,11 +182,12 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent)
};

if (ent.__isset.progress) {
nlohmann::json sub_json;
for (const auto &p : ent.progress) {
sub_json[std::to_string(p.first)] = p.second;
nlohmann::json progress;
for (const auto &[partition_index, state] : ent.progress) {
progress[std::to_string(partition_index)] = state;
}
json["progress"] = sub_json;

json["progress"] = progress;
}

if (ent.__isset.remote_app_name) {
Expand All @@ -198,6 +200,19 @@ static nlohmann::json duplication_entry_to_json(const duplication_entry &ent)
json["remote_replica_count"] = ent.remote_replica_count;
}

if (ent.__isset.partition_states) {
nlohmann::json partition_states;
for (const auto &[partition_index, state] : ent.partition_states) {
nlohmann::json partition_state;
partition_state["confirmed_decree"] = state.confirmed_decree;
partition_state["last_committed_decree"] = state.last_committed_decree;

partition_states[std::to_string(partition_index)] = partition_state;
}

json["partition_states"] = partition_states;
}

return json;
}

Expand Down
13 changes: 3 additions & 10 deletions src/meta/duplication/duplication_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "duplication_info.h"

#include "common/duplication_common.h"
#include "meta/meta_data.h"
#include "runtime/api_layer1.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
Expand Down Expand Up @@ -221,7 +220,7 @@ void duplication_info::persist_status()

std::string duplication_info::to_string() const
{
return duplication_entry_to_string(to_duplication_entry());
return duplication_entry_to_string(to_partition_level_entry_for_list());
}

blob duplication_info::to_json_blob() const
Expand Down Expand Up @@ -293,17 +292,11 @@ duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id,
return dup;
}

void duplication_info::append_if_valid_for_query(
const app_state &app,
/*out*/ std::vector<duplication_entry> &entry_list) const
void duplication_info::append_as_entry(std::vector<duplication_entry> &entry_list) const
{
zauto_read_lock l(_lock);

entry_list.emplace_back(to_duplication_entry());
duplication_entry &ent = entry_list.back();
// the confirmed decree is not useful for displaying
// the overall state of duplication
ent.__isset.progress = false;
entry_list.emplace_back(to_duplication_level_entry());
}

} // namespace dsn::replication
57 changes: 44 additions & 13 deletions src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

Expand All @@ -38,10 +39,7 @@
#include "utils/fmt_utils.h"
#include "utils/zlocks.h"

namespace dsn {
namespace replication {

class app_state;
namespace dsn::replication {

class duplication_info;

Expand Down Expand Up @@ -149,10 +147,10 @@ class duplication_info

// duplication_query_rpc is handled in THREAD_POOL_META_SERVER,
// which is not thread safe for read.
void append_if_valid_for_query(const app_state &app,
/*out*/ std::vector<duplication_entry> &entry_list) const;
void append_as_entry(std::vector<duplication_entry> &entry_list) const;

duplication_entry to_duplication_entry() const
// Build an entry including only duplication-level info.
duplication_entry to_duplication_level_entry() const
{
duplication_entry entry;
entry.dupid = id;
Expand All @@ -162,13 +160,47 @@ class duplication_info
entry.__set_fail_mode(_fail_mode);
entry.__set_remote_app_name(remote_app_name);
entry.__set_remote_replica_count(remote_replica_count);

return entry;
}

// Build an entry including also partition-level progress used for sync besides
// duplication-level info.
duplication_entry to_partition_level_entry_for_sync() const
{
auto entry = to_duplication_level_entry();

entry.__isset.progress = true;
for (const auto &kv : _progress) {
if (!kv.second.is_inited) {
for (const auto &[partition_index, state] : _progress) {
if (!state.is_inited) {
continue;
}

entry.progress.emplace(partition_index, state.stored_decree);
}

return entry;
}

// Build an entry including also partition-level detailed states used for list
// besides duplication-level info.
duplication_entry to_partition_level_entry_for_list() const
{
auto entry = to_duplication_level_entry();

entry.__isset.partition_states = true;
for (const auto &[partition_index, state] : _progress) {
if (!state.is_inited) {
continue;
}
entry.progress[kv.first] = kv.second.stored_decree;

duplication_partition_state partition_state;
partition_state.confirmed_decree = state.stored_decree;
partition_state.last_committed_decree = state.last_committed_decree;

entry.partition_states.emplace(partition_index, partition_state);
}

return entry;
}

Expand Down Expand Up @@ -231,7 +263,7 @@ class duplication_info
bool checkpoint_prepared{false};
};

// partition_idx => progress
// partition_index => progress
std::map<int, partition_progress> _progress;

uint64_t _last_progress_report_ms{0};
Expand Down Expand Up @@ -281,7 +313,6 @@ extern void json_encode(dsn::json::JsonWriter &out, const duplication_fail_mode:

extern bool json_decode(const dsn::json::JsonObject &in, duplication_fail_mode::type &s);

} // namespace replication
} // namespace dsn
} // namespace dsn::replication

USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::duplication_info);
4 changes: 2 additions & 2 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void meta_duplication_service::query_duplication_info(const duplication_query_re

response.appid = app->app_id;
for (const auto &[_, dup] : app->duplications) {
dup->append_if_valid_for_query(*app, response.entry_list);
dup->append_as_entry(response.entry_list);
}
}
}
Expand Down Expand Up @@ -425,7 +425,7 @@ void meta_duplication_service::duplication_sync(duplication_sync_rpc rpc)
}
}

response.dup_map[app_id][dup_id] = dup->to_duplication_entry();
response.dup_map[app_id][dup_id] = dup->to_partition_level_entry_for_sync();

// report progress periodically for each duplications
dup->report_progress_if_time_up();
Expand Down
Loading

0 comments on commit 7c49e41

Please sign in to comment.