
feat(dup): implement pegasus_mutation_duplicator #399

Merged
30 commits merged into apache:master on Jan 6, 2020

Conversation

neverchanje
Contributor

@neverchanje neverchanje commented Sep 23, 2019

What problem does this PR solve?

Implement dsn::replication::mutation_duplicator interface to support sending mutations to
the remote Pegasus cluster.

Our goal is to keep the "mutation sending" process simple: reuse the existing client lib, duplicate each
mutation independently, and leave optimization for the future. Performance may therefore be poor,
but the overall cost is acceptable as long as the design stays easy to understand.

include/dsn/dist/replication/mutation_duplicator.h

class mutation_duplicator : public replica_base
{
public:
    typedef std::function<void(size_t /*total_shipped_size*/)> callback;

    /// Duplicate the provided mutations to the remote cluster.
    /// The implementation must be non-blocking.
    ///
    /// \param cb: invoked when all the given mutations have been sent successfully
    virtual void duplicate(mutation_tuple_set mutations, callback cb) = 0;
};
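The contract above can be sketched with simplified stand-ins (mutation_tuple_set is modeled here as a set of strings, and toy_duplicator is a hypothetical implementation, not the real pegasus_mutation_duplicator):

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <set>
#include <string>

// Simplified stand-ins for dsn's mutation types (hypothetical, for illustration;
// the real mutation_tuple carries a timestamp, a task_code, and a raw blob).
using mutation_tuple = std::string;
using mutation_tuple_set = std::multiset<mutation_tuple>;
using callback = std::function<void(size_t /*total_shipped_size*/)>;

// A toy duplicator that "ships" mutations locally and then fires the callback,
// mirroring the contract of mutation_duplicator::duplicate: cb runs only after
// the entire batch has been handled.
struct toy_duplicator {
    void duplicate(mutation_tuple_set muts, callback cb) {
        size_t shipped = 0;
        for (const auto &m : muts) {
            shipped += m.size(); // pretend each byte was sent to the remote cluster
        }
        cb(shipped); // signal completion of this batch
    }
};
```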

Docs here might be helpful to understand this PR: https://pegasus-kv.github.io/2019/06/09/duplication-design.html#%E6%B5%81%E7%A8%8B

What is changed and how it works?

The design inside the box is:

  1. pegasus_mutation_duplicator receives a batch of mutations to be duplicated to the remote endpoint. After sending the entire batch, it calls cb to request the next batch. This batch-by-batch design
    preserves the data consistency of both clusters.

void duplicate(mutation_tuple_set muts, callback cb) override;

  2. In our initial implementation, the mutations in a batch are shipped one by one for consistency:

private:
    std::deque<duplicate_rpc> _inflights;

We can improve this design, with slightly increased complexity, by dividing the mutations into
groups keyed by their hash. Grouping increases concurrency without compromising consistency.

private:
    // hash -> duplicate_rpc
    std::map<uint64_t, std::deque<duplicate_rpc>> _inflights;
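The per-hash pipelining idea can be sketched as follows (inflight_pipelines and its methods are hypothetical names; rpcs are modeled as strings). Mutations sharing a key hash stay ordered in one deque, while different hashes can be in flight concurrently:

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>
#include <string>

// Hypothetical per-hash pipelines: mutations with the same key hash are kept
// FIFO-ordered in one deque, while different hashes can ship concurrently.
struct inflight_pipelines {
    std::map<uint64_t, std::deque<std::string>> inflights;

    void enqueue(uint64_t hash, std::string rpc) {
        inflights[hash].push_back(std::move(rpc));
    }

    // Pop the next sendable rpc for a hash; returns "" when that pipeline is drained.
    std::string pop_front(uint64_t hash) {
        auto it = inflights.find(hash);
        if (it == inflights.end() || it->second.empty()) return "";
        std::string rpc = it->second.front();
        it->second.pop_front();
        if (it->second.empty()) inflights.erase(it); // drop drained pipelines
        return rpc;
    }
};
```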
  3. Every type of write is wrapped in a duplicate_rpc. This design is extensible: write RPCs can be added or removed in the future because the real content is hidden behind task_code and raw_message, so the duplicator is unaware of what the write actually is.
struct duplicate_request
{
    // The timestamp of this write.
    1: optional i64 timestamp

    // The code to identify this write.
    2: optional dsn.task_code task_code

    // The binary form of the write.
    3: optional dsn.blob raw_message
}

struct duplicate_response
{
    1: optional i32 error;
}

One corner case: when the write type is RPC_RRDB_RRDB_DUPLICATE, the duplicator should ignore the write, since it was itself duplicated from another cluster. Duplication is a one-way edge between two clusters.
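This loop-avoidance rule can be expressed as a one-line filter (should_forward is a hypothetical name; the real code compares dsn task_codes, modeled here as plain strings):

```cpp
#include <cassert>
#include <string>

// Hedged sketch of the loop-avoidance rule: a write that itself arrived via
// duplication must not be forwarded again, otherwise A->B plus B->A would
// bounce the same mutation between clusters forever.
bool should_forward(const std::string &task_code) {
    return task_code != "RPC_RRDB_RRDB_DUPLICATE";
}
```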

If the administrator wants to create a two-way duplication between A and B, they should add_dup A->B as well as B->A.

Check List

Tests

  • Unit test

New perf-counters

  • replica*app.pegasus*dup_shipped_ops@<gpid>
  • replica*app.pegasus*dup_failed_shipping_ops@<gpid>

@neverchanje neverchanje added the component/duplication cluster duplication label Sep 23, 2019
Resolved review threads:
  • src/base/pegasus_value_schema.h
  • src/idl/rrdb.thrift
  • src/server/pegasus_mutation_duplicator.cpp
@hycdong
Contributor

hycdong commented Sep 26, 2019

The duplication flow in this PR starts from pegasus_mutation_duplicator's duplicate function: it processes each mutation_tuple_set, stores each mutation's dup rpc in the _inflight map, then calls send, the pegasus server client's async_duplicate, and the pegasus client's duplicate in turn to ship the data to the remote cluster; the reply is handled in on_duplicate_reply. Is my understanding correct?

@neverchanje
Contributor Author

> The duplication flow in this PR starts from pegasus_mutation_duplicator's duplicate function: it processes each mutation_tuple_set, stores each mutation's dup rpc in the _inflight map, then calls send, the pegasus server client's async_duplicate, and the pegasus client's duplicate in turn to ship the data to the remote cluster; the reply is handled in on_duplicate_reply. Is my understanding correct?

Review thread on src/idl/rrdb.thrift (resolved):
if (tc == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
    dsn::blob raw_key;
    dsn::from_blob_to_thrift(data, raw_key);
    return pegasus_key_hash(raw_key);
}
Contributor

Why do different operations use different hash functions here?

Contributor Author

Because pegasus_key_hash takes hashkey + sortkey as its argument, while pegasus_hash_key_hash takes only the hashkey.
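The distinction can be illustrated with toy stand-ins (the real functions live in Pegasus's key schema headers; the encoding and hash below are simplified assumptions, not the production layout): both paths must agree on the hash of the same hashkey, whether it arrives as a raw encoded key or alone.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Toy FNV-1a hash standing in for Pegasus's real hash function.
static uint64_t toy_fnv1a(const std::string &s) {
    uint64_t h = 1469598103934665603ULL;
    for (unsigned char c : s) { h ^= c; h *= 1099511628211ULL; }
    return h;
}

// raw_key layout (simplified assumption): 2-byte big-endian hashkey length,
// then the hashkey bytes, then the sortkey bytes. Only the hashkey portion
// determines partition placement, so only it is hashed.
static uint64_t toy_key_hash(const std::string &raw_key) {
    size_t len = (static_cast<unsigned char>(raw_key[0]) << 8) |
                 static_cast<unsigned char>(raw_key[1]);
    return toy_fnv1a(raw_key.substr(2, len)); // hash only the hashkey part
}

// Counterpart taking the bare hashkey directly (as multi-ops do).
static uint64_t toy_hash_key_hash(const std::string &hash_key) {
    return toy_fnv1a(hash_key);
}
```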

@neverchanje neverchanje merged commit b8dea57 into apache:master Jan 6, 2020
@neverchanje neverchanje deleted the dup-part branch January 6, 2020 06:16
@neverchanje neverchanje mentioned this pull request Mar 31, 2020
@neverchanje neverchanje added the type/perf-counter PR that made modification on perf-counter, which should be noted in release note. label Mar 31, 2020
Labels
1.12.3 component/duplication cluster duplication type/perf-counter PR that made modification on perf-counter, which should be noted in release note.