feat(dup): write pegasus value in new data version for duplication #459

Merged
merged 27 commits into from Feb 11, 2020
Changes from 18 commits
Commits
27 commits
e803bbb
feat(dup): write pegasus value in new data version for duplication
Jan 15, 2020
ad87be5
feat(dup): write pegasus value in new data version for duplication
Jan 17, 2020
c8eca4a
reset rdsn
Jan 17, 2020
d7a09ef
update rdsn
Jan 17, 2020
9c783be
Merge branch 'master' into dup-part
Jan 19, 2020
f18d59b
Merge branch 'master' of https://github.com/xiaomi/pegasus into dup-part
Jan 19, 2020
f86224b
Merge branch 'master' into dup-part
Jan 21, 2020
6f7d15f
Merge branch 'master' of https://github.com/xiaomi/pegasus into dup-part
Feb 4, 2020
a671ec5
Merge branch 'dup-part' of github.com:neverchanje/pegasus into dup-part
Feb 4, 2020
5f82d98
include timestag for local write
Feb 4, 2020
a33fdac
fix test
Feb 4, 2020
9ea052e
Merge branch 'master' into dup-part
Feb 5, 2020
0101b4f
use friend_test for test
Feb 5, 2020
e768571
Merge branch 'master' of https://github.com/xiaomi/pegasus into dup-part
Feb 5, 2020
47d56f6
Merge branch 'dup-part' of github.com:neverchanje/pegasus into dup-part
Feb 5, 2020
4378bcc
fix db_get
Feb 5, 2020
90ddf59
fix comment
Feb 5, 2020
d52f1f7
Merge branch 'master' into dup-part
Feb 6, 2020
a1b1112
fix according to review
Feb 6, 2020
c854e3d
fix
Feb 6, 2020
4acda3a
Merge branch 'dup-part' of github.com:neverchanje/pegasus into dup-part
Feb 6, 2020
edeb07d
fix according to review
Feb 7, 2020
be2f17b
Merge branch 'master' of https://github.com/xiaomi/pegasus into dup-part
Feb 7, 2020
99e4d25
Merge branch 'master' into dup-part
Feb 7, 2020
bdd3823
Merge branch 'master' into dup-part
Feb 7, 2020
af96c5e
fix test and review
Feb 10, 2020
7bd72c4
Merge branch 'dup-part' of github.com:neverchanje/pegasus into dup-part
Feb 10, 2020
92 changes: 85 additions & 7 deletions src/base/pegasus_value_schema.h
@@ -19,16 +19,23 @@

namespace pegasus {

#define PEGASUS_DATA_VERSION_MAX 0u
#define PEGASUS_DATA_VERSION_MAX 1u

/// Generates timetag in host endian.
/// \see comment on pegasus_value_generator::generate_value_v1
inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool delete_tag)
{
return timestamp << 8u | cluster_id << 1u | delete_tag;
}

inline uint64_t extract_timestamp_from_timetag(uint64_t timetag)
{
// 56 bits: 0xFFFFFFFFFFFFFFL
return static_cast<uint64_t>((timetag >> 8u) & 0xFFFFFFFFFFFFFFLu);
}
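
As an aside, a minimal sketch of how the timetag layout round-trips (the values below are hypothetical; it relies only on the two helpers above):

// Illustrative only: pack and unpack a timetag.
// Layout: [timestamp (56 bits)] [cluster_id (7 bits)] [delete_tag (1 bit)].
uint64_t ts_us = 1580000000000000u; // microsecond timestamp, fits in 56 bits
uint64_t tag = generate_timetag(ts_us, /*cluster_id*/ 3, /*delete_tag*/ false);
assert(extract_timestamp_from_timetag(tag) == ts_us);
assert(((tag >> 1u) & 0x7Fu) == 3u); // the 7 cluster_id bits
assert((tag & 1u) == 0u);            // the delete_tag bit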

/// Extracts expire_ts from rocksdb value with given version.
/// The value schema must be in v0.
/// The value schema must be in v0 or v1.
/// \return expire_ts in host endian
inline uint32_t pegasus_extract_expire_ts(uint32_t version, dsn::string_view value)
{
@@ -55,18 +62,32 @@ pegasus_extract_user_data(uint32_t version, std::string &&raw_value, ::dsn::blob
auto *s = new std::string(std::move(raw_value));
dsn::data_input input(*s);
input.skip(sizeof(uint32_t));
if (version == 1) {
input.skip(sizeof(uint64_t));
}
dsn::string_view view = input.read_str();

// tricky code to avoid memory copy
std::shared_ptr<char> buf(const_cast<char *>(view.data()), [s](char *) { delete s; });
user_data.assign(std::move(buf), 0, static_cast<unsigned int>(view.length()));
}

/// Extracts timetag from a v1 value.
inline uint64_t pegasus_extract_timetag(int version, dsn::string_view value)
{
dassert(version == 1, "data version(%d) must be v1", version);

dsn::data_input input(value);
input.skip(sizeof(uint32_t));

return input.read_u64();
}

/// Update expire_ts in rocksdb value with given version.
/// The value schema must be in v0.
/// The value schema must be in v0 or v1.
inline void pegasus_update_expire_ts(uint32_t version, std::string &value, uint32_t new_expire_ts)
{
if (version == 0) {
if (version == 0 || version == 1) {
dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");

new_expire_ts = dsn::endian::hton(new_expire_ts);
@@ -102,12 +123,16 @@ class pegasus_value_generator
{
public:
/// A higher level utility for generating value with given version.
/// The value schema must be in v0.
rocksdb::SliceParts
generate_value(uint32_t value_schema_version, dsn::string_view user_data, uint32_t expire_ts)
/// The value schema must be in v0 or v1.
rocksdb::SliceParts generate_value(uint32_t value_schema_version,
dsn::string_view user_data,
uint32_t expire_ts,
uint64_t timetag)
{
if (value_schema_version == 0) {
return generate_value_v0(expire_ts, user_data);
} else if (value_schema_version == 1) {
return generate_value_v1(expire_ts, timetag, user_data);
} else {
dfatal_f("unsupported value schema version: {}", value_schema_version);
__builtin_unreachable();
@@ -136,6 +161,59 @@ class pegasus_value_generator
return {&_write_slices[0], static_cast<int>(_write_slices.size())};
}

/// The value schema here is designed to resolve write conflicts during duplication,
/// specifically, when two clusters configured as "master-master" are concurrently
/// writing at the same key.
///
/// Though writes to the same key from two different clusters are rare in
/// real-world cases, they still give a bad user experience when they happen.
/// A simple solution is to split the writes into two halves, with each cluster
/// responsible for one half only, leaving it to users to decide how the writes
/// are separated. This is simple, but unfriendly to use.
///
/// In our design, each value carries a timestamp in [0, 2^56-1], which
/// represents the data version. A write duplicated from the remote cluster
/// first compares its timestamp with the current one, if one exists. The write
/// with the larger timestamp wins.
///
/// An edge case occurs when the two timestamps are exactly equal; the final
/// result would then be undefined. To solve this, 7 bits are reserved for
/// cluster_id (the globally unique id of a cluster), so that when the
/// timestamps are equal, the conflict can be resolved by comparing the cluster ids.
///
/// Consider another edge case in which a record is deleted from pegasus, however
/// in the remote cluster this record is written with a new value:
///
/// A: --------- update(ts:700)---- delete ---- update duplicated from B(ts:500) --
/// B: ---- update(ts:500) --------------------------------------------------------
///
/// Since the record has been removed, the stale update would be applied
/// successfully, though incorrectly. To solve this problem, a 1-bit flag marks
/// whether the record is deleted.
///
/// rocksdb value (ver 1)
/// = [expire_ts(uint32_t)] [timetag(uint64_t)] [user_data(bytes)]
/// = [expire_ts(uint32_t)]
/// [timestamp in μs (56 bit)] [cluster_id (7 bit)] [delete_tag (1 bit)]
/// [user_data(bytes)]
///
/// \internal
rocksdb::SliceParts
generate_value_v1(uint32_t expire_ts, uint64_t timetag, dsn::string_view user_data)
{
_write_buf.resize(sizeof(uint32_t) + sizeof(uint64_t));
_write_slices.clear();

dsn::data_output(_write_buf).write_u32(expire_ts).write_u64(timetag);
_write_slices.emplace_back(_write_buf.data(), _write_buf.size());

if (user_data.length() > 0) {
_write_slices.emplace_back(user_data.data(), user_data.length());
}

return {&_write_slices[0], static_cast<int>(_write_slices.size())};
}
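
For reference, a hedged usage sketch of the v1 path (the values are hypothetical, and the reassembly loop mirrors what the tests in this PR do):

// Illustrative only: write a v1 value and read its fields back.
pegasus_value_generator gen;
uint64_t timetag = generate_timetag(/*timestamp*/ 1580000000000000u,
                                    /*cluster_id*/ 1, /*delete_tag*/ false);
rocksdb::SliceParts parts = gen.generate_value(/*value_schema_version*/ 1,
                                               "some_user_data",
                                               /*expire_ts*/ 0,
                                               timetag);

// Reassemble the slices into a single buffer.
std::string raw_value;
for (int i = 0; i < parts.num_parts; i++) {
    raw_value += parts.parts[i].ToString();
}
assert(pegasus_extract_expire_ts(1, raw_value) == 0);
assert(pegasus_extract_timetag(1, raw_value) == timetag);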

private:
std::string _write_buf;
std::vector<rocksdb::Slice> _write_slices;
1 change: 1 addition & 0 deletions src/server/pegasus_server_write.h
@@ -58,6 +58,7 @@ class pegasus_server_write : public dsn::replication::replica_base
private:
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;

std::unique_ptr<pegasus_write_service> _write_svc;
std::vector<put_rpc> _put_rpc_batch;
18 changes: 14 additions & 4 deletions src/server/pegasus_write_service.h
@@ -38,11 +38,20 @@ struct db_write_context

// Whether to compare the timetag of old value with the new write's.
// - If true, it requires a read to the DB before write. If the old record has a larger timetag
// than the `remote_timetag`, the write will be ignored, otherwise it will be applied using
// the new timetag generated by local cluster.
// than `remote_timetag`, the write will be ignored, otherwise it will be applied.
// - If false, no overhead for the write but the eventual consistency on duplication
// is not guaranteed.
bool verfiy_timetag{false};
//
// This is for duplicated writes only. Under causal consistency, a previously
// duplicated write must **happen before** the latest local write, regardless of
// whether its timestamp is larger. This relationship can be easily proved:
// ```
// T1(put "a") > T2(duplicated put "b") > T3(put "b" in remote cluster)
// ```
// However, write conflicts may still result in inconsistent data across
// clusters, though each of those "versions" can be considered latest. Users who
// require consistency can read from one main cluster instead of reading from
// multiple.
bool verify_timetag{false};

static inline db_write_context empty(int64_t d) { return create(d, 0); }

@@ -62,7 +71,7 @@ struct db_write_context
db_write_context ctx;
ctx.decree = decree;
ctx.remote_timetag = remote_timetag;
ctx.verfiy_timetag = verify_timetag;
ctx.verify_timetag = verify_timetag;
return ctx;
}
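
To illustrate the two modes, a duplicated write carries the remote timetag and opts into the check, while a plain local write skips it. A hedged sketch (field values hypothetical; it assumes generate_timetag from pegasus_value_schema.h is visible here):

// Sketch: a write duplicated from a remote cluster.
db_write_context dup_ctx;
dup_ctx.decree = 1024; // decree of the local mutation
dup_ctx.remote_timetag = generate_timetag(/*timestamp*/ 1580000000000000u,
                                          /*cluster_id*/ 2, /*delete_tag*/ false);
dup_ctx.verify_timetag = true; // pay one DB read to resolve conflicts

// Sketch: a plain local write, no read-before-write overhead.
db_write_context local_ctx = db_write_context::empty(1025);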

@@ -148,6 +157,7 @@ class pegasus_write_service

private:
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class pegasus_server_write_test;

pegasus_server_impl *_server;
92 changes: 86 additions & 6 deletions src/server/pegasus_write_service_impl.h
@@ -12,6 +12,7 @@

#include <dsn/utility/fail_point.h>
#include <dsn/utility/string_conv.h>
#include <gtest/gtest_prod.h>

namespace pegasus {
namespace server {
@@ -20,6 +21,22 @@ namespace server {
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;
static constexpr int FAIL_DB_GET = -104;

struct db_get_context
{
// value read from DB.
std::string raw_value;

// whether the record was found in the DB.
bool found{false};

// the expiration time encoded in raw_value.
uint32_t expire_ts{0};

// whether the record has expired.
bool expired{false};
};

class pegasus_write_service::impl : public dsn::replication::replica_base
{
@@ -31,7 +48,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
_db(server->_db),
_rd_opts(server->_data_cf_rd_opts),
_default_ttl(0),
_pfc_recent_expire_count(server->_pfc_recent_expire_count)
_pfc_recent_expire_count(server->_pfc_recent_expire_count),
_server(server)
{
// disable write ahead logging as replication handles logging instead now
_wt_opts.disableWAL = true;
@@ -513,14 +531,43 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
FAIL_POINT_INJECT_F("db_write_batch_put",
[](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; });

if (ctx.verfiy_timetag) {
// TBD(wutao1)
// cluster_id is 0 if not configured, which means it will accept writes
// from any cluster as long as the timestamp is larger.
static auto cluster_id_res = dsn::replication::get_duplication_cluster_id(
dsn::replication::get_current_cluster_name());
static uint64_t cluster_id = cluster_id_res.is_ok() ? cluster_id_res.get_value() : 0;

uint64_t new_timetag = ctx.remote_timetag;
if (!ctx.is_duplicated_write()) { // local write
new_timetag = generate_timetag(ctx.timestamp, cluster_id, false);
}

if (ctx.verify_timetag && // needs read-before-write
_pegasus_data_version >= 1 && // data version 0 doesn't support timetag.
!raw_key.empty()) { // not an empty write

db_get_context get_ctx;
int err = db_get(raw_key, &get_ctx);
if (dsn_unlikely(err != 0)) {
return err;
}
// if record exists and is not expired.
if (get_ctx.found && !get_ctx.expired) {
uint64_t local_timetag =
pegasus_extract_timetag(_pegasus_data_version, get_ctx.raw_value);

if (local_timetag >= new_timetag) {
Review thread on this line:

Contributor: If the timestamp and cluster id are identical, does a remove take precedence over a put?

Contributor Author: Duplication (hot backup) does not handle remove for now; the consistency check for remove is considerably more complex.

// ignore this stale update with lower timetag,
// and write an empty record instead
raw_key = value = dsn::string_view();
}
}
}

rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
rocksdb::SliceParts skey_parts(&skey, 1);
rocksdb::SliceParts svalue =
_value_generator.generate_value(_pegasus_data_version, value, db_expire_ts(expire_sec));
rocksdb::SliceParts svalue = _value_generator.generate_value(
_pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
rocksdb::Status s = _batch.Put(skey_parts, svalue);
if (dsn_unlikely(!s.ok())) {
::dsn::blob hash_key, sort_key;
@@ -570,6 +617,36 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return status.code();
}

// The resulting `expire_ts` is -1 if the record has expired.
int db_get(dsn::string_view raw_key,
/*out*/ db_get_context *ctx)
{
FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; });

rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value));
if (dsn_likely(s.ok())) {
// success
ctx->found = true;
ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
ctx->expired = true;
}
return 0;
}
if (s.IsNotFound()) {
// NotFound is an acceptable error
return 0;
}
::dsn::blob hash_key, sort_key;
pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
derror_rocksdb("Get",
s.ToString(),
"hash_key: {}, sort_key: {}",
utils::c_escape_string(hash_key),
utils::c_escape_string(sort_key));
return s.code();
}

void clear_up_batch_states(int64_t decree, int err)
{
if (!_update_responses.empty()) {
@@ -715,6 +792,9 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
friend class pegasus_write_service_impl_test;
FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0);

const std::string _primary_address;
const uint32_t _pegasus_data_version;
@@ -725,7 +805,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
rocksdb::ReadOptions &_rd_opts;
volatile uint32_t _default_ttl;
::dsn::perf_counter_wrapper &_pfc_recent_expire_count;

pegasus_server_impl *_server;
pegasus_value_generator _value_generator;

// for setting update_response.error after committed.
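
To summarize the conflict-resolution rule implemented in db_write_batch_put above, here is a self-contained sketch (the helper name and framing are ours, not part of the PR): an incoming write is dropped only when the key already holds a live record whose timetag is greater than or equal to the new one.

// Illustrative only: the effective staleness predicate behind verify_timetag.
bool is_stale_write(const db_get_context &local, uint64_t new_timetag,
                    uint32_t data_version)
{
    if (!local.found || local.expired) {
        return false; // nothing live to conflict with, the write applies
    }
    uint64_t local_timetag = pegasus_extract_timetag(data_version, local.raw_value);
    // On equal timestamps the cluster_id (and delete_tag) bits inside the
    // timetag break the tie, so a plain >= comparison suffices.
    return local_timetag >= new_timetag;
}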
21 changes: 15 additions & 6 deletions src/server/test/pegasus_value_schema_test.cpp
@@ -8,25 +8,30 @@

using namespace pegasus;

TEST(value_schema, generate_and_extract_v0)
TEST(value_schema, generate_and_extract_v1_v0)
{
struct test_case
{
int value_schema_version;

uint32_t expire_ts;
uint64_t timetag;
Review thread on this line:

Contributor: TEST_P would be a perfect fit here.

Contributor Author (@neverchanje, Feb 6, 2020): It will be changed in a later PR; this PR prioritizes keeping the diff minimal.

std::string user_data;
} tests[] = {
{0, 1000, ""},
{0, std::numeric_limits<uint32_t>::max(), "pegasus"},
{0, 0, "a"},
{0, std::numeric_limits<uint32_t>::max(), ""},
{1, 1000, 10001, ""},
{1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), "pegasus"},
{1, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint64_t>::max(), ""},

{0, 1000, 0, ""},
{0, std::numeric_limits<uint32_t>::max(), 0, "pegasus"},
{0, std::numeric_limits<uint32_t>::max(), 0, ""},
{0, 0, 0, "a"},
};

for (auto &t : tests) {
pegasus_value_generator gen;
rocksdb::SliceParts sparts =
gen.generate_value(t.value_schema_version, t.user_data, t.expire_ts);
gen.generate_value(t.value_schema_version, t.user_data, t.expire_ts, t.timetag);

std::string raw_value;
for (int i = 0; i < sparts.num_parts; i++) {
@@ -35,6 +40,10 @@ TEST(value_schema, generate_and_extract_v0)

ASSERT_EQ(t.expire_ts, pegasus_extract_expire_ts(t.value_schema_version, raw_value));

if (t.value_schema_version == 1) {
ASSERT_EQ(t.timetag, pegasus_extract_timetag(t.value_schema_version, raw_value));
}

dsn::blob user_data;
pegasus_extract_user_data(t.value_schema_version, std::move(raw_value), user_data);
ASSERT_EQ(t.user_data, user_data.to_string());