From ca96ccfa224dc6d92c70ca764a91ccd4168cfdd9 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 5 Jan 2021 11:48:04 +0800 Subject: [PATCH 1/5] refactor: move write_batch_delete into rocksdb_wrapper --- src/server/pegasus_write_service_impl.h | 8 +++----- src/server/rocksdb_wrapper.cpp | 19 +++++++++++++++++++ src/server/rocksdb_wrapper.h | 1 + 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 2638941733..360a09f861 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -169,21 +169,19 @@ class pegasus_write_service::impl : public dsn::replication::replica_base return empty_put(decree); } + auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); }); for (auto &sort_key : update.sort_keys) { resp.error = - db_write_batch_delete(decree, composite_raw_key(update.hash_key, sort_key)); + _rocksdb_wrapper->write_batch_delete(decree, composite_raw_key(update.hash_key, sort_key)); if (resp.error) { - clear_up_batch_states(decree, resp.error); return resp.error; } } - resp.error = db_write(decree); + resp.error = _rocksdb_wrapper->write(decree); if (resp.error == 0) { resp.count = update.sort_keys.size(); } - - clear_up_batch_states(decree, resp.error); return resp.error; } diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 45dd6a1407..6871f06b6f 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -160,6 +160,25 @@ int rocksdb_wrapper::write(int64_t decree) return status.code(); } +int rocksdb_wrapper::write_batch_delete(int64_t decree, dsn::string_view raw_key) +{ + FAIL_POINT_INJECT_F("db_write_batch_delete", + [](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_DELETE; }); + + rocksdb::Status s = _write_batch->Delete(utils::to_rocksdb_slice(raw_key)); + if (dsn_unlikely(!s.ok())) { + dsn::blob hash_key, sort_key; + pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); + derror_rocksdb("WriteBatchDelete", + s.ToString(), + "decree: {}, hash_key: {}, sort_key: {}", + decree, + utils::c_escape_string(hash_key), + utils::c_escape_string(sort_key)); + } + return s.code(); +} + void rocksdb_wrapper::clear_up_write_batch() { _write_batch->Clear(); } void rocksdb_wrapper::set_default_ttl(uint32_t ttl) diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h index 351f6e59f1..06e38787f6 100644 --- a/src/server/rocksdb_wrapper.h +++ b/src/server/rocksdb_wrapper.h @@ -62,6 +62,7 @@ class rocksdb_wrapper : public dsn::replication::replica_base dsn::string_view value, uint32_t expire_sec); int write(int64_t decree); + int write_batch_delete(int64_t decree, dsn::string_view raw_key); void clear_up_write_batch(); void set_default_ttl(uint32_t ttl); From ad4c225a08cc5eeec29c70b0c78fc85e0a5513e9 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 5 Jan 2021 14:15:45 +0800 Subject: [PATCH 2/5] fix --- src/server/pegasus_mutation_duplicator.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 7244f4ccd3..0191e9c611 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -29,6 +29,7 @@ namespace dsn { namespace replication { + /// static definition of mutation_duplicator::creator. /*static*/ std::function( replica_base *, string_view, string_view)> From d291efb101501220c1df453f0245028e93d5c46c Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 5 Jan 2021 14:15:53 +0800 Subject: [PATCH 3/5] Revert "fix" This reverts commit ad4c225a08cc5eeec29c70b0c78fc85e0a5513e9. --- src/server/pegasus_mutation_duplicator.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 0191e9c611..7244f4ccd3 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -29,7 +29,6 @@ namespace dsn { namespace replication { - /// static definition of mutation_duplicator::creator. /*static*/ std::function( replica_base *, string_view, string_view)> From e94d2165d3c9cf442861eb3efc6103a0880105b0 Mon Sep 17 00:00:00 2001 From: levy Date: Tue, 5 Jan 2021 15:47:47 +0800 Subject: [PATCH 4/5] fix --- src/server/pegasus_write_service_impl.h | 4 ++-- src/server/rocksdb_wrapper.cpp | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h index 360a09f861..8d157090d2 100644 --- a/src/server/pegasus_write_service_impl.h +++ b/src/server/pegasus_write_service_impl.h @@ -171,8 +171,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); }); for (auto &sort_key : update.sort_keys) { - resp.error = - _rocksdb_wrapper->write_batch_delete(decree, composite_raw_key(update.hash_key, sort_key)); + resp.error = _rocksdb_wrapper->write_batch_delete( + decree, composite_raw_key(update.hash_key, sort_key)); if (resp.error) { return resp.error; } diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 6871f06b6f..2842332eea 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -163,18 +163,18 @@ int rocksdb_wrapper::write(int64_t decree) int rocksdb_wrapper::write_batch_delete(int64_t decree, dsn::string_view raw_key) { FAIL_POINT_INJECT_F("db_write_batch_delete", - [](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_DELETE; }); + [](dsn::string_view) -> int { return FAIL_DB_WRITE_BATCH_DELETE; }); rocksdb::Status s = _write_batch->Delete(utils::to_rocksdb_slice(raw_key)); if (dsn_unlikely(!s.ok())) { dsn::blob hash_key, sort_key; pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); derror_rocksdb("WriteBatchDelete", - s.ToString(), - "decree: {}, hash_key: {}, sort_key: {}", - decree, - utils::c_escape_string(hash_key), - utils::c_escape_string(sort_key)); + s.ToString(), + "decree: {}, hash_key: {}, sort_key: {}", + decree, + utils::c_escape_string(hash_key), + utils::c_escape_string(sort_key)); } return s.code(); } From 0aa0720329104ddec05ec35ae1f1f57ad76cc7fc Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Wed, 6 Jan 2021 11:41:16 +0800 Subject: [PATCH 5/5] Update src/server/rocksdb_wrapper.cpp Co-authored-by: Yingchun Lai <405403881@qq.com> --- src/server/rocksdb_wrapper.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index 2842332eea..662bb03cf9 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -169,7 +169,7 @@ int rocksdb_wrapper::write_batch_delete(int64_t decree, dsn::string_view raw_key if (dsn_unlikely(!s.ok())) { dsn::blob hash_key, sort_key; pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key); - derror_rocksdb("WriteBatchDelete", + derror_rocksdb("write_batch_delete", s.ToString(), "decree: {}, hash_key: {}, sort_key: {}", decree,