diff --git a/src/base/pegasus_value_schema.h b/src/base/pegasus_value_schema.h index 6943fff994..14bdd0db5a 100644 --- a/src/base/pegasus_value_schema.h +++ b/src/base/pegasus_value_schema.h @@ -19,16 +19,23 @@ namespace pegasus { -#define PEGASUS_DATA_VERSION_MAX 0u +constexpr int PEGASUS_DATA_VERSION_MAX = 1u; /// Generates timetag in host endian. -inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool delete_tag) +/// \see comment on pegasus_value_generator::generate_value_v1 +inline uint64_t generate_timetag(uint64_t timestamp, uint8_t cluster_id, bool deleted_tag) { - return timestamp << 8u | cluster_id << 1u | delete_tag; + return timestamp << 8u | cluster_id << 1u | deleted_tag; +} + +inline uint64_t extract_timestamp_from_timetag(uint64_t timetag) +{ + // 56bit: 0xFFFFFFFFFFFFFFL + return static_cast((timetag >> 8u) & 0xFFFFFFFFFFFFFFLu); } /// Extracts expire_ts from rocksdb value with given version. -/// The value schema must be in v0. +/// The value schema must be in v0 or v1. /// \return expire_ts in host endian inline uint32_t pegasus_extract_expire_ts(uint32_t version, dsn::string_view value) { @@ -55,6 +62,9 @@ pegasus_extract_user_data(uint32_t version, std::string &&raw_value, ::dsn::blob auto *s = new std::string(std::move(raw_value)); dsn::data_input input(*s); input.skip(sizeof(uint32_t)); + if (version == 1) { + input.skip(sizeof(uint64_t)); + } dsn::string_view view = input.read_str(); // tricky code to avoid memory copy @@ -62,11 +72,22 @@ pegasus_extract_user_data(uint32_t version, std::string &&raw_value, ::dsn::blob user_data.assign(std::move(buf), 0, static_cast(view.length())); } +/// Extracts timetag from a v1 value. 
+inline uint64_t pegasus_extract_timetag(int version, dsn::string_view value) +{ + dassert(version == 1, "data version(%d) must be v1", version); + + dsn::data_input input(value); + input.skip(sizeof(uint32_t)); + + return input.read_u64(); +} + /// Update expire_ts in rocksdb value with given version. -/// The value schema must be in v0. +/// The value schema must be in v0 or v1. inline void pegasus_update_expire_ts(uint32_t version, std::string &value, uint32_t new_expire_ts) { - if (version == 0) { + if (version == 0 || version == 1) { dassert_f(value.length() >= sizeof(uint32_t), "value must include 'expire_ts' header"); new_expire_ts = dsn::endian::hton(new_expire_ts); @@ -102,12 +123,16 @@ class pegasus_value_generator { public: /// A higher level utility for generating value with given version. - /// The value schema must be in v0. - rocksdb::SliceParts - generate_value(uint32_t value_schema_version, dsn::string_view user_data, uint32_t expire_ts) + /// The value schema must be in v0 or v1. + rocksdb::SliceParts generate_value(uint32_t value_schema_version, + dsn::string_view user_data, + uint32_t expire_ts, + uint64_t timetag) { if (value_schema_version == 0) { return generate_value_v0(expire_ts, user_data); + } else if (value_schema_version == 1) { + return generate_value_v1(expire_ts, timetag, user_data); } else { dfatal_f("unsupported value schema version: {}", value_schema_version); __builtin_unreachable(); @@ -136,6 +161,59 @@ class pegasus_value_generator return {&_write_slices[0], static_cast(_write_slices.size())}; } + /// The value schema here is designed to resolve write conflicts during duplication, + /// specifically, when two clusters configured as "master-master" are concurrently + /// writing at the same key. + /// + /// Though writings on the same key from two different clusters are rare in + /// real-world cases, it still gives a bad user experience when it happens. 
+ /// A simple solution is to separate the writes into two halves, each cluster + /// is responsible for one half only. How the writes are separated is left to + /// users. This is simple, but unfriendly to use. + /// + /// In our design, each value is provided with a timestamp [0, 2^56-1], which + /// represents the data version. A write duplicated from remote cluster firstly + /// compares its timestamp with the current one if exists. The one with + /// larger timestamp wins. + /// + /// An edge case occurs when the two timestamps are completely equal, the final + /// result is undefined. To solve this we make 7 bits of space for cluster_id + /// (the globally unique id of a cluster). In case when the timestamps are equal, + /// the conflicts can be resolved by comparing the cluster id. + /// + /// Consider another edge case in which a record is deleted from pegasus, however + /// in the remote cluster this record is written with a new value: + /// + /// A: --------- update(ts:700)---- delete ---- update duplicated from B(ts:500) -- + /// B: ---- update(ts:500) -------------------------------------------------------- + /// + /// Since the record is removed, the stale update will successfully though + /// incorrectly apply. To solve this problem there's 1 bit flag marking whether the + /// record is deleted. 
+ /// + /// rocksdb value (ver 1) + /// = [expire_ts(uint32_t)] [timetag(uint64_t)] [user_data(bytes)] + /// = [expire_ts(uint32_t)] + /// [timestamp in μs (56 bit)] [cluster_id (7 bit)] [deleted_tag (1 bit)] + /// [user_data(bytes)] + /// + /// \internal + rocksdb::SliceParts + generate_value_v1(uint32_t expire_ts, uint64_t timetag, dsn::string_view user_data) + { + _write_buf.resize(sizeof(uint32_t) + sizeof(uint64_t)); + _write_slices.clear(); + + dsn::data_output(_write_buf).write_u32(expire_ts).write_u64(timetag); + _write_slices.emplace_back(_write_buf.data(), _write_buf.size()); + + if (user_data.length() > 0) { + _write_slices.emplace_back(user_data.data(), user_data.length()); + } + + return {&_write_slices[0], static_cast<int>(_write_slices.size())}; + } + private: std::string _write_buf; std::vector<rocksdb::Slice> _write_slices; diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 27d8524440..3b9176da9b 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -53,7 +53,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) : dsn::apps::rrdb_service(r), _db(nullptr), _is_open(false), - _pegasus_data_version(0), + _pegasus_data_version(PEGASUS_DATA_VERSION_MAX), _last_durable_decree(0), _is_checkpointing(false), _manual_compact_svc(this) @@ -91,6 +91,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) // init rocksdb::DBOptions _db_opts.pegasus_data = true; + _db_opts.pegasus_data_version = _pegasus_data_version; _db_opts.create_if_missing = true; _db_opts.error_if_exists = false; _db_opts.use_direct_reads = dsn_config_get_value_bool( diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 678be026ff..6abf1ca1e7 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -13,6 +13,7 @@ #include #include #include +#include #include "key_ttl_compaction_filter.h" #include "pegasus_scan_context.h" @@
-156,6 +157,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service friend class manual_compact_service_test; friend class pegasus_compression_options_test; friend class pegasus_server_impl_test; + FRIEND_TEST(pegasus_server_impl_test, default_data_version); friend class pegasus_manual_compact_service; friend class pegasus_write_service; diff --git a/src/server/pegasus_server_write.h b/src/server/pegasus_server_write.h index 0d4a36ec0a..09d2b90d99 100644 --- a/src/server/pegasus_server_write.h +++ b/src/server/pegasus_server_write.h @@ -58,6 +58,7 @@ class pegasus_server_write : public dsn::replication::replica_base private: friend class pegasus_server_write_test; friend class pegasus_write_service_test; + friend class pegasus_write_service_impl_test; std::unique_ptr _write_svc; std::vector _put_rpc_batch; diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 1a3b47b03d..b360c6b99f 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -38,11 +38,20 @@ struct db_write_context // Whether to compare the timetag of old value with the new write's. // - If true, it requires a read to the DB before write. If the old record has a larger timetag - // than the `remote_timetag`, the write will be ignored, otherwise it will be applied using - // the new timetag generated by local cluster. + // than `remote_timetag`, the write will be ignored, otherwise it will be applied. + // - If false, no overhead for the write but the eventual consistency on duplication + // is not guaranteed. - bool verfiy_timetag{false}; + // + // This is for duplicated write only. Because under causal consistency, the former + // duplicated write must **happen before** the latest local write, regardless whether + // its timestamp is larger.
This relationship can be easily proved: + // ``` + // T1(put "a") > T2(duplicated put "b") > T3(put "b" in remote cluster) + // ``` + // However write conflict may still result in inconsistent data in different clusters, + // though those "versions" can all be considered as latest. User who requires consistency + // can read from one main cluster instead of reading from multiple. + bool verify_timetag{false}; static inline db_write_context empty(int64_t d) { return create(d, 0); } @@ -62,7 +71,7 @@ struct db_write_context db_write_context ctx; ctx.decree = decree; ctx.remote_timetag = remote_timetag; - ctx.verfiy_timetag = verify_timetag; + ctx.verify_timetag = verify_timetag; return ctx; } @@ -148,6 +157,7 @@ class pegasus_write_service private: friend class pegasus_write_service_test; + friend class pegasus_write_service_impl_test; friend class pegasus_server_write_test; pegasus_server_impl *_server; diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 07af982f3d..026da0cccb 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -12,6 +12,7 @@ #include #include +#include namespace pegasus { namespace server { @@ -20,6 +21,32 @@ namespace server { static constexpr int FAIL_DB_WRITE_BATCH_PUT = -101; static constexpr int FAIL_DB_WRITE_BATCH_DELETE = -102; static constexpr int FAIL_DB_WRITE = -103; +static constexpr int FAIL_DB_GET = -104; + +struct db_get_context +{ + // value read from DB. + std::string raw_value; + + // is the record found in DB. + bool found{false}; + + // the expiration time encoded in raw_value. + uint32_t expire_ts{0}; + + // is the record expired. + bool expired{false}; +}; + +inline int get_cluster_id_if_exists() +{ + // cluster_id is 0 if not configured, which means it will accept writes + // from any cluster as long as the timestamp is larger. 
+ static auto cluster_id_res = + dsn::replication::get_duplication_cluster_id(dsn::replication::get_current_cluster_name()); + static uint64_t cluster_id = cluster_id_res.is_ok() ? cluster_id_res.get_value() : 0; + return cluster_id; +} class pegasus_write_service::impl : public dsn::replication::replica_base { @@ -513,14 +540,37 @@ class pegasus_write_service::impl : public dsn::replication::replica_base FAIL_POINT_INJECT_F("db_write_batch_put", [](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; }); - if (ctx.verfiy_timetag) { - // TBD(wutao1) + uint64_t new_timetag = ctx.remote_timetag; + if (!ctx.is_duplicated_write()) { // local write + new_timetag = generate_timetag(ctx.timestamp, get_cluster_id_if_exists(), false); + } + + if (ctx.verify_timetag && // needs read-before-write + _pegasus_data_version >= 1 && // data version 0 doesn't support timetag. + !raw_key.empty()) { // not an empty write + + db_get_context get_ctx; + int err = db_get(raw_key, &get_ctx); + if (dsn_unlikely(err != 0)) { + return err; + } + // if record exists and is not expired. 
+ if (get_ctx.found && !get_ctx.expired) { + uint64_t local_timetag = + pegasus_extract_timetag(_pegasus_data_version, get_ctx.raw_value); + + if (local_timetag >= new_timetag) { + // ignore this stale update with lower timetag, + // and write an empty record instead + raw_key = value = dsn::string_view(); + } + } } rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key); rocksdb::SliceParts skey_parts(&skey, 1); - rocksdb::SliceParts svalue = - _value_generator.generate_value(_pegasus_data_version, value, db_expire_ts(expire_sec)); + rocksdb::SliceParts svalue = _value_generator.generate_value( + _pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag); rocksdb::Status s = _batch.Put(skey_parts, svalue); if (dsn_unlikely(!s.ok())) { ::dsn::blob hash_key, sort_key; @@ -570,6 +620,37 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return status.code(); } + // The resulted `expire_ts` is -1 if record is expired. + int db_get(dsn::string_view raw_key, + /*out*/ db_get_context *ctx) + { + FAIL_POINT_INJECT_F("db_get", [](dsn::string_view) -> int { return FAIL_DB_GET; }); + + rocksdb::Status s = _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value)); + if (dsn_likely(s.ok())) { + // success + ctx->found = true; + ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value); + if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) { + ctx->expired = true; + } + return 0; + } + if (s.IsNotFound()) { + // NotFound is an acceptable error + ctx->found = false; + return 0; + } + ::dsn::blob hash_key, sort_key; + pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); + derror_rocksdb("Get", + s.ToString(), + "hash_key: {}, sort_key: {}", + utils::c_escape_string(hash_key), + utils::c_escape_string(sort_key)); + return s.code(); + } + void clear_up_batch_states(int64_t decree, int err) { if (!_update_responses.empty()) { @@ -715,6 +796,9 @@ class 
pegasus_write_service::impl : public dsn::replication::replica_base private: friend class pegasus_write_service_test; friend class pegasus_server_write_test; + friend class pegasus_write_service_impl_test; + FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag); + FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0); const std::string _primary_address; const uint32_t _pegasus_data_version; @@ -725,7 +809,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base rocksdb::ReadOptions &_rd_opts; volatile uint32_t _default_ttl; ::dsn::perf_counter_wrapper &_pfc_recent_expire_count; - pegasus_value_generator _value_generator; // for setting update_response.error after committed. diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp index 2ad29e9d45..e380da4ac8 100644 --- a/src/server/test/pegasus_server_impl_test.cpp +++ b/src/server/test/pegasus_server_impl_test.cpp @@ -59,5 +59,10 @@ class pegasus_server_impl_test : public pegasus_server_test_base TEST_F(pegasus_server_impl_test, test_table_level_slow_query) { test_table_level_slow_query(); } +TEST_F(pegasus_server_impl_test, default_data_version) +{ + ASSERT_EQ(_server->_pegasus_data_version, 1); +} + } // namespace server } // namespace pegasus diff --git a/src/server/test/pegasus_value_schema_test.cpp b/src/server/test/pegasus_value_schema_test.cpp index 9ad35a71bb..593f8ec07b 100644 --- a/src/server/test/pegasus_value_schema_test.cpp +++ b/src/server/test/pegasus_value_schema_test.cpp @@ -8,25 +8,30 @@ using namespace pegasus; -TEST(value_schema, generate_and_extract_v0) +TEST(value_schema, generate_and_extract_v1_v0) { struct test_case { int value_schema_version; uint32_t expire_ts; + uint64_t timetag; std::string user_data; } tests[] = { - {0, 1000, ""}, - {0, std::numeric_limits::max(), "pegasus"}, - {0, 0, "a"}, - {0, std::numeric_limits::max(), ""}, + {1, 1000, 10001, ""}, + {1, 
std::numeric_limits::max(), std::numeric_limits::max(), "pegasus"}, + {1, std::numeric_limits::max(), std::numeric_limits::max(), ""}, + + {0, 1000, 0, ""}, + {0, std::numeric_limits::max(), 0, "pegasus"}, + {0, std::numeric_limits::max(), 0, ""}, + {0, 0, 0, "a"}, }; for (auto &t : tests) { pegasus_value_generator gen; rocksdb::SliceParts sparts = - gen.generate_value(t.value_schema_version, t.user_data, t.expire_ts); + gen.generate_value(t.value_schema_version, t.user_data, t.expire_ts, t.timetag); std::string raw_value; for (int i = 0; i < sparts.num_parts; i++) { @@ -35,6 +40,10 @@ TEST(value_schema, generate_and_extract_v0) ASSERT_EQ(t.expire_ts, pegasus_extract_expire_ts(t.value_schema_version, raw_value)); + if (t.value_schema_version == 1) { + ASSERT_EQ(t.timetag, pegasus_extract_timetag(t.value_schema_version, raw_value)); + } + dsn::blob user_data; pegasus_extract_user_data(t.value_schema_version, std::move(raw_value), user_data); ASSERT_EQ(t.user_data, user_data.to_string()); diff --git a/src/server/test/pegasus_write_service_impl_test.cpp b/src/server/test/pegasus_write_service_impl_test.cpp new file mode 100644 index 0000000000..7ee0a1e1da --- /dev/null +++ b/src/server/test/pegasus_write_service_impl_test.cpp @@ -0,0 +1,144 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. 
+ +#include "pegasus_server_test_base.h" +#include "server/pegasus_server_write.h" +#include "server/pegasus_write_service_impl.h" +#include "message_utils.h" + +#include + +namespace pegasus { +namespace server { + +class pegasus_write_service_impl_test : public pegasus_server_test_base +{ +protected: + std::unique_ptr _server_write; + pegasus_write_service::impl *_write_impl{nullptr}; + +public: + void SetUp() override + { + start(); + _server_write = dsn::make_unique(_server.get(), true); + _write_impl = _server_write->_write_svc->_impl.get(); + } + + uint64_t read_timestamp_from(dsn::string_view raw_key) + { + std::string raw_value; + rocksdb::Status s = _write_impl->_db->Get( + _write_impl->_rd_opts, utils::to_rocksdb_slice(raw_key), &raw_value); + + uint64_t local_timetag = + pegasus_extract_timetag(_write_impl->_pegasus_data_version, raw_value); + return extract_timestamp_from_timetag(local_timetag); + } + + // start with duplicating. + void set_app_duplicating() + { + _server->stop(false); + dsn::replication::destroy_replica(_replica); + + dsn::app_info app_info; + app_info.app_type = "pegasus"; + app_info.duplicating = true; + _replica = + dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false); + _server = dsn::make_unique(_replica); + + SetUp(); + } +}; + +TEST_F(pegasus_write_service_impl_test, put_verify_timetag) +{ + set_app_duplicating(); + + dsn::blob raw_key; + pegasus::pegasus_generate_key( + raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key")); + std::string value = "value"; + int64_t decree = 10; + + /// insert timestamp 10 + uint64_t timestamp = 10; + auto ctx = db_write_context::create(decree, timestamp); + ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); + ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); + _write_impl->clear_up_batch_states(decree, 0); + ASSERT_EQ(read_timestamp_from(raw_key), timestamp); + + /// insert timestamp 15, which overwrites the previous record + 
timestamp = 15; + ctx = db_write_context::create(decree, timestamp); + ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); + ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); + _write_impl->clear_up_batch_states(decree, 0); + ASSERT_EQ(read_timestamp_from(raw_key), timestamp); + + /// insert timestamp 15 from remote, which will overwrite the previous record, + /// since its cluster id is larger (current cluster_id=1) + timestamp = 15; + ctx.remote_timetag = pegasus::generate_timetag(timestamp, 2, false); + ctx.verify_timetag = true; + ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value + "_new", 0)); + ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); + _write_impl->clear_up_batch_states(decree, 0); + ASSERT_EQ(read_timestamp_from(raw_key), timestamp); + std::string raw_value; + dsn::blob user_value; + rocksdb::Status s = + _write_impl->_db->Get(_write_impl->_rd_opts, utils::to_rocksdb_slice(raw_key), &raw_value); + pegasus_extract_user_data(_write_impl->_pegasus_data_version, std::move(raw_value), user_value); + ASSERT_EQ(user_value.to_string(), "value_new"); + + // write retry + ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value + "_new", 0)); + ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); + _write_impl->clear_up_batch_states(decree, 0); + + /// insert timestamp 16 from local, which will overwrite the remote record, + /// since its timestamp is larger + timestamp = 16; + ctx = db_write_context::create(decree, timestamp); + ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); + ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); + _write_impl->clear_up_batch_states(decree, 0); + ASSERT_EQ(read_timestamp_from(raw_key), timestamp); + + // write retry + ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); + ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); + _write_impl->clear_up_batch_states(decree, 0); +} + +// verify timetag on data version v0 
+TEST_F(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0) +{ + dsn::fail::setup(); + dsn::fail::cfg("db_get", "100%1*return()"); + // if db_write_batch_put_ctx invokes db_get, this test must fail. + + const_cast(_write_impl->_pegasus_data_version) = 0; // old version + + dsn::blob raw_key; + pegasus::pegasus_generate_key( + raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key")); + std::string value = "value"; + int64_t decree = 10; + uint64_t timestamp = 10; + + auto ctx = db_write_context::create_duplicate(decree, timestamp, true); + ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); + ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); + _write_impl->clear_up_batch_states(decree, 0); + + dsn::fail::teardown(); +} + +} // namespace server +} // namespace pegasus diff --git a/src/server/test/pegasus_write_service_test.cpp b/src/server/test/pegasus_write_service_test.cpp index dbd8403b02..ae3434f544 100644 --- a/src/server/test/pegasus_write_service_test.cpp +++ b/src/server/test/pegasus_write_service_test.cpp @@ -6,6 +6,7 @@ #include "pegasus_server_test_base.h" #include "server/pegasus_server_write.h" #include "server/pegasus_write_service_impl.h" +#include "message_utils.h" namespace pegasus { namespace server { @@ -13,11 +14,13 @@ namespace server { class pegasus_write_service_test : public pegasus_server_test_base { protected: - pegasus_write_service *_write_svc; + pegasus_write_service *_write_svc{nullptr}; std::unique_ptr _server_write; public: - pegasus_write_service_test() : pegasus_server_test_base() + pegasus_write_service_test() = default; + + void SetUp() override { start(); _server_write = dsn::make_unique(_server.get(), true); @@ -32,11 +35,11 @@ class pegasus_write_service_test : public pegasus_server_test_base dsn::apps::update_response response; int64_t decree = 10; - auto ctx = db_write_context::empty(decree); std::string hash_key = "hash_key"; // alarm for empty request request.hash_key 
= dsn::blob(hash_key.data(), 0, hash_key.size()); + auto ctx = db_write_context::create(decree, 1000); int err = _write_svc->multi_put(ctx, request, response); ASSERT_EQ(err, 0); verify_response(response, rocksdb::Status::kInvalidArgument, decree); @@ -135,6 +138,8 @@ class pegasus_write_service_test : public pegasus_server_test_base int64_t decree = 10; std::string hash_key = "hash_key"; + auto ctx = db_write_context::create(decree, 1000); + constexpr int kv_num = 100; dsn::blob key[kv_num]; std::string value[kv_num]; @@ -154,7 +159,7 @@ class pegasus_write_service_test : public pegasus_server_test_base for (int i = 0; i < kv_num; i++) { dsn::apps::update_request req; req.key = key[i]; - _write_svc->batch_put(db_write_context::empty(decree), req, responses[i]); + _write_svc->batch_put(ctx, req, responses[i]); } for (int i = 0; i < kv_num; i++) { _write_svc->batch_remove(decree, key[i], responses[i]); @@ -186,5 +191,109 @@ TEST_F(pegasus_write_service_test, multi_remove) { test_multi_remove(); } TEST_F(pegasus_write_service_test, batched_writes) { test_batched_writes(); } +TEST_F(pegasus_write_service_test, duplicate_not_batched) +{ + std::string hash_key = "hash_key"; + constexpr int kv_num = 100; + std::string sort_key[kv_num]; + std::string value[kv_num]; + + for (int i = 0; i < 100; i++) { + sort_key[i] = "sort_key_" + std::to_string(i); + value[i] = "value_" + std::to_string(i); + } + + dsn::apps::duplicate_request duplicate; + duplicate.timestamp = 1000; + duplicate.cluster_id = 2; + dsn::apps::duplicate_response resp; + + { + dsn::apps::multi_put_request mput; + for (int i = 0; i < 100; i++) { + mput.kvs.emplace_back(); + mput.kvs.back().key.assign(sort_key[i].data(), 0, sort_key[i].size()); + mput.kvs.back().value.assign(value[i].data(), 0, value[i].size()); + } + dsn::message_ptr mput_msg = pegasus::create_multi_put_request(mput); + + duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_PUT; + duplicate.raw_message = 
dsn::move_message_to_blob(mput_msg.get()); + + _write_svc->duplicate(1, duplicate, resp); + ASSERT_EQ(resp.error, 0); + } + + { + dsn::apps::multi_remove_request mremove; + for (int i = 0; i < 100; i++) { + mremove.sort_keys.emplace_back(); + mremove.sort_keys.back().assign(sort_key[i].data(), 0, sort_key[i].size()); + } + dsn::message_ptr mremove_msg = pegasus::create_multi_remove_request(mremove); + + duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE; + duplicate.raw_message = dsn::move_message_to_blob(mremove_msg.get()); + + _write_svc->duplicate(1, duplicate, resp); + ASSERT_EQ(resp.error, 0); + } +} + +TEST_F(pegasus_write_service_test, duplicate_batched) +{ + std::string hash_key = "hash_key"; + constexpr int kv_num = 100; + std::string sort_key[kv_num]; + std::string value[kv_num]; + + for (int i = 0; i < 100; i++) { + sort_key[i] = "sort_key_" + std::to_string(i); + value[i] = "value_" + std::to_string(i); + } + + { + dsn::apps::duplicate_request duplicate; + duplicate.timestamp = 1000; + duplicate.cluster_id = 2; + dsn::apps::duplicate_response resp; + + for (int i = 0; i < kv_num; i++) { + dsn::apps::update_request request; + pegasus::pegasus_generate_key(request.key, hash_key, sort_key[i]); + request.value.assign(value[i].data(), 0, value[i].size()); + + dsn::message_ptr msg_ptr = pegasus::create_put_request(request); + duplicate.raw_message = dsn::move_message_to_blob(msg_ptr.get()); + duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; + _write_svc->duplicate(1, duplicate, resp); + ASSERT_EQ(resp.error, 0); + } + } +} + +TEST_F(pegasus_write_service_test, illegal_duplicate_request) +{ + std::string hash_key = "hash_key"; + std::string sort_key = "sort_key"; + std::string value = "value"; + + // cluster=13 is from nowhere + dsn::apps::duplicate_request duplicate; + duplicate.cluster_id = 13; + duplicate.timestamp = 10; + dsn::apps::duplicate_response resp; + + dsn::apps::update_request request; + pegasus::pegasus_generate_key(request.key, 
hash_key, sort_key); + request.value.assign(value.data(), 0, value.size()); + + dsn::message_ptr msg_ptr = pegasus::create_put_request(request); // auto release memory + duplicate.raw_message = dsn::move_message_to_blob(msg_ptr.get()); + duplicate.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; + _write_svc->duplicate(1, duplicate, resp); + ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument); +} + } // namespace server } // namespace pegasus