Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dup): implement pegasus_mutation_duplicator #399

Merged
merged 30 commits into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a1a3e75
shell: add debugging commands for hex and escaped-bytes conversion
Sep 20, 2019
70a6c97
Merge branch 'master' into master
Sep 20, 2019
64cdf42
fix format
Sep 20, 2019
d79431d
Merge branch 'master' of github.com:neverchanje/pegasus
Sep 20, 2019
6feda05
feat(dup): implement pegasus_mutation_duplicator
Sep 23, 2019
b9233ef
add idl
Sep 24, 2019
7eecda5
add value schema test
Sep 24, 2019
afccfbf
Merge branch 'master' into dup-part
Sep 24, 2019
60b4d03
fix travis
Sep 24, 2019
c305781
Merge branch 'dup-part' of github.com:neverchanje/pegasus into dup-part
Sep 24, 2019
9c1b474
fix travis
Sep 24, 2019
fb69005
Merge branch 'master' into dup-part
Sep 24, 2019
1893f20
Merge branch 'master' of https://github.com/xiaomi/pegasus into dup-part
Sep 26, 2019
b18ef59
fix travis
Sep 26, 2019
bb16b23
Merge branch 'master' of github.com:neverchanje/pegasus into dup-part
Sep 26, 2019
e4d6c76
Merge branch 'dup-part' of github.com:neverchanje/pegasus into dup-part
Sep 26, 2019
bfbb911
fix codereview
Sep 30, 2019
602e4d5
delete CMakeLists.txt
Sep 30, 2019
ad3f821
Merge branch 'master' into dup-part
Oct 15, 2019
4fc19b7
Merge branch 'master' into dup-part
Dec 4, 2019
b159c0e
Merge branch 'master' of https://github.com/xiaomi/pegasus into dup-part
Dec 26, 2019
d12f16a
Merge branch 'dup-part' of github.com:neverchanje/pegasus into dup-part
Dec 26, 2019
a576cf3
add comments
Dec 30, 2019
c26e198
refine
Dec 30, 2019
2d3d594
fix thrift
Jan 2, 2020
3fe56a4
ignore duplicating duplicate
Jan 2, 2020
1859186
add cluster_id in duplicate_request
Jan 3, 2020
6e73682
revert value_schema
Jan 3, 2020
b004a1a
fix format
Jan 3, 2020
5a09513
Merge branch 'master' into dup-part
hycdong Jan 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/base/pegasus_key_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,10 @@ inline uint64_t pegasus_key_hash(const ::dsn::blob &key)
}
}

/// Calculate hash value from hash key.
inline uint64_t pegasus_hash_key_hash(const ::dsn::blob &hash_key)
{
return dsn::utils::crc64_calc(hash_key.data(), hash_key.length(), 0);
}

} // namespace pegasus
2 changes: 2 additions & 0 deletions src/base/pegasus_rpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ using incr_rpc = dsn::rpc_holder<dsn::apps::incr_request, dsn::apps::incr_respon
using check_and_set_rpc =
dsn::rpc_holder<dsn::apps::check_and_set_request, dsn::apps::check_and_set_response>;

using duplicate_rpc = dsn::apps::duplicate_rpc;

using check_and_mutate_rpc =
dsn::rpc_holder<dsn::apps::check_and_mutate_request, dsn::apps::check_and_mutate_response>;

Expand Down
283 changes: 283 additions & 0 deletions src/base/rrdb_types.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions src/client_lib/pegasus_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,13 @@ int pegasus_client_impl::get_unordered_scanners(int max_split_count,
return ret;
}

void pegasus_client_impl::async_duplicate(dsn::apps::duplicate_rpc rpc,
std::function<void(dsn::error_code)> &&callback,
dsn::task_tracker *tracker)
{
_client->duplicate(rpc, std::move(callback), tracker);
}

const char *pegasus_client_impl::get_error_string(int error_code) const
{
auto it = _client_error_to_string.find(error_code);
Expand Down
13 changes: 10 additions & 3 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,13 @@ class pegasus_client_impl : public pegasus_client
const scan_options &options,
async_get_unordered_scanners_callback_t &&callback) override;

/// \internal
/// This is an internal function for duplication.
/// \see pegasus::server::pegasus_mutation_duplicator
void async_duplicate(dsn::apps::duplicate_rpc rpc,
std::function<void(dsn::error_code)> &&callback,
dsn::task_tracker *tracker);

virtual const char *get_error_string(int error_code) const override;

static void init_error();
Expand Down Expand Up @@ -279,6 +286,9 @@ class pegasus_client_impl : public pegasus_client
static const ::dsn::blob _max;
};

static int get_client_error(int server_error);
static int get_rocksdb_server_error(int rocskdb_error);

private:
class pegasus_scanner_impl_wrapper : public abstract_pegasus_scanner
{
Expand All @@ -298,9 +308,6 @@ class pegasus_client_impl : public pegasus_client
}
};

static int get_client_error(int server_error);
static int get_rocksdb_server_error(int rocskdb_error);

private:
std::string _cluster_name;
std::string _app_name;
Expand Down
4 changes: 4 additions & 0 deletions src/idl/dsn.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ namespace cpp dsn
struct blob
{
}

struct task_code
{
}
20 changes: 20 additions & 0 deletions src/idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,26 @@ struct scan_response
6:string server;
}

struct duplicate_request
{
// The timestamp of this write.
1: optional i64 timestamp

// The code to identify this write.
2: optional dsn.task_code task_code

// The binary form of the write.
3: optional dsn.blob raw_message

// ID of the cluster where this write comes from.
4: optional byte cluster_id
}

struct duplicate_response
{
1: optional i32 error;
}

service rrdb
{
update_response put(1:update_request update);
Expand Down
Loading