feat(dup): write pegasus value in new data version for duplication (#459)
Wu Tao authored Feb 11, 2020
1 parent 2ee37f3 commit 9d311de
Showing 10 changed files with 471 additions and 29 deletions.
96 changes: 87 additions & 9 deletions src/base/pegasus_value_schema.h
@@ -19,16 +19,23 @@

namespace pegasus {

#define PEGASUS_DATA_VERSION_MAX 0u
constexpr int PEGASUS_DATA_VERSION_MAX = 1u;

/// Generates timetag in host endian.
inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool delete_tag)
/// \see comment on pegasus_value_generator::generate_value_v1
inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool deleted_tag)
{
return timestamp << 8u | cluster_id << 1u | delete_tag;
return timestamp << 8u | cluster_id << 1u | deleted_tag;
}

inline uint64_t extract_timestamp_from_timetag(uint64_t timetag)
{
// 56bit: 0xFFFFFFFFFFFFFFL
return static_cast<uint64_t>((timetag >> 8u) & 0xFFFFFFFFFFFFFFLu);
}
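
These helpers pack a 56-bit timestamp, a 7-bit cluster_id and a 1-bit delete flag into one integer, so a plain integer comparison orders timetags by timestamp first and by cluster_id on ties. A minimal sketch of the round trip, using made-up values (not part of this commit; the include path is assumed):

#include <cassert>
#include <cstdint>
// #include "base/pegasus_value_schema.h"   // assumed include path for the helpers above

void timetag_roundtrip_sketch()
{
    // timetag layout: [timestamp (56 bits)] [cluster_id (7 bits)] [deleted_tag (1 bit)]
    uint64_t t1 = pegasus::generate_timetag(/*timestamp*/ 1000u, /*cluster_id*/ 1, /*deleted_tag*/ false);
    uint64_t t2 = pegasus::generate_timetag(/*timestamp*/ 1000u, /*cluster_id*/ 2, /*deleted_tag*/ false);

    assert(pegasus::extract_timestamp_from_timetag(t1) == 1000u); // timestamp survives the round trip
    assert(((t1 >> 1u) & 0x7Fu) == 1u);                           // cluster_id occupies bits 1..7
    assert((t1 & 0x1u) == 0u);                                    // deleted_tag is the lowest bit
    assert(t2 > t1); // equal timestamps: the comparison falls back to cluster_id
}
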

/// Extracts expire_ts from rocksdb value with given version.
/// The value schema must be in v0.
/// The value schema must be in v0 or v1.
/// \return expire_ts in host endian
inline uint32_t pegasus_extract_expire_ts(uint32_t version, dsn::string_view value)
{
@@ -55,18 +62,32 @@ pegasus_extract_user_data(uint32_t version, std::string &&raw_value, ::dsn::blob
auto *s = new std::string(std::move(raw_value));
dsn::data_input input(*s);
input.skip(sizeof(uint32_t));
if (version == 1) {
input.skip(sizeof(uint64_t));
}
dsn::string_view view = input.read_str();

// tricky code to avoid memory copy
std::shared_ptr<char> buf(const_cast<char *>(view.data()), [s](char *) { delete s; });
user_data.assign(std::move(buf), 0, static_cast<unsigned int>(view.length()));
}

/// Extracts timetag from a v1 value.
inline uint64_t pegasus_extract_timetag(int version, dsn::string_view value)
{
dassert(version == 1, "data version(%d) must be v1", version);

dsn::data_input input(value);
input.skip(sizeof(uint32_t));

return input.read_u64();
}

/// Update expire_ts in rocksdb value with given version.
/// The value schema must be in v0.
/// The value schema must be in v0 or v1.
inline void pegasus_update_expire_ts(uint32_t version, std::string &value, uint32_t new_expire_ts)
{
if (version == 0) {
if (version == 0 || version == 1) {
dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header");

new_expire_ts = dsn::endian::hton(new_expire_ts);
@@ -102,12 +123,16 @@ class pegasus_value_generator
{
public:
/// A higher level utility for generating value with given version.
/// The value schema must be in v0.
rocksdb::SliceParts
generate_value(uint32_t value_schema_version, dsn::string_view user_data, uint32_t expire_ts)
/// The value schema must be in v0 or v1.
rocksdb::SliceParts generate_value(uint32_t value_schema_version,
dsn::string_view user_data,
uint32_t expire_ts,
uint64_t timetag)
{
if (value_schema_version == 0) {
return generate_value_v0(expire_ts, user_data);
} else if (value_schema_version == 1) {
return generate_value_v1(expire_ts, timetag, user_data);
} else {
dfatal_f("unsupported value schema version: {}", value_schema_version);
__builtin_unreachable();
@@ -136,6 +161,59 @@ class pegasus_value_generator
return {&_write_slices[0], static_cast<int>(_write_slices.size())};
}

/// The value schema here is designed to resolve write conflicts during duplication,
/// specifically, when two clusters configured as "master-master" are concurrently
/// writing to the same key.
///
/// Though writes to the same key from two different clusters are rare in
/// real-world cases, they still give a bad user experience when they happen.
/// A simple solution is to separate the writes into two halves, with each
/// cluster responsible for one half only. How the writes are separated is
/// left to users. This is simple, but unfriendly to use.
///
/// In our design, each value carries a timestamp in [0, 2^56-1], which
/// represents the data version. A write duplicated from the remote cluster first
/// compares its timestamp with the current one, if one exists. The one with the
/// larger timestamp wins.
///
/// An edge case occurs when the two timestamps are exactly equal: the final
/// result would be undefined. To solve this we reserve 7 bits for cluster_id
/// (the globally unique id of a cluster), so that when the timestamps are equal
/// the conflict can be resolved by comparing the cluster ids.
///
/// Consider another edge case in which a record is deleted from pegasus, while
/// in the remote cluster the same record is written with a new value:
///
/// A: --------- update(ts:700)---- delete ---- update duplicated from B(ts:500) --
/// B: ---- update(ts:500) --------------------------------------------------------
///
/// Since the record has been removed locally, the stale update will be applied
/// successfully though incorrectly. To solve this problem there is a 1-bit flag
/// marking whether the record is deleted.
///
/// rocksdb value (ver 1)
/// = [expire_ts(uint32_t)] [timetag(uint64_t)] [user_data(bytes)]
/// = [expire_ts(uint32_t)]
/// [timestamp in μs (56 bit)] [cluster_id (7 bit)] [deleted_tag (1 bit)]
/// [user_data(bytes)]
///
/// \internal
rocksdb::SliceParts
generate_value_v1(uint32_t expire_ts, uint64_t timetag, dsn::string_view user_data)
{
_write_buf.resize(sizeof(uint32_t) + sizeof(uint64_t));
_write_slices.clear();

dsn::data_output(_write_buf).write_u32(expire_ts).write_u64(timetag);
_write_slices.emplace_back(_write_buf.data(), _write_buf.size());

if (user_data.length() > 0) {
_write_slices.emplace_back(user_data.data(), user_data.length());
}

return {&_write_slices[0], static_cast<int>(_write_slices.size())};
}
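
To make the v1 layout concrete, here is a sketch (not part of the commit) that builds a v1 value with the generator above and reads it back through the extraction helpers; flattening the SliceParts by hand stands in for what rocksdb does when it stores the value:

#include <cassert>
#include <string>
// #include "base/pegasus_value_schema.h"   // assumed include path

void value_v1_roundtrip_sketch()
{
    pegasus::pegasus_value_generator gen;
    uint64_t timetag =
        pegasus::generate_timetag(/*timestamp*/ 1000u, /*cluster_id*/ 1, /*deleted_tag*/ false);

    rocksdb::SliceParts parts =
        gen.generate_value(/*value_schema_version*/ 1, "user value", /*expire_ts*/ 0, timetag);

    // Flatten the slices into one string, standing in for the bytes rocksdb stores.
    std::string stored;
    for (int i = 0; i < parts.num_parts; i++) {
        stored.append(parts.parts[i].data(), parts.parts[i].size());
    }

    assert(pegasus::pegasus_extract_expire_ts(1, stored) == 0);
    assert(pegasus::pegasus_extract_timetag(1, stored) == timetag);

    ::dsn::blob user_data;
    pegasus::pegasus_extract_user_data(1, std::move(stored), user_data);
    assert(std::string(user_data.data(), user_data.length()) == "user value");
}
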

private:
std::string _write_buf;
std::vector<rocksdb::Slice> _write_slices;
3 changes: 2 additions & 1 deletion src/server/pegasus_server_impl.cpp
@@ -53,7 +53,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
: dsn::apps::rrdb_service(r),
_db(nullptr),
_is_open(false),
_pegasus_data_version(0),
_pegasus_data_version(PEGASUS_DATA_VERSION_MAX),
_last_durable_decree(0),
_is_checkpointing(false),
_manual_compact_svc(this)
@@ -91,6 +91,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)

// init rocksdb::DBOptions
_db_opts.pegasus_data = true;
_db_opts.pegasus_data_version = _pegasus_data_version;
_db_opts.create_if_missing = true;
_db_opts.error_if_exists = false;
_db_opts.use_direct_reads = dsn_config_get_value_bool(
2 changes: 2 additions & 0 deletions src/server/pegasus_server_impl.h
@@ -13,6 +13,7 @@
#include <dsn/dist/replication/replication.codes.h>
#include <rrdb/rrdb_types.h>
#include <rrdb/rrdb.server.h>
#include <gtest/gtest_prod.h>

#include "key_ttl_compaction_filter.h"
#include "pegasus_scan_context.h"
@@ -156,6 +157,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
friend class manual_compact_service_test;
friend class pegasus_compression_options_test;
friend class pegasus_server_impl_test;
FRIEND_TEST(pegasus_server_impl_test, default_data_version);

friend class pegasus_manual_compact_service;
friend class pegasus_write_service;
1 change: 1 addition & 0 deletions src/server/pegasus_server_write.h
@@ -58,6 +58,7 @@ class pegasus_server_write : public dsn::replication::replica_base
private:
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;

std::unique_ptr<pegasus_write_service> _write_svc;
std::vector<put_rpc> _put_rpc_batch;
18 changes: 14 additions & 4 deletions src/server/pegasus_write_service.h
@@ -38,11 +38,20 @@ struct db_write_context

// Whether to compare the timetag of old value with the new write's.
// - If true, it requires a read to the DB before write. If the old record has a larger timetag
// than the `remote_timetag`, the write will be ignored, otherwise it will be applied using
// the new timetag generated by local cluster.
// than `remote_timetag`, the write will be ignored, otherwise it will be applied.
// - If false, no overhead for the write but the eventual consistency on duplication
// is not guaranteed.
bool verfiy_timetag{false};
//
// This is for duplicated writes only. Under causal consistency, the earlier
// duplicated write must **happen before** the latest local write, regardless of
// whether its timestamp is larger. This relationship can be easily shown:
// ```
// T1(put "a") > T2(duplicated put "b") > T3(put "b" in remote cluster)
// ```
// However, write conflicts may still result in inconsistent data across clusters,
// even though all of those "versions" can be considered the latest. Users who require
// consistency can read from one main cluster instead of reading from multiple.
bool verify_timetag{false};

static inline db_write_context empty(int64_t d) { return create(d, 0); }

@@ -62,7 +71,7 @@
db_write_context ctx;
ctx.decree = decree;
ctx.remote_timetag = remote_timetag;
ctx.verfiy_timetag = verify_timetag;
ctx.verify_timetag = verify_timetag;
return ctx;
}
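
As a rough illustration of the two situations this struct distinguishes, a sketch with made-up decrees and timestamps (not part of the commit; namespace qualifiers omitted):

void db_write_context_sketch()
{
    // A plain local write: no remote timetag and no read-before-write check.
    db_write_context local_ctx = db_write_context::empty(/*decree*/ 10);

    // A write replayed from a remote cluster's duplication stream: it carries the
    // timetag generated on the remote side and asks for the conflict check above.
    db_write_context dup_ctx;
    dup_ctx.decree = 11;
    dup_ctx.remote_timetag = generate_timetag(/*timestamp*/ 1000u, /*cluster_id*/ 2, false);
    dup_ctx.verify_timetag = true;
}
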

@@ -148,6 +157,7 @@ class pegasus_write_service

private:
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class pegasus_server_write_test;

pegasus_server_impl *_server;
93 changes: 88 additions & 5 deletions src/server/pegasus_write_service_impl.h
@@ -12,6 +12,7 @@

#include <dsn/utility/fail_point.h>
#include <dsn/utility/string_conv.h>
#include <gtest/gtest_prod.h>

namespace pegasus {
namespace server {
@@ -20,6 +21,32 @@ namespace server {
static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101;
static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102;
static constexpr int FAIL_DB_WRITE = -103;
static constexpr int FAIL_DB_GET = -104;

struct db_get_context
{
// value read from DB.
std::string raw_value;

// is the record found in DB.
bool found{false};

// the expiration time encoded in raw_value.
uint32_t expire_ts{0};

// is the record expired.
bool expired{false};
};

inline int get_cluster_id_if_exists()
{
// cluster_id is 0 if not configured, which means it will accept writes
// from any cluster as long as the timestamp is larger.
static auto cluster_id_res =
dsn::replication::get_duplication_cluster_id(dsn::replication::get_current_cluster_name());
static uint64_t cluster_id = cluster_id_res.is_ok() ? cluster_id_res.get_value() : 0;
return cluster_id;
}

class pegasus_write_service::impl : public dsn::replication::replica_base
{
@@ -513,14 +540,37 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
FAIL_POINT_INJECT_F("db_write_batch_put",
[](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; });

if (ctx.verfiy_timetag) {
// TBD(wutao1)
uint64_t new_timetag = ctx.remote_timetag;
if (!ctx.is_duplicated_write()) { // local write
new_timetag = generate_timetag(ctx.timestamp, get_cluster_id_if_exists(), false);
}

if (ctx.verify_timetag && // needs read-before-write
_pegasus_data_version >= 1 && // data version 0 doesn't support timetag.
!raw_key.empty()) { // not an empty write

db_get_context get_ctx;
int err = db_get(raw_key, &get_ctx);
if (dsn_unlikely(err != 0)) {
return err;
}
// if record exists and is not expired.
if (get_ctx.found && !get_ctx.expired) {
uint64_t local_timetag =
pegasus_extract_timetag(_pegasus_data_version, get_ctx.raw_value);

if (local_timetag >= new_timetag) {
// ignore this stale update with lower timetag,
// and write an empty record instead
raw_key = value = dsn::string_view();
}
}
}

rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
rocksdb::SliceParts skey_parts(&skey, 1);
rocksdb::SliceParts svalue =
_value_generator.generate_value(_pegasus_data_version, value, db_expire_ts(expire_sec));
rocksdb::SliceParts svalue = _value_generator.generate_value(
_pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
rocksdb::Status s = _batch.Put(skey_parts, svalue);
if (dsn_unlikely(!s.ok())) {
::dsn::blob hash_key, sort_key;
@@ -570,6 +620,37 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return status.code();
}

// The resulting `expire_ts` is -1 if the record is expired.
int db_get(dsn::string_view raw_key,
/*out*/ db_get_context *ctx)
{
FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; });

rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value));
if (dsn_likely(s.ok())) {
// success
ctx->found = true;
ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
ctx->expired = true;
}
return 0;
}
if (s.IsNotFound()) {
// NotFound is an acceptable error
ctx->found = false;
return 0;
}
::dsn::blob hash_key, sort_key;
pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
derror_rocksdb("Get",
s.ToString(),
"hash_key: {}, sort_key: {}",
utils::c_escape_string(hash_key),
utils::c_escape_string(sort_key));
return s.code();
}
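
Putting db_get together with the verify path in db_write_batch_put above, a worked example with made-up timestamps (a sketch, not part of the commit):

#include <cassert>
#include <cstdint>

void stale_duplicated_put_sketch()
{
    // The local record was written at ts=2000 by cluster 1; an older write (ts=1500)
    // now arrives via duplication from cluster 2.
    uint64_t local_timetag = pegasus::generate_timetag(/*timestamp*/ 2000u, /*cluster_id*/ 1, false);
    uint64_t new_timetag = pegasus::generate_timetag(/*timestamp*/ 1500u, /*cluster_id*/ 2, false);

    // db_write_batch_put above reads the stored value (db_get), extracts local_timetag,
    // sees local_timetag >= new_timetag, and demotes the duplicated put to an empty
    // write, so the newer local value survives in both clusters.
    assert(local_timetag >= new_timetag);
}
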

void clear_up_batch_states(int64_t decree, int err)
{
if (!_update_responses.empty()) {
@@ -715,6 +796,9 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
friend class pegasus_write_service_impl_test;
FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0);

const std::string _primary_address;
const uint32_t _pegasus_data_version;
Expand All @@ -725,7 +809,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
rocksdb::ReadOptions &_rd_opts;
volatile uint32_t _default_ttl;
::dsn::perf_counter_wrapper &_pfc_recent_expire_count;

pegasus_value_generator _value_generator;

// for setting update_response.error after committed.
5 changes: 5 additions & 0 deletions src/server/test/pegasus_server_impl_test.cpp
@@ -59,5 +59,10 @@ class pegasus_server_impl_test : public pegasus_server_test_base

TEST_F(pegasus_server_impl_test, test_table_level_slow_query) { test_table_level_slow_query(); }

TEST_F(pegasus_server_impl_test, default_data_version)
{
ASSERT_EQ(_server->_pegasus_data_version, 1);
}

} // namespace server
} // namespace pegasus