From 5b963f3451d14e6548d0821ab3bc8d84c217f09a Mon Sep 17 00:00:00 2001 From: liuyu <52276794+liuyu85cn@users.noreply.github.com> Date: Wed, 29 Dec 2021 22:16:22 +0800 Subject: [PATCH] split stop to stop and join address comments regular update split stop to stop and join remove dangle edge split mem lock by term --- src/clients/meta/MetaClient.cpp | 1 - src/clients/storage/InternalStorageClient.cpp | 6 +- src/common/utils/MemoryLockWrapper.h | 14 +- src/mock/MockCluster.cpp | 1 + src/storage/CMakeLists.txt | 9 +- src/storage/InternalStorageServiceHandler.h | 7 +- src/storage/StorageServer.cpp | 6 +- src/storage/StorageServer.h | 1 + src/storage/index/LookupProcessor.h | 3 + src/storage/kv/GetProcessor.h | 3 + src/storage/kv/PutProcessor.h | 4 +- src/storage/kv/RemoveProcessor.h | 3 + src/storage/test/CMakeLists.txt | 15 ++ src/storage/test/ChainAddEdgesTest.cpp | 2 + src/storage/test/ChainDeleteEdgesTest.cpp | 73 ++--- src/storage/test/ChainResumeEdgeTest.cpp | 174 ++++++------ src/storage/test/ChainTestUtils.h | 42 +-- src/storage/test/ChainUpdateEdgeTest.cpp | 29 +- .../ChainAddEdgesLocalProcessor.cpp | 77 +++--- .../transaction/ChainAddEdgesLocalProcessor.h | 14 +- .../ChainAddEdgesRemoteProcessor.cpp | 2 + src/storage/transaction/ChainBaseProcessor.h | 5 + .../ChainDeleteEdgesLocalProcessor.cpp | 42 +-- .../ChainDeleteEdgesLocalProcessor.h | 1 + .../ChainDeleteEdgesResumeProcessor.cpp | 1 + .../ChainDeleteEdgesResumeRemoteProcessor.cpp | 21 +- .../ChainDeleteEdgesResumeRemoteProcessor.h | 4 +- .../transaction/ChainProcessorFactory.cpp | 46 +++- .../transaction/ChainProcessorFactory.h | 10 +- ...=> ChainResumeAddDoublePrimeProcessor.cpp} | 34 ++- ...h => ChainResumeAddDoublePrimeProcessor.h} | 12 +- ...r.cpp => ChainResumeAddPrimeProcessor.cpp} | 13 +- ...essor.h => ChainResumeAddPrimeProcessor.h} | 12 +- .../transaction/ChainResumeProcessor.cpp | 68 ----- .../transaction/ChainResumeProcessor.h | 31 --- .../ChainResumeUpdateDoublePrimeProcessor.cpp | 81 ++++++ 
...> ChainResumeUpdateDoublePrimeProcessor.h} | 10 +- ...pp => ChainResumeUpdatePrimeProcessor.cpp} | 25 +- ...or.h => ChainResumeUpdatePrimeProcessor.h} | 12 +- .../ChainUpdateEdgeLocalProcessor.cpp | 108 +++++--- .../ChainUpdateEdgeLocalProcessor.h | 11 +- .../ChainUpdateEdgeRemoteProcessor.cpp | 3 + .../ResumeUpdateRemoteProcessor.cpp | 61 ----- .../transaction/TransactionManager.cpp | 250 +++++++++--------- src/storage/transaction/TransactionManager.h | 115 ++++---- 45 files changed, 790 insertions(+), 672 deletions(-) rename src/storage/transaction/{ResumeAddEdgeRemoteProcessor.cpp => ChainResumeAddDoublePrimeProcessor.cpp} (55%) rename src/storage/transaction/{ResumeAddEdgeProcessor.h => ChainResumeAddDoublePrimeProcessor.h} (59%) rename src/storage/transaction/{ResumeAddEdgeProcessor.cpp => ChainResumeAddPrimeProcessor.cpp} (78%) rename src/storage/transaction/{ResumeAddEdgeRemoteProcessor.h => ChainResumeAddPrimeProcessor.h} (67%) delete mode 100644 src/storage/transaction/ChainResumeProcessor.cpp delete mode 100644 src/storage/transaction/ChainResumeProcessor.h create mode 100644 src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.cpp rename src/storage/transaction/{ResumeUpdateRemoteProcessor.h => ChainResumeUpdateDoublePrimeProcessor.h} (67%) rename src/storage/transaction/{ResumeUpdateProcessor.cpp => ChainResumeUpdatePrimeProcessor.cpp} (63%) rename src/storage/transaction/{ResumeUpdateProcessor.h => ChainResumeUpdatePrimeProcessor.h} (67%) delete mode 100644 src/storage/transaction/ResumeUpdateRemoteProcessor.cpp diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 7c50f8ab120..58961d02c32 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -167,7 +167,6 @@ void MetaClient::heartBeatThreadFunc() { LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); return; } - // if MetaServer has some changes, refresh the localCache_ loadData(); loadCfg(); diff --git 
a/src/clients/storage/InternalStorageClient.cpp b/src/clients/storage/InternalStorageClient.cpp index ad2a7c58960..67e635c318b 100644 --- a/src/clients/storage/InternalStorageClient.cpp +++ b/src/clients/storage/InternalStorageClient.cpp @@ -22,6 +22,8 @@ ::nebula::cpp2::ErrorCode getErrorCode(T& tryResp) { switch (stResp.status().code()) { case Status::Code::kLeaderChanged: return nebula::cpp2::ErrorCode::E_LEADER_CHANGED; + case Status::Code::kError: + return nebula::cpp2::ErrorCode::E_RPC_FAILURE; default: LOG(ERROR) << "not impl error transform: code=" << static_cast(stResp.status().code()); @@ -69,8 +71,8 @@ void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedReq std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable { auto code = getErrorCode(t); + VLOG(1) << "chainUpdateEdge rpc: " << apache::thrift::util::enumNameSafe(code); if (code == ::nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); chainUpdateEdge(reversedRequest, termOfSrc, optVersion, std::move(p)); } else { p.setValue(code); @@ -108,7 +110,6 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq, std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable { auto code = getErrorCode(t); if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); chainAddEdges(directReq, termId, optVersion, std::move(p)); } else { p.setValue(code); @@ -165,7 +166,6 @@ void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req, std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable { auto code = getErrorCode(t); if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); chainDeleteEdges(req, txnId, termId, std::move(p)); } else { p.setValue(code); diff --git a/src/common/utils/MemoryLockWrapper.h b/src/common/utils/MemoryLockWrapper.h index 
cf0db2d807b..5378a6d0a0b 100644 --- a/src/common/utils/MemoryLockWrapper.h +++ b/src/common/utils/MemoryLockWrapper.h @@ -51,7 +51,7 @@ class MemoryLockGuard { } ~MemoryLockGuard() { - if (locked_) { + if (locked_ && autoUnlock_) { lock_->unlockBatch(keys_); } } @@ -71,15 +71,8 @@ class MemoryLockGuard { return *iter_; } - // this will manual set the lock to unlocked state - // which mean will not release all locks automatically - // please make sure you really know the side effect - void forceLock() { - locked_ = true; - } - - void forceUnlock() { - locked_ = false; + void setAutoUnlock(bool autoUnlock) { + autoUnlock_ = autoUnlock; } protected: @@ -87,6 +80,7 @@ class MemoryLockGuard { std::vector keys_; typename std::vector::iterator iter_; bool locked_{false}; + bool autoUnlock_{true}; }; } // namespace nebula diff --git a/src/mock/MockCluster.cpp b/src/mock/MockCluster.cpp index 0eb5407fa6f..c9d8fbaae62 100644 --- a/src/mock/MockCluster.cpp +++ b/src/mock/MockCluster.cpp @@ -213,6 +213,7 @@ void MockCluster::initStorageKV(const char* dataPath, txnMan_ = std::make_unique(storageEnv_.get()); storageEnv_->txnMan_ = txnMan_.get(); + txnMan_->start(); } void MockCluster::startStorage(HostAddr addr, diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 6a5ade90293..b1227f4bcd8 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -75,14 +75,13 @@ nebula_add_library( transaction/ConsistUtil.cpp transaction/ChainUpdateEdgeLocalProcessor.cpp transaction/ChainUpdateEdgeRemoteProcessor.cpp - transaction/ChainResumeProcessor.cpp transaction/ChainAddEdgesGroupProcessor.cpp transaction/ChainAddEdgesLocalProcessor.cpp transaction/ChainAddEdgesRemoteProcessor.cpp - transaction/ResumeAddEdgeProcessor.cpp - transaction/ResumeAddEdgeRemoteProcessor.cpp - transaction/ResumeUpdateProcessor.cpp - transaction/ResumeUpdateRemoteProcessor.cpp + transaction/ChainResumeAddPrimeProcessor.cpp + transaction/ChainResumeAddDoublePrimeProcessor.cpp + 
transaction/ChainResumeUpdatePrimeProcessor.cpp + transaction/ChainResumeUpdateDoublePrimeProcessor.cpp transaction/ChainProcessorFactory.cpp transaction/ChainDeleteEdgesGroupProcessor.cpp transaction/ChainDeleteEdgesLocalProcessor.cpp diff --git a/src/storage/InternalStorageServiceHandler.h b/src/storage/InternalStorageServiceHandler.h index 01407c3b204..10bb052ed05 100644 --- a/src/storage/InternalStorageServiceHandler.h +++ b/src/storage/InternalStorageServiceHandler.h @@ -22,13 +22,14 @@ class InternalStorageServiceHandler final : public cpp2::InternalStorageServiceS public: explicit InternalStorageServiceHandler(StorageEnv* env); - folly::Future future_chainAddEdges(const cpp2::ChainAddEdgesRequest& p_req); + folly::Future future_chainAddEdges( + const cpp2::ChainAddEdgesRequest& p_req) override; folly::Future future_chainUpdateEdge( - const cpp2::ChainUpdateEdgeRequest& p_req); + const cpp2::ChainUpdateEdgeRequest& p_req) override; folly::Future future_chainDeleteEdges( - const cpp2::ChainDeleteEdgesRequest& p_req); + const cpp2::ChainDeleteEdgesRequest& p_req) override; private: StorageEnv* env_{nullptr}; diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp index eec3e5fff4a..71c4e34cd3a 100644 --- a/src/storage/StorageServer.cpp +++ b/src/storage/StorageServer.cpp @@ -159,6 +159,7 @@ int32_t StorageServer::getAdminStoreSeqId() { bool StorageServer::start() { ioThreadPool_ = std::make_shared(FLAGS_num_io_threads); + ioThreadPoolForMeta_ = std::make_shared(FLAGS_num_io_threads); #ifndef BUILD_STANDALONE const int32_t numWorkerThreads = FLAGS_num_worker_threads; #else @@ -183,7 +184,7 @@ bool StorageServer::start() { options.rootPath_ = boost::filesystem::current_path().string(); options.dataPaths_ = dataPaths_; - metaClient_ = std::make_unique(ioThreadPool_, metaAddrs_, options); + metaClient_ = std::make_unique(ioThreadPoolForMeta_, metaAddrs_, options); if (!metaClient_->waitForMetadReady()) { LOG(ERROR) << "waitForMetadReady error!"; 
return false; @@ -222,6 +223,8 @@ bool StorageServer::start() { env_->interClient_ = interClient_.get(); txnMan_ = std::make_unique(env_.get()); + txnMan_->monitorPoolStat(ioThreadPool_.get(), "ioThreadPool_"); + txnMan_->monitorPoolStat(ioThreadPoolForMeta_.get(), "ioThreadPoolForMeta_"); if (!txnMan_->start()) { LOG(ERROR) << "Start transaction manager failed!"; return false; @@ -397,6 +400,7 @@ void StorageServer::stop() { if (txnMan_) { txnMan_->stop(); + txnMan_->join(); } if (taskMgr_) { taskMgr_->shutdown(); diff --git a/src/storage/StorageServer.h b/src/storage/StorageServer.h index 3637f6f9091..f5d2948f51c 100644 --- a/src/storage/StorageServer.h +++ b/src/storage/StorageServer.h @@ -59,6 +59,7 @@ class StorageServer final { bool initWebService(); std::shared_ptr ioThreadPool_; + std::shared_ptr ioThreadPoolForMeta_; std::shared_ptr workers_; std::unique_ptr storageThread_; diff --git a/src/storage/index/LookupProcessor.h b/src/storage/index/LookupProcessor.h index 00d6f8f55fa..05012f5f560 100644 --- a/src/storage/index/LookupProcessor.h +++ b/src/storage/index/LookupProcessor.h @@ -43,6 +43,9 @@ class LookupProcessor : public BaseProcessor { folly::Executor* executor_{nullptr}; std::unique_ptr planContext_; std::unique_ptr context_; + /** + * @brief the final output + */ nebula::DataSet resultDataSet_; nebula::DataSet statsDataSet_; std::vector partResults_; diff --git a/src/storage/kv/GetProcessor.h b/src/storage/kv/GetProcessor.h index 7caa28d237e..a4fa6907223 100644 --- a/src/storage/kv/GetProcessor.h +++ b/src/storage/kv/GetProcessor.h @@ -14,6 +14,9 @@ namespace storage { extern ProcessorCounters kGetCounters; +/** + * @brief this is a simple get() interface when storage run in KV mode. 
+ */ class GetProcessor : public BaseProcessor { public: static GetProcessor* instance(StorageEnv* env, diff --git a/src/storage/kv/PutProcessor.h b/src/storage/kv/PutProcessor.h index 101cc183097..7888abd64dd 100644 --- a/src/storage/kv/PutProcessor.h +++ b/src/storage/kv/PutProcessor.h @@ -13,7 +13,9 @@ namespace nebula { namespace storage { extern ProcessorCounters kPutCounters; - +/** + * @brief this is a simple put() interface when storage run in KV mode. + */ class PutProcessor : public BaseProcessor { public: static PutProcessor* instance(StorageEnv* env, diff --git a/src/storage/kv/RemoveProcessor.h b/src/storage/kv/RemoveProcessor.h index 59bab864e87..11dfc5febe2 100644 --- a/src/storage/kv/RemoveProcessor.h +++ b/src/storage/kv/RemoveProcessor.h @@ -14,6 +14,9 @@ namespace storage { extern ProcessorCounters kRemoveCounters; +/** + * @brief this is a simple remove() interface when storage run in KV mode. + */ class RemoveProcessor : public BaseProcessor { public: static RemoveProcessor* instance(StorageEnv* env, diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 38d59a24f93..64ec20e7ccf 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -725,6 +725,21 @@ nebula_add_test( gtest ) +nebula_add_test( + NAME + chain_delete_edge_test + SOURCES + ChainDeleteEdgesTest.cpp + OBJECTS + ${storage_test_deps} + LIBRARIES + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + ${PROXYGEN_LIBRARIES} + wangle + gtest +) + nebula_add_test( NAME index_test diff --git a/src/storage/test/ChainAddEdgesTest.cpp b/src/storage/test/ChainAddEdgesTest.cpp index 3881e0cc671..5888e659844 100644 --- a/src/storage/test/ChainAddEdgesTest.cpp +++ b/src/storage/test/ChainAddEdgesTest.cpp @@ -201,6 +201,8 @@ TEST(ChainAddEdgesTest, processRemoteTest) { } // namespace nebula int main(int argc, char** argv) { + FLAGS_trace_toss = true; + FLAGS_v = 1; testing::InitGoogleTest(&argc, argv); folly::init(&argc, &argv, false); 
google::SetStderrLogging(google::INFO); diff --git a/src/storage/test/ChainDeleteEdgesTest.cpp b/src/storage/test/ChainDeleteEdgesTest.cpp index 932c895210d..8b525d88ef1 100644 --- a/src/storage/test/ChainDeleteEdgesTest.cpp +++ b/src/storage/test/ChainDeleteEdgesTest.cpp @@ -222,6 +222,9 @@ TEST(ChainDeleteEdgesTest, DISABLED_Test5) { delProc->rcProcessRemote = nebula::cpp2::ErrorCode::SUCCEEDED; delProc->rcProcessLocal = nebula::cpp2::ErrorCode::SUCCEEDED; + UPCLT iClient(FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Run DeleteEdgesReq..."; auto futDel = delProc->getFuture(); delProc->process(delReq); @@ -231,16 +234,13 @@ TEST(ChainDeleteEdgesTest, DISABLED_Test5) { LOG(INFO) << "after del(), edge num = " << num; EXPECT_EQ(num, 167); - env->txnMan_->scanAll(); - auto* iClient = FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); - // std::this_thread::sleep_for(std::chrono::milliseconds()); + for (PartitionID i = 1; i <= partNum; ++i) { + env->txnMan_->scanPrimes(mockSpaceId, i); + } + env->txnMan_->stop(); + env->txnMan_->join(); num = util.checkNumOfKey(env, mockSpaceId, edgeKeys); EXPECT_EQ(num, 0); - - delete iClient; } // add some edges, then delete all of them, not execute local commit @@ -277,6 +277,9 @@ TEST(ChainDeleteEdgesTest, Test6) { delProc->rcProcessRemote = nebula::cpp2::ErrorCode::SUCCEEDED; delProc->rcProcessLocal = nebula::cpp2::ErrorCode::SUCCEEDED; + UPCLT iClient(FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Run DeleteEdgesReq..."; auto futDel = delProc->getFuture(); delProc->process(delReq); @@ -286,16 +289,18 @@ 
TEST(ChainDeleteEdgesTest, Test6) { LOG(INFO) << "after del(), edge num = " << num; EXPECT_EQ(num, 167); - env->txnMan_->scanAll(); - auto* iClient = FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + for (PartitionID i = 1; i <= partNum; ++i) { + env->txnMan_->scanPrimes(mockSpaceId, i); + } + // ChainResumeProcessor resumeProc(env); + // resumeProc.process(); + std::this_thread::sleep_for(std::chrono::seconds(2)); + sleep(1); + env->txnMan_->stop(); + env->txnMan_->join(); + num = util.checkNumOfKey(env, mockSpaceId, edgeKeys); EXPECT_EQ(num, 0); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - - delete iClient; } // add some edges, delete one of them, rpc failure @@ -332,6 +337,9 @@ TEST(ChainDeleteEdgesTest, Test7) { auto delReq = delProc->makeDelRequest(addReq, limit); delProc->rcProcessRemote = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + UPCLT iClient(FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Run DeleteEdgesReq..."; auto futDel = delProc->getFuture(); delProc->process(delReq); @@ -341,20 +349,16 @@ TEST(ChainDeleteEdgesTest, Test7) { LOG(INFO) << "after del(), edge num = " << num; EXPECT_EQ(num, 166); - env->txnMan_->scanAll(); - auto* iClient = FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); + LOG(INFO) << "after recover()"; + num = util.checkNumOfKey(env, mockSpaceId, edgeKeys); EXPECT_EQ(num, 166); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - - delete iClient; } -// add some edges, then one all of them, rpc failure +// add some 
edges, delete all, rpc failure TEST(ChainDeleteEdgesTest, Test8) { fs::TempDir rootPath("/tmp/DeleteEdgesTest.XXXXXX"); mock::MockCluster cluster; @@ -397,16 +401,18 @@ TEST(ChainDeleteEdgesTest, Test8) { LOG(INFO) << "after del(), edge num = " << num; EXPECT_EQ(num, 0); - env->txnMan_->scanAll(); - auto* iClient = FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + // for (PartitionID i = 1; i <= partNum; ++i) { + // env->txnMan_->scanPrimes(mockSpaceId, i); + // } + UPCLT iClient(FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + // ChainResumeProcessor resumeProc(env); + // resumeProc.process(); + + env->txnMan_->stop(); + env->txnMan_->join(); num = util.checkNumOfKey(env, mockSpaceId, edgeKeys); EXPECT_EQ(num, 0); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - - delete iClient; } } // namespace storage @@ -414,6 +420,7 @@ TEST(ChainDeleteEdgesTest, Test8) { int main(int argc, char** argv) { FLAGS_trace_toss = true; + FLAGS_v = 1; testing::InitGoogleTest(&argc, argv); folly::init(&argc, &argv, false); diff --git a/src/storage/test/ChainResumeEdgeTest.cpp b/src/storage/test/ChainResumeEdgeTest.cpp index 9c985a8462d..716b8263672 100644 --- a/src/storage/test/ChainResumeEdgeTest.cpp +++ b/src/storage/test/ChainResumeEdgeTest.cpp @@ -20,7 +20,6 @@ #include "storage/test/TestUtils.h" #include "storage/transaction/ChainAddEdgesGroupProcessor.h" #include "storage/transaction/ChainAddEdgesLocalProcessor.h" -#include "storage/transaction/ChainResumeProcessor.h" #include "storage/transaction/ConsistUtil.h" namespace nebula { @@ -58,6 +57,9 @@ TEST(ChainResumeEdgesTest, resumeTest1) { LOG(INFO) << "Build AddEdgesRequest..."; cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, 
mockPartNum); + UPCLT iClient(FakeInternalStorageClient::instance(env)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + auto fut = proc->getFuture(); proc->process(req); auto resp = std::move(fut).get(); @@ -70,16 +72,12 @@ TEST(ChainResumeEdgesTest, resumeTest1) { env->txnMan_->scanPrimes(1, i); } - auto* iClient = FakeInternalStorageClient::instance(env); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); EXPECT_EQ(334, numOfKey(req, gTestUtil.genKey, env)); EXPECT_EQ(0, numOfKey(req, gTestUtil.genPrime, env)); EXPECT_EQ(0, numOfKey(req, gTestUtil.genDoublePrime, env)); - - delete iClient; } /** @@ -107,6 +105,9 @@ TEST(ChainResumeEdgesTest, resumeTest2) { LOG(INFO) << "Build AddEdgesRequest..."; cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, mockPartNum); + UPCLT iClient(FakeInternalStorageClient::instance(env, Code::E_UNKNOWN)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Test AddEdgesProcessor..."; auto fut = proc->getFuture(); proc->process(req); @@ -118,16 +119,16 @@ TEST(ChainResumeEdgesTest, resumeTest2) { EXPECT_EQ(334, numOfKey(req, util.genPrime, env)); EXPECT_EQ(0, numOfKey(req, util.genDoublePrime, env)); - auto* iClient = FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::E_UNKNOWN); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + for (int32_t i = 1; i <= mockPartNum; ++i) { + env->txnMan_->scanPrimes(1, i); + } + + env->txnMan_->stop(); + env->txnMan_->join(); EXPECT_EQ(0, numOfKey(req, util.genKey, env)); EXPECT_EQ(334, numOfKey(req, util.genPrime, env)); EXPECT_EQ(0, numOfKey(req, util.genDoublePrime, env)); - - delete iClient; } /** @@ -161,22 +162,21 @@ TEST(ChainResumeEdgesTest, resumeTest3) { EXPECT_EQ(0, 
numOfKey(req, util.genDoublePrime, env)); auto error = nebula::cpp2::ErrorCode::E_RPC_FAILURE; - auto* iClient = FakeInternalStorageClient::instance(env, error); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); + + UPCLT iClient(FakeInternalStorageClient::instance(env, error)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); for (auto i = 1; i <= mockPartNum; ++i) { env->txnMan_->scanPrimes(1, i); } - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); // none of really edge key should be inserted EXPECT_EQ(334, numOfKey(req, util.genKey, env)); EXPECT_EQ(0, numOfKey(req, util.genPrime, env)); EXPECT_EQ(334, numOfKey(req, util.genDoublePrime, env)); - - delete iClient; } /** @@ -204,28 +204,26 @@ TEST(ChainResumeEdgesTest, resumeTest4) { int partNum = 1; cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, partNum); + auto error = nebula::cpp2::ErrorCode::E_UNKNOWN; + UPCLT iClient(FakeInternalStorageClient::instance(env, error)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Test AddEdgesProcessor..."; auto fut = proc->getFuture(); proc->process(req); auto resp = std::move(fut).get(); EXPECT_EQ(0, resp.result.failed_parts.size()); - // ChainTestUtils util; EXPECT_EQ(334, numOfKey(req, gTestUtil.genKey, env)); EXPECT_EQ(0, numOfKey(req, gTestUtil.genPrime, env)); EXPECT_EQ(334, numOfKey(req, gTestUtil.genDoublePrime, env)); - auto error = nebula::cpp2::ErrorCode::E_UNKNOWN; - auto* iClient = FakeInternalStorageClient::instance(env, error); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); EXPECT_EQ(334, numOfKey(req, gTestUtil.genKey, env)); EXPECT_EQ(0, numOfKey(req, gTestUtil.genPrime, env)); EXPECT_EQ(334, numOfKey(req, gTestUtil.genDoublePrime, env)); - - delete 
iClient; } /** @@ -243,6 +241,10 @@ TEST(ChainResumeEdgesTest, resumeTest5) { proc->rcProcessRemote = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + auto error = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + UPCLT iClient(FakeInternalStorageClient::instance(env, error)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Build AddEdgesRequest..."; cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, 1); @@ -252,22 +254,17 @@ TEST(ChainResumeEdgesTest, resumeTest5) { auto resp = std::move(fut).get(); EXPECT_EQ(0, resp.result.failed_parts.size()); + env->txnMan_->stop(); + env->txnMan_->join(); + ChainTestUtils util; EXPECT_EQ(334, numOfKey(req, util.genKey, env)); EXPECT_EQ(0, numOfKey(req, util.genPrime, env)); EXPECT_EQ(334, numOfKey(req, util.genDoublePrime, env)); - auto error = nebula::cpp2::ErrorCode::E_RPC_FAILURE; - auto* iClient = FakeInternalStorageClient::instance(env, error); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); - EXPECT_EQ(334, numOfKey(req, util.genKey, env)); EXPECT_EQ(0, numOfKey(req, util.genPrime, env)); EXPECT_EQ(334, numOfKey(req, util.genDoublePrime, env)); - - delete iClient; } /** @@ -288,6 +285,9 @@ TEST(ChainResumeEdgesTest, resumeTest6) { LOG(INFO) << "Build AddEdgesRequest..."; cpp2::AddEdgesRequest req = mock::MockData::mockAddEdgesReq(false, 1); + UPCLT iClient(FakeInternalStorageClient::instance(env)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Test AddEdgesProcessor..."; auto fut = proc->getFuture(); proc->process(req); @@ -299,21 +299,16 @@ TEST(ChainResumeEdgesTest, resumeTest6) { EXPECT_EQ(0, numOfKey(req, util.genPrime, env)); EXPECT_EQ(334, numOfKey(req, util.genDoublePrime, env)); - auto* iClient = FakeInternalStorageClient::instance(env); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - for (auto i = 1; i <= 
mockPartNum; ++i) { env->txnMan_->scanPrimes(1, i); } - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); EXPECT_EQ(334, numOfKey(req, util.genKey, env)); EXPECT_EQ(0, numOfKey(req, util.genPrime, env)); EXPECT_EQ(0, numOfKey(req, util.genDoublePrime, env)); - - delete iClient; } // resume an update left prime, check resume succeeded @@ -342,27 +337,24 @@ TEST(ChainUpdateEdgeTest, resumeTest7) { LOG(INFO) << "addUnfinishedEdge()"; proc->wrapAddUnfinishedEdge(ResumeType::RESUME_CHAIN); auto resp = std::move(f).get(); + UPCLT iClient(FakeInternalStorageClient::instance(env)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); EXPECT_FALSE(helper.checkRequestUpdated(env, req)); EXPECT_TRUE(helper.edgeExist(env, req)); EXPECT_TRUE(helper.primeExist(env, req)); EXPECT_FALSE(helper.doublePrimeExist(env, req)); - auto* iClient = FakeInternalStorageClient::instance(env); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - for (auto i = 1; i <= mockPartNum; ++i) { env->txnMan_->scanPrimes(1, i); } - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); EXPECT_TRUE(helper.edgeExist(env, req)); EXPECT_FALSE(helper.primeExist(env, req)); EXPECT_FALSE(helper.doublePrimeExist(env, req)); - - delete iClient; } // resume an update left prime, resume failed @@ -389,23 +381,25 @@ TEST(ChainUpdateEdgeTest, resumeTest8) { proc->process(req); auto resp = std::move(f).get(); - // EXPECT_TRUE(helper.checkResp(req, resp)); + auto error = nebula::cpp2::ErrorCode::E_UNKNOWN; + UPCLT iClient(FakeInternalStorageClient::instance(env, error)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + + for (auto i = 1; i <= mockPartNum; ++i) { + env->txnMan_->scanPrimes(1, i); + } + + env->txnMan_->stop(); + env->txnMan_->join(); + EXPECT_FALSE(helper.checkRequestUpdated(env, req)); EXPECT_TRUE(helper.edgeExist(env, 
req)); EXPECT_TRUE(helper.primeExist(env, req)); EXPECT_FALSE(helper.doublePrimeExist(env, req)); - auto* iClient = FakeInternalStorageClient::instance(env); - iClient->setErrorCode(Code::E_UNKNOWN); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); - EXPECT_TRUE(helper.edgeExist(env, req)); EXPECT_TRUE(helper.primeExist(env, req)); EXPECT_FALSE(helper.doublePrimeExist(env, req)); - - delete iClient; } // resume an update left prime, resume outdated @@ -433,23 +427,20 @@ TEST(ChainUpdateEdgeTest, resumeTest9) { proc->wrapAddUnfinishedEdge(ResumeType::RESUME_CHAIN); auto resp = std::move(f).get(); - // EXPECT_TRUE(helper.checkResp(req, resp)); - EXPECT_FALSE(helper.checkRequestUpdated(env, req)); - EXPECT_TRUE(helper.edgeExist(env, req)); - EXPECT_TRUE(helper.primeExist(env, req)); - EXPECT_FALSE(helper.doublePrimeExist(env, req)); + auto error = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + UPCLT iClient(FakeInternalStorageClient::instance(env, error)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + + for (auto i = 1; i <= mockPartNum; ++i) { + env->txnMan_->scanPrimes(1, i); + } - auto* iClient = FakeInternalStorageClient::instance(env); - iClient->setErrorCode(Code::E_RPC_FAILURE); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); EXPECT_TRUE(helper.edgeExist(env, req)); EXPECT_FALSE(helper.primeExist(env, req)); EXPECT_TRUE(helper.doublePrimeExist(env, req)); - - delete iClient; } // resume an update left prime, check resume succeeded @@ -461,18 +452,19 @@ TEST(ChainUpdateEdgeTest, resumeTest10) { auto mClient = MetaClientTestUpdater::makeDefault(); env->metaClient_ = mClient.get(); - // auto parts = cluster.getTotalParts(); auto parts = mockPartNum; EXPECT_TRUE(QueryTestUtils::mockEdgeData(env, parts, mockSpaceVidLen)); 
LOG(INFO) << "Test UpdateEdgeRequest..."; auto req = helper.makeDefaultRequest(); + UPCLT iClient(FakeInternalStorageClient::instance(env)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Fake Prime..."; auto* proc = new FakeChainUpdateProcessor(env); auto f = proc->getFuture(); proc->rcProcessRemote = Code::E_RPC_FAILURE; - // proc->rcProcessLocal = Code::SUCCEEDED; proc->process(req); auto resp = std::move(f).get(); @@ -481,16 +473,12 @@ TEST(ChainUpdateEdgeTest, resumeTest10) { EXPECT_FALSE(helper.primeExist(env, req)); EXPECT_TRUE(helper.doublePrimeExist(env, req)); - auto* iClient = FakeInternalStorageClient::instance(env); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); EXPECT_TRUE(helper.edgeExist(env, req)); EXPECT_FALSE(helper.primeExist(env, req)); EXPECT_FALSE(helper.doublePrimeExist(env, req)); - - delete iClient; } // resume an update left prime, resume failed @@ -509,11 +497,14 @@ TEST(ChainUpdateEdgeTest, resumeTest11) { LOG(INFO) << "Test UpdateEdgeRequest..."; auto req = helper.makeDefaultRequest(); + auto error = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + UPCLT iClient(FakeInternalStorageClient::instance(env, error)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Fake Prime..."; auto* proc = new FakeChainUpdateProcessor(env); auto f = proc->getFuture(); proc->rcProcessRemote = Code::E_RPC_FAILURE; - // proc->rcProcessLocal = Code::SUCCEEDED; proc->process(req); auto resp = std::move(f).get(); @@ -522,17 +513,12 @@ TEST(ChainUpdateEdgeTest, resumeTest11) { EXPECT_FALSE(helper.primeExist(env, req)); EXPECT_TRUE(helper.doublePrimeExist(env, req)); - auto* iClient = FakeInternalStorageClient::instance(env); - iClient->setErrorCode(Code::E_UNKNOWN); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - 
ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); EXPECT_TRUE(helper.edgeExist(env, req)); EXPECT_FALSE(helper.primeExist(env, req)); EXPECT_TRUE(helper.doublePrimeExist(env, req)); - - delete iClient; } // resume an update left prime, resume outdated @@ -551,11 +537,14 @@ TEST(ChainUpdateEdgeTest, resumeTest12) { LOG(INFO) << "Test UpdateEdgeRequest..."; auto req = helper.makeDefaultRequest(); + auto error = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + UPCLT iClient(FakeInternalStorageClient::instance(env, error)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + LOG(INFO) << "Fake Prime..."; auto* proc = new FakeChainUpdateProcessor(env); auto f = proc->getFuture(); proc->rcProcessRemote = Code::E_RPC_FAILURE; - // proc->rcProcessLocal = Code::SUCCEEDED; proc->process(req); auto resp = std::move(f).get(); @@ -564,22 +553,19 @@ TEST(ChainUpdateEdgeTest, resumeTest12) { EXPECT_FALSE(helper.primeExist(env, req)); EXPECT_TRUE(helper.doublePrimeExist(env, req)); - auto* iClient = FakeInternalStorageClient::instance(env); - iClient->setErrorCode(Code::E_RPC_FAILURE); - FakeInternalStorageClient::hookInternalStorageClient(env, iClient); - ChainResumeProcessor resumeProc(env); - resumeProc.process(); + env->txnMan_->stop(); + env->txnMan_->join(); EXPECT_TRUE(helper.edgeExist(env, req)); EXPECT_FALSE(helper.primeExist(env, req)); EXPECT_TRUE(helper.doublePrimeExist(env, req)); - - delete iClient; } } // namespace storage } // namespace nebula int main(int argc, char** argv) { + FLAGS_trace_toss = true; + FLAGS_v = 1; testing::InitGoogleTest(&argc, argv); folly::init(&argc, &argv, false); google::SetStderrLogging(google::INFO); diff --git a/src/storage/test/ChainTestUtils.h b/src/storage/test/ChainTestUtils.h index 0fd04ca00ee..636e6ff1b4f 100644 --- a/src/storage/test/ChainTestUtils.h +++ b/src/storage/test/ChainTestUtils.h @@ -6,7 +6,7 @@ #pragma once #include "storage/CommonUtils.h" 
-#include "storage/transaction/ChainResumeProcessor.h" +#include "storage/transaction/ChainAddEdgesLocalProcessor.h" #include "storage/transaction/ChainUpdateEdgeLocalProcessor.h" #include "storage/transaction/ChainUpdateEdgeRemoteProcessor.h" @@ -199,7 +199,8 @@ class FakeChainUpdateProcessor : public ChainUpdateEdgeLocalProcessor { } folly::SemiFuture processLocal(Code code) override { - LOG(INFO) << "FakeChainUpdateEdgeProcessorA::" << __func__ << "()"; + LOG(INFO) << "FakeChainUpdateEdgeProcessorA::" << __func__ << "()" + << apache::thrift::util::enumNameSafe(*rcProcessRemote); if (rcProcessLocal) { LOG(INFO) << "processLocal() fake return " << apache::thrift::util::enumNameSafe(*rcProcessLocal); @@ -255,17 +256,20 @@ class MetaClientTestUpdater { meta::MetaClientOptions options; auto mClient = std::make_unique(exec, addrs, options); - mClient->localCache_[mockSpaceId] = std::make_shared(); + auto spSpaceInfoCache = std::make_shared(); + addLocalCache(*mClient, mockSpaceId, spSpaceInfoCache); + auto* pCache = getLocalCache(mClient.get(), mockSpaceId); + for (int i = 0; i != mockPartNum; ++i) { - mClient->localCache_[mockSpaceId]->termOfPartition_[i] = i; - auto ignoreItem = mClient->localCache_[mockSpaceId]->partsAlloc_[i]; + pCache->termOfPartition_[i] = i; + auto ignoreItem = pCache->partsAlloc_[i]; UNUSED(ignoreItem); } meta::cpp2::ColumnTypeDef type; type.type_ref() = nebula::cpp2::PropertyType::FIXED_STRING; type.type_length_ref() = 32; - mClient->localCache_[mockSpaceId]->spaceDesc_.vid_type_ref() = std::move(type); + pCache->spaceDesc_.vid_type_ref() = std::move(type); mClient->ready_ = true; return mClient; } @@ -328,12 +332,10 @@ class FakeInternalStorageClient : public InternalStorageClient { static FakeInternalStorageClient* instance(StorageEnv* env, Code fakeCode = Code::SUCCEEDED) { auto pool = std::make_shared(3); return new FakeInternalStorageClient(env, pool, fakeCode); - // static FakeInternalStorageClient client(env, pool, fakeCode); - // 
return &client; } static void hookInternalStorageClient(StorageEnv* env, InternalStorageClient* client) { - env->txnMan_->iClient_ = client; + env->interClient_ = client; } private: @@ -341,6 +343,8 @@ class FakeInternalStorageClient : public InternalStorageClient { Code code_{Code::SUCCEEDED}; }; +using UPCLT = std::unique_ptr; + struct ChainUpdateEdgeTestHelper { ChainUpdateEdgeTestHelper() { sEdgeType = std::to_string(std::abs(edgeType_)); @@ -524,25 +528,5 @@ struct ChainUpdateEdgeTestHelper { std::string sEdgeType; }; -// class ChainResumeProcessorTestHelper { -// public: -// explicit ChainResumeProcessorTestHelper(ChainResumeProcessor* proc) : proc_(proc) {} - -// void setAddEdgeProc(ChainAddEdgesLocalProcessor* proc) { -// proc_->addProc = proc; -// } - -// // setUpdProc -// void setUpdProc(ChainUpdateEdgeLocalProcessor* proc) { -// proc_->updProc = proc; -// } - -// std::string getTxnId() { -// return proc_->addProc->txnId_; -// } -// public: -// ChainResumeProcessor* proc_{nullptr}; -// }; - } // namespace storage } // namespace nebula diff --git a/src/storage/test/ChainUpdateEdgeTest.cpp b/src/storage/test/ChainUpdateEdgeTest.cpp index 6249dac0bdf..24bc29da7f4 100644 --- a/src/storage/test/ChainUpdateEdgeTest.cpp +++ b/src/storage/test/ChainUpdateEdgeTest.cpp @@ -21,7 +21,6 @@ #include "storage/test/TestUtils.h" #include "storage/transaction/ChainAddEdgesGroupProcessor.h" #include "storage/transaction/ChainAddEdgesLocalProcessor.h" -#include "storage/transaction/ChainResumeProcessor.h" #include "storage/transaction/ChainUpdateEdgeRemoteProcessor.h" #include "storage/transaction/ConsistUtil.h" @@ -32,6 +31,7 @@ namespace storage { constexpr int32_t mockSpaceId = 1; constexpr int32_t mockPartNum = 6; +constexpr int32_t fackTerm = 1; constexpr int32_t mockSpaceVidLen = 32; ChainTestUtils gTestUtil; @@ -44,14 +44,20 @@ TEST(ChainUpdateEdgeTest, updateTest1) { auto* env = cluster.storageEnv_.get(); auto mClient = MetaClientTestUpdater::makeDefault(); 
env->metaClient_ = mClient.get(); + MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm); + auto stPartsNum = env->metaClient_->partsNum(mockSpaceId); + if (stPartsNum.ok()) { + LOG(INFO) << "stPartsNum.value()=" << stPartsNum.value(); + } auto parts = cluster.getTotalParts(); + LOG(INFO) << "parts: " << parts; EXPECT_TRUE(QueryTestUtils::mockEdgeData(env, parts, mockSpaceVidLen)); LOG(INFO) << "Test updateTest1..."; auto req = helper.makeDefaultRequest(); - env->txnMan_->iClient_ = FakeInternalStorageClient::instance(env); + env->interClient_ = FakeInternalStorageClient::instance(env); auto reversedRequest = helper.reverseRequest(env, req); auto* proc = new FakeChainUpdateProcessor(env); @@ -60,6 +66,9 @@ TEST(ChainUpdateEdgeTest, updateTest1) { proc->process(req); auto resp = std::move(f).get(); + env->txnMan_->stop(); + env->txnMan_->join(); + EXPECT_TRUE(helper.checkResp2(resp)); EXPECT_TRUE(helper.checkRequestUpdated(env, req)); EXPECT_TRUE(helper.checkRequestUpdated(env, reversedRequest)); @@ -75,6 +84,7 @@ TEST(ChainUpdateEdgeTest, updateTest2) { auto* env = cluster.storageEnv_.get(); auto mClient = MetaClientTestUpdater::makeDefault(); env->metaClient_ = mClient.get(); + MetaClientTestUpdater::addPartTerm(env->metaClient_, mockSpaceId, mockPartNum, fackTerm); auto parts = cluster.getTotalParts(); EXPECT_TRUE(QueryTestUtils::mockEdgeData(env, parts, mockSpaceVidLen)); @@ -93,6 +103,9 @@ TEST(ChainUpdateEdgeTest, updateTest2) { proc->process(badRequest); auto resp = std::move(f).get(); + env->txnMan_->stop(); + env->txnMan_->join(); + EXPECT_EQ(1, (*resp.result_ref()).failed_parts.size()); EXPECT_FALSE(helper.checkResp2(resp)); EXPECT_FALSE(helper.edgeExist(env, badRequest)); @@ -124,6 +137,9 @@ TEST(ChainUpdateEdgeTest, updateTest3) { proc->process(goodRequest); auto resp = std::move(f).get(); + env->txnMan_->stop(); + env->txnMan_->join(); + EXPECT_TRUE(helper.edgeExist(env, goodRequest)); 
EXPECT_TRUE(helper.primeExist(env, goodRequest)); EXPECT_FALSE(helper.doublePrimeExist(env, goodRequest)); @@ -146,15 +162,22 @@ TEST(ChainUpdateEdgeTest, updateTest4) { EXPECT_FALSE(helper.primeExist(env, goodRequest)); EXPECT_FALSE(helper.doublePrimeExist(env, goodRequest)); + UPCLT iClient(FakeInternalStorageClient::instance(env, nebula::cpp2::ErrorCode::SUCCEEDED)); + FakeInternalStorageClient::hookInternalStorageClient(env, iClient.get()); + auto* proc = new FakeChainUpdateProcessor(env); auto f = proc->getFuture(); proc->rcProcessRemote = Code::E_RPC_FAILURE; proc->process(goodRequest); auto resp = std::move(f).get(); + sleep(1); + env->txnMan_->stop(); + env->txnMan_->join(); + EXPECT_TRUE(helper.edgeExist(env, goodRequest)); EXPECT_FALSE(helper.primeExist(env, goodRequest)); - EXPECT_TRUE(helper.doublePrimeExist(env, goodRequest)); + EXPECT_FALSE(helper.doublePrimeExist(env, goodRequest)); } } // namespace storage diff --git a/src/storage/transaction/ChainAddEdgesLocalProcessor.cpp b/src/storage/transaction/ChainAddEdgesLocalProcessor.cpp index 1e58236c370..a93dc2ed8a0 100644 --- a/src/storage/transaction/ChainAddEdgesLocalProcessor.cpp +++ b/src/storage/transaction/ChainAddEdgesLocalProcessor.cpp @@ -7,6 +7,8 @@ #include +#include + #include "common/utils/DefaultValueContext.h" #include "kvstore/Part.h" #include "storage/StorageFlags.h" @@ -87,9 +89,7 @@ folly::SemiFuture ChainAddEdgesLocalProcessor::processRemote(Code code) { } folly::SemiFuture ChainAddEdgesLocalProcessor::processLocal(Code code) { - if (FLAGS_trace_toss) { - VLOG(1) << uuid_ << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code); - } + VLOG(1) << uuid_ << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code); bool remoteFailed{true}; @@ -103,7 +103,7 @@ folly::SemiFuture ChainAddEdgesLocalProcessor::processLocal(Code code) { code_ = code; } - auto currTerm = env_->txnMan_->getTerm(spaceId_, localPartId_); + auto currTerm = 
env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (currTerm.first != term_) { LOG(WARNING) << "E_LEADER_CHANGED during prepare and commit local"; code_ = Code::E_LEADER_CHANGED; @@ -111,7 +111,7 @@ folly::SemiFuture ChainAddEdgesLocalProcessor::processLocal(Code code) { if (code == Code::E_RPC_FAILURE) { kvAppend_ = makeDoublePrime(); - addUnfinishedEdge(ResumeType::RESUME_REMOTE); + rcRemote_ = Code::E_RPC_FAILURE; } if (code_ == Code::SUCCEEDED) { @@ -125,13 +125,13 @@ folly::SemiFuture ChainAddEdgesLocalProcessor::processLocal(Code code) { return code_; } -void ChainAddEdgesLocalProcessor::addUnfinishedEdge(ResumeType type) { +void ChainAddEdgesLocalProcessor::reportFailed(ResumeType type) { if (lk_ != nullptr) { - lk_->forceUnlock(); + lk_->setAutoUnlock(false); } auto keys = toStrKeys(req_); for (auto& key : keys) { - env_->txnMan_->addPrime(spaceId_, key, type); + env_->txnMan_->addPrime(spaceId_, localPartId_, term_, key, type); } } @@ -149,7 +149,7 @@ bool ChainAddEdgesLocalProcessor::prepareRequest(const cpp2::AddEdgesRequest& re localPartId_ = req.get_parts().begin()->first; replaceNullWithDefaultValue(req_); - std::tie(term_, code_) = env_->txnMan_->getTerm(spaceId_, localPartId_); + std::tie(term_, code_) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (code_ != Code::SUCCEEDED) { LOG(INFO) << "get term failed"; return false; @@ -174,26 +174,14 @@ folly::SemiFuture ChainAddEdgesLocalProcessor::forwardToDelegateProcessor( auto futProc = proc->getFuture(); auto [pro, fut] = folly::makePromiseContract(); std::move(futProc).thenTry([&, p = std::move(pro)](auto&& t) mutable { - auto rc = Code::SUCCEEDED; if (t.hasException()) { LOG(INFO) << "catch ex: " << t.exception().what(); - rc = Code::E_UNKNOWN; + rcCommit_ = Code::E_UNKNOWN; } else { auto& resp = t.value(); - rc = extractRpcError(resp); - if (rc == Code::SUCCEEDED) { - if (FLAGS_trace_toss) { - for (auto& k : kvErased_) { - VLOG(1) << uuid_ << " erase prime " << 
folly::hexlify(k); - } - } - } else { - VLOG(1) << uuid_ << " forwardToDelegateProcessor(), code = " - << apache::thrift::util::enumNameSafe(rc); - addUnfinishedEdge(ResumeType::RESUME_CHAIN); - } + rcCommit_ = extractRpcError(resp); } - p.setValue(rc); + p.setValue(rcCommit_); }); proc->process(req_); return std::move(fut); @@ -215,7 +203,8 @@ void ChainAddEdgesLocalProcessor::doRpc(folly::Promise&& promise, promise.setValue(Code::E_LEADER_CHANGED); return; } - auto* iClient = env_->txnMan_->getInternalClient(); + auto* iClient = env_->interClient_; + // auto* evb = env_->txnMan_->getEventBase(); folly::Promise p; auto f = p.getFuture(); iClient->chainAddEdges(req, term_, edgeVer_, std::move(p)); @@ -266,15 +255,16 @@ folly::SemiFuture ChainAddEdgesLocalProcessor::abort() { std::move(kvErased_), [p = std::move(pro), debugErased, this](auto rc) mutable { VLOG(1) << uuid_ << " abort()=" << apache::thrift::util::enumNameSafe(rc); - if (rc == Code::SUCCEEDED) { - if (FLAGS_trace_toss) { - for (auto& k : debugErased) { - VLOG(1) << uuid_ << "erase prime " << folly::hexlify(k); - } - } - } else { - addUnfinishedEdge(ResumeType::RESUME_CHAIN); - } + this->rcCommit_ = rc; + // if (rc == Code::SUCCEEDED) { + // if (FLAGS_trace_toss) { + // for (auto& k : debugErased) { + // VLOG(1) << uuid_ << "erase prime " << folly::hexlify(k); + // } + // } + // } else { + // reportFailed(ResumeType::RESUME_CHAIN); + // } p.setValue(rc); }); return std::move(fut); @@ -322,8 +312,8 @@ void ChainAddEdgesLocalProcessor::erasePrime() { bool ChainAddEdgesLocalProcessor::lockEdges(const cpp2::AddEdgesRequest& req) { auto partId = req.get_parts().begin()->first; - auto* lockCore = env_->txnMan_->getLockCore(req.get_space_id(), partId); - if (!lockCore) { + lkCore_ = env_->txnMan_->getLockCore(req.get_space_id(), partId, term_); + if (!lkCore_) { return false; } @@ -331,7 +321,7 @@ bool ChainAddEdgesLocalProcessor::lockEdges(const cpp2::AddEdgesRequest& req) { for (auto& edge : 
req.get_parts().begin()->second) { keys.emplace_back(ConsistUtil::edgeKey(spaceVidLen_, partId, edge.get_key())); } - lk_ = std::make_unique(lockCore, keys); + lk_ = std::make_unique(lkCore_.get(), keys); return lk_->isLocked(); } @@ -365,6 +355,19 @@ cpp2::AddEdgesRequest ChainAddEdgesLocalProcessor::reverseRequest( void ChainAddEdgesLocalProcessor::finish() { VLOG(1) << uuid_ << " commitLocal(), code_ = " << apache::thrift::util::enumNameSafe(code_); pushResultCode(code_, localPartId_); + TermID currTerm = 0; + std::tie(currTerm, std::ignore) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); + if (term_ == currTerm) { + if (rcCommit_ != Code::SUCCEEDED) { + reportFailed(ResumeType::RESUME_CHAIN); + } else if (rcRemote_ == Code::E_RPC_FAILURE) { + reportFailed(ResumeType::RESUME_REMOTE); + } else { + // lk_->setAutoUnlock(true); + } + } else { + // transaction manager will do the clean. + } finished_.setValue(code_); onFinished(); } diff --git a/src/storage/transaction/ChainAddEdgesLocalProcessor.h b/src/storage/transaction/ChainAddEdgesLocalProcessor.h index 06695e29677..1cfc4b3272d 100644 --- a/src/storage/transaction/ChainAddEdgesLocalProcessor.h +++ b/src/storage/transaction/ChainAddEdgesLocalProcessor.h @@ -103,7 +103,7 @@ class ChainAddEdgesLocalProcessor : public BaseProcessor, /// call this to leave a record in transaction manager /// the record can be scanned by the background resume thread /// then will do fail over logic - void addUnfinishedEdge(ResumeType type); + void reportFailed(ResumeType type); /*** consider the following case: * @@ -119,6 +119,17 @@ class ChainAddEdgesLocalProcessor : public BaseProcessor, * */ void replaceNullWithDefaultValue(cpp2::AddEdgesRequest& req); + /** + * @brief check if an error code belongs to kv store + * we can do retry / recover if we meet a kv store error + * but if we meet a logical error (retry will always fail) + * we should return error directly.
+ * @param code + * @return true + * @return false + */ + bool isKVStoreError(Code code); + std::string makeReadableEdge(const cpp2::AddEdgesRequest& req); int64_t toInt(const ::nebula::Value& val); @@ -128,6 +139,7 @@ class ChainAddEdgesLocalProcessor : public BaseProcessor, PartitionID localPartId_; PartitionID remotePartId_; cpp2::AddEdgesRequest req_; + TransactionManager::SPtrLock lkCore_; std::unique_ptr lk_{nullptr}; int retryLimit_{10}; // term at prepareLocal, not allowed to change during execution diff --git a/src/storage/transaction/ChainAddEdgesRemoteProcessor.cpp b/src/storage/transaction/ChainAddEdgesRemoteProcessor.cpp index 94dfce48417..64691a2381d 100644 --- a/src/storage/transaction/ChainAddEdgesRemoteProcessor.cpp +++ b/src/storage/transaction/ChainAddEdgesRemoteProcessor.cpp @@ -5,6 +5,8 @@ #include "storage/transaction/ChainAddEdgesRemoteProcessor.h" +#include + #include "storage/mutate/AddEdgesProcessor.h" #include "storage/transaction/ConsistUtil.h" #include "storage/transaction/TransactionManager.h" diff --git a/src/storage/transaction/ChainBaseProcessor.h b/src/storage/transaction/ChainBaseProcessor.h index c20f7dc6e84..a3371688a10 100644 --- a/src/storage/transaction/ChainBaseProcessor.h +++ b/src/storage/transaction/ChainBaseProcessor.h @@ -20,6 +20,8 @@ using Code = ::nebula::cpp2::ErrorCode; * */ class ChainBaseProcessor { + friend class ChainProcessorFactory; + public: virtual ~ChainBaseProcessor() = default; @@ -50,6 +52,9 @@ class ChainBaseProcessor { protected: Code code_ = Code::SUCCEEDED; + Code rcRemote_ = Code::SUCCEEDED; + Code rcCommit_ = Code::SUCCEEDED; + TermID term_; folly::Promise finished_; }; diff --git a/src/storage/transaction/ChainDeleteEdgesLocalProcessor.cpp b/src/storage/transaction/ChainDeleteEdgesLocalProcessor.cpp index f9c9e1951b8..352dd55b791 100644 --- a/src/storage/transaction/ChainDeleteEdgesLocalProcessor.cpp +++ b/src/storage/transaction/ChainDeleteEdgesLocalProcessor.cpp @@ -79,20 +79,20 @@ 
folly::SemiFuture ChainDeleteEdgesLocalProcessor::processLocal(Code code) remoteFailed = true; } - auto [currTerm, suc] = env_->txnMan_->getTerm(spaceId_, localPartId_); + auto [currTerm, suc] = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (currTerm != term_) { LOG(WARNING) << "E_LEADER_CHANGED during prepare and commit local"; code_ = Code::E_LEADER_CHANGED; } if (code == Code::E_RPC_FAILURE) { + rcRemote_ = Code::E_RPC_FAILURE; for (auto& kv : primes_) { auto key = ConsistUtil::doublePrimeTable().append(kv.first.substr(ConsistUtil::primeTable().size())); setDoublePrime_ = true; doublePrimes_.emplace_back(key, kv.second); } - reportFailed(ResumeType::RESUME_REMOTE); } if (code_ == Code::SUCCEEDED) { @@ -108,13 +108,13 @@ folly::SemiFuture ChainDeleteEdgesLocalProcessor::processLocal(Code code) void ChainDeleteEdgesLocalProcessor::reportFailed(ResumeType type) { if (lk_ != nullptr) { - lk_->forceUnlock(); + lk_->setAutoUnlock(false); } for (auto& edgesOfPart : req_.get_parts()) { auto partId = edgesOfPart.first; for (auto& key : edgesOfPart.second) { auto strKey = ConsistUtil::edgeKey(spaceVidLen_, partId, key); - env_->txnMan_->addPrime(spaceId_, strKey, type); + env_->txnMan_->addPrime(spaceId_, localPartId_, term_, strKey, type); } } } @@ -200,11 +200,7 @@ folly::SemiFuture ChainDeleteEdgesLocalProcessor::commitLocal() { std::move(futProc).thenValue([&, p = std::move(pro)](auto&& resp) mutable { auto rc = ConsistUtil::getErrorCode(resp); VLOG(1) << txnId_ << " commitLocal() " << apache::thrift::util::enumNameSafe(rc); - if (rc == Code::SUCCEEDED) { - // do nothing - } else { - reportFailed(ResumeType::RESUME_CHAIN); - } + rcCommit_ = rc; p.setValue(rc); }); proc->process(req_); @@ -218,7 +214,8 @@ void ChainDeleteEdgesLocalProcessor::doRpc(folly::Promise&& promise, promise.setValue(Code::E_LEADER_CHANGED); return; } - auto* iClient = env_->txnMan_->getInternalClient(); + auto* iClient = env_->interClient_; + // auto* evb = 
env_->txnMan_->getEventBase(); folly::Promise p; auto f = p.getFuture(); iClient->chainDeleteEdges(req, txnId_, term_, std::move(p)); @@ -305,19 +302,15 @@ folly::SemiFuture ChainDeleteEdgesLocalProcessor::abort() { std::move(keyRemoved), [p = std::move(pro), this](auto rc) mutable { VLOG(1) << txnId_ << " abort()=" << apache::thrift::util::enumNameSafe(rc); - if (rc == Code::SUCCEEDED) { - // do nothing - } else { - reportFailed(ResumeType::RESUME_CHAIN); - } + rcCommit_ = rc; p.setValue(rc); }); return std::move(fut); } bool ChainDeleteEdgesLocalProcessor::lockEdges(const cpp2::DeleteEdgesRequest& req) { - auto* lockCore = env_->txnMan_->getLockCore(req.get_space_id(), localPartId_); - if (!lockCore) { + lkCore_ = env_->txnMan_->getLockCore(req.get_space_id(), localPartId_, term_); + if (!lkCore_) { VLOG(1) << txnId_ << "get lock failed."; return false; } @@ -328,7 +321,7 @@ bool ChainDeleteEdgesLocalProcessor::lockEdges(const cpp2::DeleteEdgesRequest& r keys.emplace_back(std::move(eKey)); } bool dedup = true; - lk_ = std::make_unique(lockCore, keys, dedup); + lk_ = std::make_unique(lkCore_.get(), keys, dedup); if (!lk_->isLocked()) { VLOG(1) << txnId_ << " conflict " << ConsistUtil::readableKey(spaceVidLen_, lk_->conflictKey()); } @@ -352,6 +345,19 @@ cpp2::DeleteEdgesRequest ChainDeleteEdgesLocalProcessor::reverseRequest( void ChainDeleteEdgesLocalProcessor::finish() { VLOG(1) << txnId_ << " commitLocal(), code_ = " << apache::thrift::util::enumNameSafe(code_); pushResultCode(code_, localPartId_); + TermID currTerm = 0; + std::tie(currTerm, std::ignore) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); + if (term_ == currTerm) { + if (rcCommit_ != Code::SUCCEEDED) { + reportFailed(ResumeType::RESUME_CHAIN); + } else if (rcRemote_ == Code::E_RPC_FAILURE) { + reportFailed(ResumeType::RESUME_REMOTE); + } else { + // lk_->setAutoUnlock(true); + } + } else { + // transaction manager will do the clean. 
+ } finished_.setValue(code_); onFinished(); } diff --git a/src/storage/transaction/ChainDeleteEdgesLocalProcessor.h b/src/storage/transaction/ChainDeleteEdgesLocalProcessor.h index 2c4f467b3d3..dd5def3719c 100644 --- a/src/storage/transaction/ChainDeleteEdgesLocalProcessor.h +++ b/src/storage/transaction/ChainDeleteEdgesLocalProcessor.h @@ -86,6 +86,7 @@ class ChainDeleteEdgesLocalProcessor : public BaseProcessor, PartitionID localPartId_; PartitionID remotePartId_; cpp2::DeleteEdgesRequest req_; + TransactionManager::SPtrLock lkCore_; std::unique_ptr lk_{nullptr}; int retryLimit_{10}; /** diff --git a/src/storage/transaction/ChainDeleteEdgesResumeProcessor.cpp b/src/storage/transaction/ChainDeleteEdgesResumeProcessor.cpp index 19698798cff..2da6bc00e8f 100644 --- a/src/storage/transaction/ChainDeleteEdgesResumeProcessor.cpp +++ b/src/storage/transaction/ChainDeleteEdgesResumeProcessor.cpp @@ -35,6 +35,7 @@ folly::SemiFuture ChainDeleteEdgesResumeProcessor::processRemote(Code code folly::SemiFuture ChainDeleteEdgesResumeProcessor::processLocal(Code code) { VLOG(1) << txnId_ << " processRemote() " << apache::thrift::util::enumNameSafe(code); + rcRemote_ = code; setErrorCode(code); if (code == Code::E_RPC_FAILURE) { diff --git a/src/storage/transaction/ChainDeleteEdgesResumeRemoteProcessor.cpp b/src/storage/transaction/ChainDeleteEdgesResumeRemoteProcessor.cpp index a0e0cdbc84f..86082c51e65 100644 --- a/src/storage/transaction/ChainDeleteEdgesResumeRemoteProcessor.cpp +++ b/src/storage/transaction/ChainDeleteEdgesResumeRemoteProcessor.cpp @@ -29,7 +29,7 @@ folly::SemiFuture ChainDeleteEdgesResumeRemoteProcessor::processRemote(Cod folly::SemiFuture ChainDeleteEdgesResumeRemoteProcessor::processLocal(Code code) { VLOG(1) << txnId_ << " processRemote() " << apache::thrift::util::enumNameSafe(code); - + rcRemote_ = code; setErrorCode(code); if (code == Code::E_RPC_FAILURE) { @@ -52,6 +52,7 @@ folly::SemiFuture ChainDeleteEdgesResumeRemoteProcessor::processLocal(Code 
folly::Baton baton; env_->kvstore_->asyncMultiRemove( spaceId_, localPartId_, std::move(doublePrimeKeys), [this, &baton](auto&& rc) { + rcCommit_ = rc; this->code_ = rc; baton.post(); }); @@ -61,5 +62,23 @@ folly::SemiFuture ChainDeleteEdgesResumeRemoteProcessor::processLocal(Code return code_; } +void ChainDeleteEdgesResumeRemoteProcessor::finish() { + VLOG(1) << " commitLocal(), code_ = " << apache::thrift::util::enumNameSafe(code_); + pushResultCode(code_, localPartId_); + TermID currTerm = 0; + std::tie(currTerm, std::ignore) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); + if (term_ == currTerm) { + if (rcCommit_ == Code::SUCCEEDED && rcRemote_ == Code::SUCCEEDED) { + lk_->setAutoUnlock(true); + } else { + reportFailed(ResumeType::RESUME_REMOTE); + } + } else { + // transaction manager will do the clean. + } + finished_.setValue(code_); + onFinished(); +} + } // namespace storage } // namespace nebula diff --git a/src/storage/transaction/ChainDeleteEdgesResumeRemoteProcessor.h b/src/storage/transaction/ChainDeleteEdgesResumeRemoteProcessor.h index 31c091f5962..571b2ec8eb5 100644 --- a/src/storage/transaction/ChainDeleteEdgesResumeRemoteProcessor.h +++ b/src/storage/transaction/ChainDeleteEdgesResumeRemoteProcessor.h @@ -16,13 +16,15 @@ class ChainDeleteEdgesResumeRemoteProcessor : public ChainDeleteEdgesLocalProces return new ChainDeleteEdgesResumeRemoteProcessor(env, val); } + virtual ~ChainDeleteEdgesResumeRemoteProcessor() = default; + folly::SemiFuture prepareLocal() override; folly::SemiFuture processRemote(nebula::cpp2::ErrorCode code) override; folly::SemiFuture processLocal(nebula::cpp2::ErrorCode code) override; - virtual ~ChainDeleteEdgesResumeRemoteProcessor() = default; + void finish(); protected: ChainDeleteEdgesResumeRemoteProcessor(StorageEnv* env, const std::string& val); diff --git a/src/storage/transaction/ChainProcessorFactory.cpp b/src/storage/transaction/ChainProcessorFactory.cpp index b8a60e60e4d..7133ef01cf3 100644 --- 
a/src/storage/transaction/ChainProcessorFactory.cpp +++ b/src/storage/transaction/ChainProcessorFactory.cpp @@ -7,16 +7,39 @@ #include "storage/transaction/ChainDeleteEdgesResumeProcessor.h" #include "storage/transaction/ChainDeleteEdgesResumeRemoteProcessor.h" +#include "storage/transaction/ChainResumeAddDoublePrimeProcessor.h" +#include "storage/transaction/ChainResumeAddPrimeProcessor.h" +#include "storage/transaction/ChainResumeUpdateDoublePrimeProcessor.h" +#include "storage/transaction/ChainResumeUpdatePrimeProcessor.h" #include "storage/transaction/ConsistUtil.h" -#include "storage/transaction/ResumeAddEdgeProcessor.h" -#include "storage/transaction/ResumeAddEdgeRemoteProcessor.h" -#include "storage/transaction/ResumeUpdateProcessor.h" -#include "storage/transaction/ResumeUpdateRemoteProcessor.h" namespace nebula { namespace storage { +ChainBaseProcessor* ChainProcessorFactory::make(StorageEnv* env, + GraphSpaceID spaceId, + TermID termId, + const std::string& edgeKey, + ResumeType type) { + auto partId = NebulaKeyUtils::getPart(edgeKey); + auto prefix = (type == ResumeType::RESUME_CHAIN) ? 
ConsistUtil::primeTable() + : ConsistUtil::doublePrimeTable(); + auto key = prefix + edgeKey; + std::string val; + auto rc = env->kvstore_->get(spaceId, partId, key, &val); + VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId + << ", hex = " << folly::hexlify(edgeKey) + << ", rc = " << apache::thrift::util::enumNameSafe(rc); + if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) { + return nullptr; + } + + ResumeOptions opt(type, val); + return makeProcessor(env, termId, opt); +} + ChainBaseProcessor* ChainProcessorFactory::makeProcessor(StorageEnv* env, + TermID termId, const ResumeOptions& options) { ChainBaseProcessor* ret = nullptr; auto requestType = ConsistUtil::parseType(options.primeValue); @@ -24,11 +47,13 @@ ChainBaseProcessor* ChainProcessorFactory::makeProcessor(StorageEnv* env, case RequestType::INSERT: { switch (options.resumeType) { case ResumeType::RESUME_CHAIN: { - ret = ResumeAddEdgeProcessor::instance(env, options.primeValue); + VLOG(1) << "make ChainResumeAddPrimeProcessor"; + ret = ChainResumeAddPrimeProcessor::instance(env, options.primeValue); break; } case ResumeType::RESUME_REMOTE: { - ret = ResumeAddEdgeRemoteProcessor::instance(env, options.primeValue); + VLOG(1) << "make ChainResumeAddDoublePrimeProcessor"; + ret = ChainResumeAddDoublePrimeProcessor::instance(env, options.primeValue); break; } case ResumeType::UNKNOWN: { @@ -40,11 +65,13 @@ ChainBaseProcessor* ChainProcessorFactory::makeProcessor(StorageEnv* env, case RequestType::UPDATE: { switch (options.resumeType) { case ResumeType::RESUME_CHAIN: { - ret = ResumeUpdateProcessor::instance(env, options.primeValue); + VLOG(1) << "make ChainResumeUpdatePrimeProcessor"; + ret = ChainResumeUpdatePrimeProcessor::instance(env, options.primeValue); break; } case ResumeType::RESUME_REMOTE: { - ret = ResumeUpdateRemoteProcessor::instance(env, options.primeValue); + VLOG(1) << "make ChainResumeUpdateDoublePrimeProcessor"; + ret = ChainResumeUpdateDoublePrimeProcessor::instance(env, 
options.primeValue); break; } case ResumeType::UNKNOWN: { @@ -56,10 +83,12 @@ ChainBaseProcessor* ChainProcessorFactory::makeProcessor(StorageEnv* env, case RequestType::DELETE: { switch (options.resumeType) { case ResumeType::RESUME_CHAIN: { + VLOG(1) << "make ChainDeleteEdgesResumeProcessor"; ret = ChainDeleteEdgesResumeProcessor::instance(env, options.primeValue); break; } case ResumeType::RESUME_REMOTE: { + VLOG(1) << "make ChainDeleteEdgesResumeRemoteProcessor"; ret = ChainDeleteEdgesResumeRemoteProcessor::instance(env, options.primeValue); break; } @@ -73,6 +102,7 @@ ChainBaseProcessor* ChainProcessorFactory::makeProcessor(StorageEnv* env, LOG(FATAL) << "RequestType::UNKNOWN: not supposed run here"; } } + ret->term_ = termId; return ret; } diff --git a/src/storage/transaction/ChainProcessorFactory.h b/src/storage/transaction/ChainProcessorFactory.h index 6c1518199d1..05a062c0fda 100644 --- a/src/storage/transaction/ChainProcessorFactory.h +++ b/src/storage/transaction/ChainProcessorFactory.h @@ -14,7 +14,15 @@ namespace storage { class ChainProcessorFactory { public: - static ChainBaseProcessor* makeProcessor(StorageEnv* env, const ResumeOptions& options); + static ChainBaseProcessor* makeProcessor(StorageEnv* env, + TermID termId, + const ResumeOptions& options); + + static ChainBaseProcessor* make(StorageEnv* env, + GraphSpaceID spaceId, + TermID termId, + const std::string& edgeKey, + ResumeType type); }; } // namespace storage diff --git a/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp b/src/storage/transaction/ChainResumeAddDoublePrimeProcessor.cpp similarity index 55% rename from src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp rename to src/storage/transaction/ChainResumeAddDoublePrimeProcessor.cpp index 21259f74afa..13dd2115074 100644 --- a/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp +++ b/src/storage/transaction/ChainResumeAddDoublePrimeProcessor.cpp @@ -3,19 +3,20 @@ * This source code is licensed under Apache 2.0 
License. */ -#include "storage/transaction/ResumeAddEdgeRemoteProcessor.h" +#include "storage/transaction/ChainResumeAddDoublePrimeProcessor.h" namespace nebula { namespace storage { -ResumeAddEdgeRemoteProcessor::ResumeAddEdgeRemoteProcessor(StorageEnv* env, const std::string& val) +ChainResumeAddDoublePrimeProcessor::ChainResumeAddDoublePrimeProcessor(StorageEnv* env, + const std::string& val) : ChainAddEdgesLocalProcessor(env) { req_ = ConsistUtil::parseAddRequest(val); ChainAddEdgesLocalProcessor::prepareRequest(req_); } -folly::SemiFuture ResumeAddEdgeRemoteProcessor::prepareLocal() { - std::tie(term_, code_) = env_->txnMan_->getTerm(spaceId_, localPartId_); +folly::SemiFuture ChainResumeAddDoublePrimeProcessor::prepareLocal() { + std::tie(term_, code_) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (code_ != Code::SUCCEEDED) { return code_; } @@ -32,12 +33,13 @@ folly::SemiFuture ResumeAddEdgeRemoteProcessor::prepare return Code::SUCCEEDED; } -folly::SemiFuture ResumeAddEdgeRemoteProcessor::processRemote(Code code) { +folly::SemiFuture ChainResumeAddDoublePrimeProcessor::processRemote(Code code) { return ChainAddEdgesLocalProcessor::processRemote(code); } -folly::SemiFuture ResumeAddEdgeRemoteProcessor::processLocal(Code code) { - auto currTerm = env_->txnMan_->getTerm(spaceId_, localPartId_); +folly::SemiFuture ChainResumeAddDoublePrimeProcessor::processLocal(Code code) { + rcRemote_ = code; + auto currTerm = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (currTerm.first != term_) { LOG(WARNING) << "E_LEADER_CHANGED during prepare and commit local"; code_ = Code::E_LEADER_CHANGED; @@ -65,5 +67,23 @@ folly::SemiFuture ResumeAddEdgeRemoteProcessor::processLocal(Code code) { return code; } +void ChainResumeAddDoublePrimeProcessor::finish() { + VLOG(1) << uuid_ << " commitLocal(), code_ = " << apache::thrift::util::enumNameSafe(code_); + pushResultCode(code_, localPartId_); + TermID currTerm = 0; + std::tie(currTerm, std::ignore) 
= env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); + if (term_ == currTerm) { + if (rcCommit_ == Code::SUCCEEDED && rcRemote_ == Code::SUCCEEDED) { + lk_->setAutoUnlock(true); + } else { + reportFailed(ResumeType::RESUME_REMOTE); + } + } else { + // transaction manager will do the clean. + } + finished_.setValue(code_); + onFinished(); +} + } // namespace storage } // namespace nebula diff --git a/src/storage/transaction/ResumeAddEdgeProcessor.h b/src/storage/transaction/ChainResumeAddDoublePrimeProcessor.h similarity index 59% rename from src/storage/transaction/ResumeAddEdgeProcessor.h rename to src/storage/transaction/ChainResumeAddDoublePrimeProcessor.h index 797bf7979aa..4eb918e2c99 100644 --- a/src/storage/transaction/ResumeAddEdgeProcessor.h +++ b/src/storage/transaction/ChainResumeAddDoublePrimeProcessor.h @@ -10,22 +10,24 @@ namespace nebula { namespace storage { -class ResumeAddEdgeProcessor : public ChainAddEdgesLocalProcessor { +class ChainResumeAddDoublePrimeProcessor : public ChainAddEdgesLocalProcessor { public: - static ResumeAddEdgeProcessor* instance(StorageEnv* env, const std::string& val) { - return new ResumeAddEdgeProcessor(env, val); + static ChainResumeAddDoublePrimeProcessor* instance(StorageEnv* env, const std::string& val) { + return new ChainResumeAddDoublePrimeProcessor(env, val); } + virtual ~ChainResumeAddDoublePrimeProcessor() = default; + folly::SemiFuture prepareLocal() override; folly::SemiFuture processRemote(nebula::cpp2::ErrorCode code) override; folly::SemiFuture processLocal(nebula::cpp2::ErrorCode code) override; - virtual ~ResumeAddEdgeProcessor() = default; + void finish() override; protected: - ResumeAddEdgeProcessor(StorageEnv* env, const std::string& val); + ChainResumeAddDoublePrimeProcessor(StorageEnv* env, const std::string& val); }; } // namespace storage diff --git a/src/storage/transaction/ResumeAddEdgeProcessor.cpp b/src/storage/transaction/ChainResumeAddPrimeProcessor.cpp similarity index 78% rename 
from src/storage/transaction/ResumeAddEdgeProcessor.cpp rename to src/storage/transaction/ChainResumeAddPrimeProcessor.cpp index 3ca1bfb18c5..11239e722e4 100644 --- a/src/storage/transaction/ResumeAddEdgeProcessor.cpp +++ b/src/storage/transaction/ChainResumeAddPrimeProcessor.cpp @@ -3,12 +3,12 @@ * This source code is licensed under Apache 2.0 License. */ -#include "storage/transaction/ResumeAddEdgeProcessor.h" +#include "storage/transaction/ChainResumeAddPrimeProcessor.h" namespace nebula { namespace storage { -ResumeAddEdgeProcessor::ResumeAddEdgeProcessor(StorageEnv* env, const std::string& val) +ChainResumeAddPrimeProcessor::ChainResumeAddPrimeProcessor(StorageEnv* env, const std::string& val) : ChainAddEdgesLocalProcessor(env) { req_ = ConsistUtil::parseAddRequest(val); @@ -18,7 +18,7 @@ ResumeAddEdgeProcessor::ResumeAddEdgeProcessor(StorageEnv* env, const std::strin ChainAddEdgesLocalProcessor::prepareRequest(req_); } -folly::SemiFuture ResumeAddEdgeProcessor::prepareLocal() { +folly::SemiFuture ChainResumeAddPrimeProcessor::prepareLocal() { if (code_ != Code::SUCCEEDED) { return code_; } @@ -36,16 +36,17 @@ folly::SemiFuture ResumeAddEdgeProcessor::prepareLocal( return code_; } -folly::SemiFuture ResumeAddEdgeProcessor::processRemote(Code code) { +folly::SemiFuture ChainResumeAddPrimeProcessor::processRemote(Code code) { VLOG(1) << uuid_ << " prepareLocal() " << apache::thrift::util::enumNameSafe(code); return ChainAddEdgesLocalProcessor::processRemote(code); } -folly::SemiFuture ResumeAddEdgeProcessor::processLocal(Code code) { +folly::SemiFuture ChainResumeAddPrimeProcessor::processLocal(Code code) { VLOG(1) << uuid_ << " processRemote() " << apache::thrift::util::enumNameSafe(code); + rcRemote_ = code; setErrorCode(code); - auto currTerm = env_->txnMan_->getTerm(spaceId_, localPartId_); + auto currTerm = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (currTerm.first != term_) { LOG(WARNING) << "E_LEADER_CHANGED during prepare and commit 
local"; code_ = Code::E_LEADER_CHANGED; diff --git a/src/storage/transaction/ResumeAddEdgeRemoteProcessor.h b/src/storage/transaction/ChainResumeAddPrimeProcessor.h similarity index 67% rename from src/storage/transaction/ResumeAddEdgeRemoteProcessor.h rename to src/storage/transaction/ChainResumeAddPrimeProcessor.h index a9046814064..4f6251c3fb1 100644 --- a/src/storage/transaction/ResumeAddEdgeRemoteProcessor.h +++ b/src/storage/transaction/ChainResumeAddPrimeProcessor.h @@ -10,22 +10,22 @@ namespace nebula { namespace storage { -class ResumeAddEdgeRemoteProcessor : public ChainAddEdgesLocalProcessor { +class ChainResumeAddPrimeProcessor : public ChainAddEdgesLocalProcessor { public: - static ResumeAddEdgeRemoteProcessor* instance(StorageEnv* env, const std::string& val) { - return new ResumeAddEdgeRemoteProcessor(env, val); + static ChainResumeAddPrimeProcessor* instance(StorageEnv* env, const std::string& val) { + return new ChainResumeAddPrimeProcessor(env, val); } + virtual ~ChainResumeAddPrimeProcessor() = default; + folly::SemiFuture prepareLocal() override; folly::SemiFuture processRemote(nebula::cpp2::ErrorCode code) override; folly::SemiFuture processLocal(nebula::cpp2::ErrorCode code) override; - virtual ~ResumeAddEdgeRemoteProcessor() = default; - protected: - ResumeAddEdgeRemoteProcessor(StorageEnv* env, const std::string& val); + ChainResumeAddPrimeProcessor(StorageEnv* env, const std::string& val); }; } // namespace storage diff --git a/src/storage/transaction/ChainResumeProcessor.cpp b/src/storage/transaction/ChainResumeProcessor.cpp deleted file mode 100644 index 4fad8f13749..00000000000 --- a/src/storage/transaction/ChainResumeProcessor.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -#include "storage/transaction/ChainResumeProcessor.h" - -#include "storage/transaction/ChainAddEdgesLocalProcessor.h" -#include "storage/transaction/ChainProcessorFactory.h" -#include "storage/transaction/ChainUpdateEdgeLocalProcessor.h" -#include "storage/transaction/ConsistUtil.h" -#include "storage/transaction/TransactionManager.h" - -namespace nebula { -namespace storage { - -void ChainResumeProcessor::process() { - auto* table = env_->txnMan_->getDangleEdges(); - std::unique_ptr iter; - for (auto it = table->begin(); it != table->end(); ++it) { - auto spaceId = *reinterpret_cast(const_cast(it->first.c_str())); - auto edgeKey = std::string(it->first.c_str() + sizeof(GraphSpaceID), - it->first.size() - sizeof(GraphSpaceID)); - auto partId = NebulaKeyUtils::getPart(edgeKey); - auto prefix = (it->second == ResumeType::RESUME_CHAIN) ? ConsistUtil::primeTable() - : ConsistUtil::doublePrimeTable(); - auto key = prefix + edgeKey; - std::string val; - auto rc = env_->kvstore_->get(spaceId, partId, key, &val); - VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId - << ", hex = " << folly::hexlify(edgeKey) - << ", rc = " << apache::thrift::util::enumNameSafe(rc); - if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { - // do nothing - } else if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - VLOG(1) << "kvstore->get() leader changed"; - auto getPart = env_->kvstore_->part(spaceId, partId); - if (nebula::ok(getPart) && !nebula::value(getPart)->isLeader()) { - // not leader any more, stop trying resume - env_->txnMan_->delPrime(spaceId, edgeKey); - } - continue; - } else if (rc == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { - // raft may rollback want we scanned. 
- env_->txnMan_->delPrime(spaceId, edgeKey); - } else { - LOG(WARNING) << "kvstore->get() failed, " << apache::thrift::util::enumNameSafe(rc); - continue; - } - - ResumeOptions opt(it->second, val); - auto* proc = ChainProcessorFactory::makeProcessor(env_, opt); - auto fut = proc->getFinished(); - env_->txnMan_->addChainTask(proc); - std::move(fut) - .thenValue([=](auto&& code) { - if (code == Code::SUCCEEDED) { - env_->txnMan_->delPrime(spaceId, edgeKey); - } else { - VLOG(1) << "recover failed: " << apache::thrift::util::enumNameSafe(rc); - } - }) - .get(); - } -} - -} // namespace storage -} // namespace nebula diff --git a/src/storage/transaction/ChainResumeProcessor.h b/src/storage/transaction/ChainResumeProcessor.h deleted file mode 100644 index ac3572e319f..00000000000 --- a/src/storage/transaction/ChainResumeProcessor.h +++ /dev/null @@ -1,31 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#pragma once - -#include "clients/storage/InternalStorageClient.h" -#include "common/utils/NebulaKeyUtils.h" -#include "storage/transaction/ChainAddEdgesLocalProcessor.h" -#include "storage/transaction/ChainBaseProcessor.h" -#include "storage/transaction/ChainUpdateEdgeLocalProcessor.h" -#include "storage/transaction/TransactionManager.h" - -namespace nebula { -namespace storage { - -class ChainResumeProcessor { - friend class ChainResumeProcessorTestHelper; - - public: - explicit ChainResumeProcessor(StorageEnv* env) : env_(env) {} - - void process(); - - private: - StorageEnv* env_{nullptr}; -}; - -} // namespace storage -} // namespace nebula diff --git a/src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.cpp b/src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.cpp new file mode 100644 index 00000000000..046cd21cc0c --- /dev/null +++ b/src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.cpp @@ -0,0 +1,81 @@ +/* Copyright (c) 2021 vesoft inc. 
All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "storage/transaction/ChainResumeUpdateDoublePrimeProcessor.h" + +#include + +namespace nebula { +namespace storage { + +ChainResumeUpdateDoublePrimeProcessor::ChainResumeUpdateDoublePrimeProcessor(StorageEnv* env, + const std::string& val) + : ChainUpdateEdgeLocalProcessor(env) { + req_ = ConsistUtil::parseUpdateRequest(val); + ChainUpdateEdgeLocalProcessor::prepareRequest(req_); +} + +folly::SemiFuture ChainResumeUpdateDoublePrimeProcessor::prepareLocal() { + VLOG(1) << " prepareLocal()"; + std::tie(term_, code_) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); + return code_; +} + +folly::SemiFuture ChainResumeUpdateDoublePrimeProcessor::processRemote(Code code) { + VLOG(1) << " prepareLocal(), code = " << apache::thrift::util::enumNameSafe(code); + return ChainUpdateEdgeLocalProcessor::processRemote(code); +} + +folly::SemiFuture ChainResumeUpdateDoublePrimeProcessor::processLocal(Code code) { + VLOG(1) << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code); + rcRemote_ = code; + setErrorCode(code); + + auto currTerm = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); + if (currTerm.first != term_) { + LOG(WARNING) << "E_LEADER_CHANGED during prepare and commit local"; + code_ = Code::E_LEADER_CHANGED; + } + + if (code == Code::SUCCEEDED) { + // if there are something wrong other than rpc failure + // we need to keep the resume retry(by not remove those prime key) + auto key = ConsistUtil::doublePrime(spaceVidLen_, localPartId_, req_.get_edge_key()); + kvErased_.emplace_back(std::move(key)); + return commit(); + } else { + // we can't decide if the double prime should be deleted. 
+ // so do nothing + } + + return code; +} + +void ChainResumeUpdateDoublePrimeProcessor::finish() { + VLOG(1) << " commitLocal()=" << apache::thrift::util::enumNameSafe(rcCommit_); + TermID currTerm = 0; + std::tie(currTerm, std::ignore) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); + if (term_ == currTerm) { + if (rcCommit_ == Code::SUCCEEDED && rcRemote_ == Code::SUCCEEDED) { + lk_->setAutoUnlock(true); + } else if (isKVStoreError(rcCommit_) || rcRemote_ == Code::E_RPC_FAILURE) { + reportFailed(ResumeType::RESUME_REMOTE); + } else { + // 1. rcCommit_ has some logical error (retry can't get any help) + // 2. rcRemote_ has some error other than RPC_FAILURE + // 2.1 we should do abort if this. + // 2.2 abort() should only have kv store error + lk_->setAutoUnlock(true); + } + } else { + // if term changed, transaction manager will do the clean. + } + pushResultCode(code_, req_.get_part_id()); + finished_.setValue(code_); + onFinished(); +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/transaction/ResumeUpdateRemoteProcessor.h b/src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.h similarity index 67% rename from src/storage/transaction/ResumeUpdateRemoteProcessor.h rename to src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.h index bb3171d061b..df4a6ba8bdf 100644 --- a/src/storage/transaction/ResumeUpdateRemoteProcessor.h +++ b/src/storage/transaction/ChainResumeUpdateDoublePrimeProcessor.h @@ -15,10 +15,10 @@ namespace storage { * if the TxnManager background resume thread found a prime key * it will create this processor to resume the complete update process */ -class ResumeUpdateRemoteProcessor : public ChainUpdateEdgeLocalProcessor { +class ChainResumeUpdateDoublePrimeProcessor : public ChainUpdateEdgeLocalProcessor { public: - static ResumeUpdateRemoteProcessor* instance(StorageEnv* env, const std::string& val) { - return new ResumeUpdateRemoteProcessor(env, val); + static 
ChainResumeUpdateDoublePrimeProcessor* instance(StorageEnv* env, const std::string& val) { + return new ChainResumeUpdateDoublePrimeProcessor(env, val); } folly::SemiFuture prepareLocal() override; @@ -29,10 +29,10 @@ class ResumeUpdateRemoteProcessor : public ChainUpdateEdgeLocalProcessor { void finish() override; - virtual ~ResumeUpdateRemoteProcessor() = default; + virtual ~ChainResumeUpdateDoublePrimeProcessor() = default; protected: - ResumeUpdateRemoteProcessor(StorageEnv* env, const std::string& val); + ChainResumeUpdateDoublePrimeProcessor(StorageEnv* env, const std::string& val); bool lockEdge(const cpp2::UpdateEdgeRequest& req); }; diff --git a/src/storage/transaction/ResumeUpdateProcessor.cpp b/src/storage/transaction/ChainResumeUpdatePrimeProcessor.cpp similarity index 63% rename from src/storage/transaction/ResumeUpdateProcessor.cpp rename to src/storage/transaction/ChainResumeUpdatePrimeProcessor.cpp index 075d0c10a2d..dbd59342a33 100644 --- a/src/storage/transaction/ResumeUpdateProcessor.cpp +++ b/src/storage/transaction/ChainResumeUpdatePrimeProcessor.cpp @@ -3,7 +3,7 @@ * This source code is licensed under Apache 2.0 License. 
*/ -#include "storage/transaction/ResumeUpdateProcessor.h" +#include "storage/transaction/ChainResumeUpdatePrimeProcessor.h" #include @@ -12,27 +12,30 @@ namespace nebula { namespace storage { -ResumeUpdateProcessor::ResumeUpdateProcessor(StorageEnv* env, const std::string& val) +ChainResumeUpdatePrimeProcessor::ChainResumeUpdatePrimeProcessor(StorageEnv* env, + const std::string& val) : ChainUpdateEdgeLocalProcessor(env) { req_ = ConsistUtil::parseUpdateRequest(val); ChainUpdateEdgeLocalProcessor::prepareRequest(req_); } -folly::SemiFuture ResumeUpdateProcessor::prepareLocal() { - std::tie(term_, code_) = env_->txnMan_->getTerm(spaceId_, localPartId_); +folly::SemiFuture ChainResumeUpdatePrimeProcessor::prepareLocal() { + VLOG(1) << " prepareLocal()"; + std::tie(term_, code_) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); return code_; } -folly::SemiFuture ResumeUpdateProcessor::processRemote(Code code) { +folly::SemiFuture ChainResumeUpdatePrimeProcessor::processRemote(Code code) { VLOG(1) << "prepareLocal()=" << apache::thrift::util::enumNameSafe(code); return ChainUpdateEdgeLocalProcessor::processRemote(code); } -folly::SemiFuture ResumeUpdateProcessor::processLocal(Code code) { +folly::SemiFuture ChainResumeUpdatePrimeProcessor::processLocal(Code code) { VLOG(1) << "processRemote()=" << apache::thrift::util::enumNameSafe(code); + rcRemote_ = code; setErrorCode(code); - auto currTerm = env_->txnMan_->getTerm(spaceId_, localPartId_); + auto currTerm = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (currTerm.first != term_) { LOG(WARNING) << "E_LEADER_CHANGED during prepare and commit local"; code_ = Code::E_LEADER_CHANGED; @@ -47,18 +50,12 @@ folly::SemiFuture ResumeUpdateProcessor::processLocal(Code code) { // we need to keep the resume retry(by not remove those prime key) auto key = ConsistUtil::primeKey(spaceVidLen_, localPartId_, req_.get_edge_key()); kvErased_.emplace_back(std::move(key)); - forwardToDelegateProcessor(); + code_ 
= commit(); return code_; } return code; } -void ResumeUpdateProcessor::finish() { - VLOG(1) << "commitLocal()=" << apache::thrift::util::enumNameSafe(code_); - finished_.setValue(code_); - onFinished(); -} - } // namespace storage } // namespace nebula diff --git a/src/storage/transaction/ResumeUpdateProcessor.h b/src/storage/transaction/ChainResumeUpdatePrimeProcessor.h similarity index 67% rename from src/storage/transaction/ResumeUpdateProcessor.h rename to src/storage/transaction/ChainResumeUpdatePrimeProcessor.h index 557e351b4ed..bf13906f5bc 100644 --- a/src/storage/transaction/ResumeUpdateProcessor.h +++ b/src/storage/transaction/ChainResumeUpdatePrimeProcessor.h @@ -15,10 +15,10 @@ namespace storage { * if the TxnManager background resume thread found a prime key * it will create this processor to resume the complete update process */ -class ResumeUpdateProcessor : public ChainUpdateEdgeLocalProcessor { +class ChainResumeUpdatePrimeProcessor : public ChainUpdateEdgeLocalProcessor { public: - static ResumeUpdateProcessor* instance(StorageEnv* env, const std::string& val) { - return new ResumeUpdateProcessor(env, val); + static ChainResumeUpdatePrimeProcessor* instance(StorageEnv* env, const std::string& val) { + return new ChainResumeUpdatePrimeProcessor(env, val); } folly::SemiFuture prepareLocal() override; @@ -27,12 +27,10 @@ class ResumeUpdateProcessor : public ChainUpdateEdgeLocalProcessor { folly::SemiFuture processLocal(nebula::cpp2::ErrorCode code) override; - void finish() override; - - virtual ~ResumeUpdateProcessor() = default; + virtual ~ChainResumeUpdatePrimeProcessor() = default; protected: - ResumeUpdateProcessor(StorageEnv* env, const std::string& val); + ChainResumeUpdatePrimeProcessor(StorageEnv* env, const std::string& val); bool lockEdge(); }; diff --git a/src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp b/src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp index d2246ecb002..9a05e6a62ac 100644 --- 
a/src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp +++ b/src/storage/transaction/ChainUpdateEdgeLocalProcessor.cpp @@ -34,7 +34,7 @@ bool ChainUpdateEdgeLocalProcessor::prepareRequest(const cpp2::UpdateEdgeRequest return false; } - std::tie(term_, code_) = env_->txnMan_->getTerm(spaceId_, localPartId_); + std::tie(term_, code_) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (code_ != Code::SUCCEEDED) { return false; } @@ -72,7 +72,7 @@ folly::SemiFuture ChainUpdateEdgeLocalProcessor::prepareLocal() { } folly::SemiFuture ChainUpdateEdgeLocalProcessor::processRemote(Code code) { - LOG(INFO) << "prepareLocal()=" << apache::thrift::util::enumNameSafe(code); + VLOG(1) << " prepareLocal(): " << apache::thrift::util::enumNameSafe(code_); if (code != Code::SUCCEEDED) { return code; } @@ -82,41 +82,74 @@ folly::SemiFuture ChainUpdateEdgeLocalProcessor::processRemote(Code code) } folly::SemiFuture ChainUpdateEdgeLocalProcessor::processLocal(Code code) { - LOG(INFO) << "processRemote(), code = " << apache::thrift::util::enumNameSafe(code); - if (code != Code::SUCCEEDED && code_ == Code::SUCCEEDED) { - code_ = code; - } + VLOG(1) << " processRemote(): " << apache::thrift::util::enumNameSafe(code); + rcRemote_ = code; - auto currTerm = env_->txnMan_->getTerm(spaceId_, localPartId_); + auto currTerm = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); if (currTerm.first != term_) { - LOG(WARNING) << "E_LEADER_CHANGED during prepare and commit local"; + VLOG(1) << "E_LEADER_CHANGED during prepare and commit local"; code_ = Code::E_LEADER_CHANGED; + return code_; } - if (code == Code::E_RPC_FAILURE) { - appendDoublePrime(); - addUnfinishedEdge(ResumeType::RESUME_REMOTE); - } - - if (code == Code::SUCCEEDED || code == Code::E_RPC_FAILURE) { + if (code == Code::SUCCEEDED) { erasePrime(); - forwardToDelegateProcessor(); + } else if (code == Code::E_RPC_FAILURE) { + erasePrime(); + appendDoublePrime(); } else { if (primeInserted_) { abort(); + return 
code_; + } + } + + rcCommit_ = commit(); + + if (code_ != Code::SUCCEEDED) { + reportFailed(ResumeType::RESUME_CHAIN); + } else { + if (code == Code::E_RPC_FAILURE) { + reportFailed(ResumeType::RESUME_REMOTE); } } return code_; } +void ChainUpdateEdgeLocalProcessor::finish() { + VLOG(1) << " commitLocal()=" << apache::thrift::util::enumNameSafe(rcCommit_); + TermID currTerm = 0; + std::tie(currTerm, std::ignore) = env_->txnMan_->getTermFromKVStore(spaceId_, localPartId_); + if (term_ == currTerm) { + if (rcCommit_ == Code::SUCCEEDED && rcRemote_ == Code::SUCCEEDED) { + // lk_->setAutoUnlock(true); + } else if (isKVStoreError(rcCommit_)) { + reportFailed(ResumeType::RESUME_CHAIN); + } else if (rcRemote_ == Code::E_RPC_FAILURE) { + reportFailed(ResumeType::RESUME_REMOTE); + } else { + // 1. rcCommit_ has some logical error (retry can't get any help) + // 2. rcRemote_ has some error other than RPC_FAILURE + // 2.1 we should do abort if this. + // 2.2 abort() should only have kv store error + // lk_->setAutoUnlock(true); + } + } else { + // if term changed, transaction manager will do the clean. 
+ } + pushResultCode(code_, req_.get_part_id()); + finished_.setValue(code_); + onFinished(); +} + void ChainUpdateEdgeLocalProcessor::doRpc(folly::Promise&& promise, int retry) noexcept { try { if (retry > retryLimit_) { promise.setValue(Code::E_LEADER_CHANGED); return; } - auto* iClient = env_->txnMan_->getInternalClient(); + auto* iClient = env_->interClient_; folly::Promise p; auto reversedReq = reverseRequest(req_); @@ -155,7 +188,7 @@ void ChainUpdateEdgeLocalProcessor::appendDoublePrime() { kvAppend_.emplace_back(std::make_pair(std::move(key), std::move(val))); } -void ChainUpdateEdgeLocalProcessor::forwardToDelegateProcessor() { +Code ChainUpdateEdgeLocalProcessor::commit() { kUpdateEdgeCounters.init("update_edge"); UpdateEdgeProcessor::ContextAdjuster fn = [=](EdgeContext& ctx) { ctx.kvAppend = std::move(kvAppend_); @@ -166,12 +199,8 @@ void ChainUpdateEdgeLocalProcessor::forwardToDelegateProcessor() { proc->adjustContext(std::move(fn)); auto f = proc->getFuture(); proc->process(req_); - auto resp = std::move(f).get(); - code_ = getErrorCode(resp); - if (code_ != Code::SUCCEEDED) { - addUnfinishedEdge(ResumeType::RESUME_CHAIN); - } - std::swap(resp_, resp); + resp_ = std::move(f).get(); + return getErrorCode(resp_); } Code ChainUpdateEdgeLocalProcessor::checkAndBuildContexts(const cpp2::UpdateEdgeRequest&) { @@ -182,12 +211,6 @@ std::string ChainUpdateEdgeLocalProcessor::sEdgeKey(const cpp2::UpdateEdgeReques return ConsistUtil::edgeKey(spaceVidLen_, req.get_part_id(), req.get_edge_key()); } -void ChainUpdateEdgeLocalProcessor::finish() { - LOG(INFO) << "ChainUpdateEdgeLocalProcessor::finish()"; - pushResultCode(code_, req_.get_part_id()); - onFinished(); -} - void ChainUpdateEdgeLocalProcessor::abort() { auto key = ConsistUtil::primeKey(spaceVidLen_, localPartId_, req_.get_edge_key()); kvErased_.emplace_back(std::move(key)); @@ -195,10 +218,8 @@ void ChainUpdateEdgeLocalProcessor::abort() { folly::Baton baton; env_->kvstore_->asyncMultiRemove( 
req_.get_space_id(), req_.get_part_id(), std::move(kvErased_), [&](auto rc) mutable { - LOG(INFO) << " abort()=" << apache::thrift::util::enumNameSafe(rc); - if (rc != Code::SUCCEEDED) { - addUnfinishedEdge(ResumeType::RESUME_CHAIN); - } + VLOG(1) << " abort()=" << apache::thrift::util::enumNameSafe(rc); + rcCommit_ = rc; baton.post(); }); baton.wait(); @@ -221,12 +242,12 @@ cpp2::UpdateEdgeRequest ChainUpdateEdgeLocalProcessor::reverseRequest( bool ChainUpdateEdgeLocalProcessor::setLock() { auto spaceId = req_.get_space_id(); - auto* lockCore = env_->txnMan_->getLockCore(spaceId, req_.get_part_id()); - if (lockCore == nullptr) { + lkCore_ = env_->txnMan_->getLockCore(spaceId, req_.get_part_id(), term_); + if (lkCore_ == nullptr) { return false; } auto key = ConsistUtil::edgeKey(spaceVidLen_, req_.get_part_id(), req_.get_edge_key()); - lk_ = std::make_unique>(lockCore, key); + lk_ = std::make_unique>(lkCore_.get(), key); return lk_->isLocked(); } @@ -240,13 +261,20 @@ nebula::cpp2::ErrorCode ChainUpdateEdgeLocalProcessor::getErrorCode( return parts.front().get_code(); } -void ChainUpdateEdgeLocalProcessor::addUnfinishedEdge(ResumeType type) { - LOG(INFO) << "addUnfinishedEdge()"; +void ChainUpdateEdgeLocalProcessor::reportFailed(ResumeType type) { + VLOG(1) << "reportFailed()"; if (lk_ != nullptr) { - lk_->forceUnlock(); + lk_->setAutoUnlock(false); } auto key = ConsistUtil::edgeKey(spaceVidLen_, req_.get_part_id(), req_.get_edge_key()); - env_->txnMan_->addPrime(spaceId_, key, type); + env_->txnMan_->addPrime(spaceId_, localPartId_, term_, key, type); +} + +bool ChainUpdateEdgeLocalProcessor::isKVStoreError(nebula::cpp2::ErrorCode code) { + auto iCode = static_cast(code); + auto kvStoreErrorCodeBegin = static_cast(nebula::cpp2::ErrorCode::E_RAFT_UNKNOWN_PART); + auto kvStoreErrorCodeEnd = static_cast(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); + return iCode >= kvStoreErrorCodeBegin && iCode <= kvStoreErrorCodeEnd; } } // namespace storage diff --git 
a/src/storage/transaction/ChainUpdateEdgeLocalProcessor.h b/src/storage/transaction/ChainUpdateEdgeLocalProcessor.h index 2f84f343a83..0cc65d0b252 100644 --- a/src/storage/transaction/ChainUpdateEdgeLocalProcessor.h +++ b/src/storage/transaction/ChainUpdateEdgeLocalProcessor.h @@ -49,8 +49,6 @@ class ChainUpdateEdgeLocalProcessor void doRpc(folly::Promise&& promise, int retry = 0) noexcept; - folly::SemiFuture processNormalLocal(Code code); - void abort(); bool prepareRequest(const cpp2::UpdateEdgeRequest& req); @@ -59,15 +57,15 @@ class ChainUpdateEdgeLocalProcessor void appendDoublePrime(); - void forwardToDelegateProcessor(); - std::string sEdgeKey(const cpp2::UpdateEdgeRequest& req); cpp2::UpdateEdgeRequest reverseRequest(const cpp2::UpdateEdgeRequest& req); bool setLock(); - void addUnfinishedEdge(ResumeType type); + Code commit(); + + void reportFailed(ResumeType type); int64_t getVersion(const cpp2::UpdateEdgeRequest& req); @@ -75,8 +73,11 @@ class ChainUpdateEdgeLocalProcessor Code checkAndBuildContexts(const cpp2::UpdateEdgeRequest& req) override; + bool isKVStoreError(nebula::cpp2::ErrorCode code); + protected: cpp2::UpdateEdgeRequest req_; + TransactionManager::SPtrLock lkCore_; std::unique_ptr lk_; PartitionID localPartId_; int retryLimit_{10}; diff --git a/src/storage/transaction/ChainUpdateEdgeRemoteProcessor.cpp b/src/storage/transaction/ChainUpdateEdgeRemoteProcessor.cpp index 1dbfb1daeaf..8145f9931d0 100644 --- a/src/storage/transaction/ChainUpdateEdgeRemoteProcessor.cpp +++ b/src/storage/transaction/ChainUpdateEdgeRemoteProcessor.cpp @@ -15,6 +15,7 @@ namespace storage { using Code = ::nebula::cpp2::ErrorCode; void ChainUpdateEdgeRemoteProcessor::process(const cpp2::ChainUpdateEdgeRequest& req) { + VLOG(1) << "process()"; auto rc = Code::SUCCEEDED; auto spaceId = req.get_space_id(); auto localPartId = getLocalPart(req); @@ -28,7 +29,9 @@ void ChainUpdateEdgeRemoteProcessor::process(const cpp2::ChainUpdateEdgeRequest& if (rc != Code::SUCCEEDED) { 
pushResultCode(rc, updateRequest.get_part_id()); } else { + VLOG(1) << "before updateEdge()"; updateEdge(req); + VLOG(1) << "after updateEdge()"; } onFinished(); } diff --git a/src/storage/transaction/ResumeUpdateRemoteProcessor.cpp b/src/storage/transaction/ResumeUpdateRemoteProcessor.cpp deleted file mode 100644 index 5bfa6ed2a65..00000000000 --- a/src/storage/transaction/ResumeUpdateRemoteProcessor.cpp +++ /dev/null @@ -1,61 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "storage/transaction/ResumeUpdateRemoteProcessor.h" - -#include - -namespace nebula { -namespace storage { - -ResumeUpdateRemoteProcessor::ResumeUpdateRemoteProcessor(StorageEnv* env, const std::string& val) - : ChainUpdateEdgeLocalProcessor(env) { - req_ = ConsistUtil::parseUpdateRequest(val); - ChainUpdateEdgeLocalProcessor::prepareRequest(req_); -} - -folly::SemiFuture ResumeUpdateRemoteProcessor::prepareLocal() { - std::tie(term_, code_) = env_->txnMan_->getTerm(spaceId_, localPartId_); - return code_; -} - -folly::SemiFuture ResumeUpdateRemoteProcessor::processRemote(Code code) { - return ChainUpdateEdgeLocalProcessor::processRemote(code); -} - -folly::SemiFuture ResumeUpdateRemoteProcessor::processLocal(Code code) { - setErrorCode(code); - - auto currTerm = env_->txnMan_->getTerm(spaceId_, localPartId_); - if (currTerm.first != term_) { - LOG(WARNING) << "E_LEADER_CHANGED during prepare and commit local"; - code_ = Code::E_LEADER_CHANGED; - } - - if (code == Code::SUCCEEDED) { - // if there are something wrong other than rpc failure - // we need to keep the resume retry(by not remove those prime key) - auto key = ConsistUtil::doublePrime(spaceVidLen_, localPartId_, req_.get_edge_key()); - kvErased_.emplace_back(std::move(key)); - forwardToDelegateProcessor(); - return code; - } else { - // we can't decide if the double prime should be deleted. 
- // so do nothing - } - - return code; -} - -void ResumeUpdateRemoteProcessor::finish() { - if (FLAGS_trace_toss) { - VLOG(1) << "commitLocal()=" << apache::thrift::util::enumNameSafe(code_); - } - finished_.setValue(code_); - onFinished(); -} - -} // namespace storage -} // namespace nebula diff --git a/src/storage/transaction/TransactionManager.cpp b/src/storage/transaction/TransactionManager.cpp index 2c91b20b8de..09a4270961a 100644 --- a/src/storage/transaction/TransactionManager.cpp +++ b/src/storage/transaction/TransactionManager.cpp @@ -12,47 +12,100 @@ #include "kvstore/NebulaStore.h" #include "storage/CommonUtils.h" #include "storage/StorageFlags.h" -#include "storage/transaction/ChainResumeProcessor.h" +#include "storage/transaction/ChainProcessorFactory.h" namespace nebula { namespace storage { DEFINE_int32(resume_interval_secs, 10, "Resume interval"); -ProcessorCounters kForwardTranxCounters; - TransactionManager::TransactionManager(StorageEnv* env) : env_(env) { LOG(INFO) << "TransactionManager ctor()"; exec_ = std::make_shared(10); - iClient_ = env_->interClient_; - resumeThread_ = std::make_unique(); +} + +bool TransactionManager::start() { std::vector> existParts; auto fn = std::bind(&TransactionManager::onNewPartAdded, this, std::placeholders::_1); static_cast<::nebula::kvstore::NebulaStore*>(env_->kvstore_) ->registerOnNewPartAdded("TransactionManager", fn, existParts); - for (auto& partOfSpace : existParts) { - scanPrimes(partOfSpace.first, partOfSpace.second); + for (auto&& [spaceId, partId] : existParts) { + auto [termId, rc] = getTermFromKVStore(spaceId, partId); + if (rc != Code::SUCCEEDED) { + continue; + } + scanPrimes(spaceId, partId, termId); } + monitorPoolStat(exec_.get(), "exec of Transaction Manager"); + statThread_ = std::thread(&TransactionManager::bgPrintPoolStat, this); + return true; +} + +void TransactionManager::monitorPoolStat(folly::ThreadPoolExecutor* pool, const std::string& msg) { + 
monPoolStats_.emplace_back(std::make_pair(pool, msg)); +} + +void TransactionManager::bgPrintPoolStat() { + while (!stop_) { + for (auto&& [pool, msg] : monPoolStats_) { + VLOG(1) << dumpPoolStat(pool, msg); + } + std::this_thread::sleep_for(std::chrono::seconds(20)); + } +} + +std::string TransactionManager::dumpPoolStat(folly::ThreadPoolExecutor* exec, + const std::string& msg) { + auto stats = exec->getPoolStats(); + std::stringstream oss; + oss << "\npoolStats: " << msg << "\n\t threadCount = " << stats.threadCount + << "\n\t idleThreadCount = " << stats.idleThreadCount + << "\n\t activeThreadCount = " << stats.activeThreadCount + << "\n\t pendingTaskCount = " << stats.pendingTaskCount + << "\n\t totalTaskCount = " << stats.totalTaskCount << "\n"; + return oss.str(); +} + +void TransactionManager::stop() { + LOG(INFO) << "TransactionManager stop()"; + stop_ = true; +} + +void TransactionManager::join() { + LOG(INFO) << "TransactionManager join()"; + statThread_.join(); + exec_->stop(); } -TransactionManager::LockCore* TransactionManager::getLockCore(GraphSpaceID spaceId, - GraphSpaceID partId, - bool checkWhiteList) { +void TransactionManager::addChainTask(ChainBaseProcessor* proc) { + folly::via(exec_.get()) + .thenValue([=](auto&&) { return proc->prepareLocal(); }) + .thenValue([=](auto&& code) { return proc->processRemote(code); }) + .thenValue([=](auto&& code) { return proc->processLocal(code); }) + .ensure([=]() { proc->finish(); }); +} + +TransactionManager::SPtrLock TransactionManager::getLockCore(GraphSpaceID spaceId, + GraphSpaceID partId, + TermID termId, + bool checkWhiteList) { if (checkWhiteList) { - if (scannedParts_.find(std::make_pair(spaceId, partId)) == scannedParts_.end()) { + if (currTerm_.find(std::make_pair(spaceId, partId)) == currTerm_.end()) { return nullptr; } } - auto it = memLocks_.find(spaceId); + MemLockKey key = std::make_tuple(spaceId, partId, termId); + auto it = memLocks_.find(key); if (it != memLocks_.end()) { - return 
it->second.get(); + return it->second; } - auto item = memLocks_.insert(spaceId, std::make_unique()); - return item.first->second.get(); + auto item = memLocks_.insert(key, std::make_shared()); + return item.first->second; } -std::pair TransactionManager::getTerm(GraphSpaceID spaceId, PartitionID partId) { +std::pair TransactionManager::getTermFromKVStore(GraphSpaceID spaceId, + PartitionID partId) { TermID termId = -1; auto rc = Code::SUCCEEDED; auto part = env_->kvstore_->part(spaceId, partId); @@ -67,13 +120,13 @@ std::pair TransactionManager::getTerm(GraphSpaceID spaceId, Partit bool TransactionManager::checkTermFromCache(GraphSpaceID spaceId, PartitionID partId, TermID termId) { - auto termOfMeta = env_->metaClient_->getTermFromCache(spaceId, partId); - if (termOfMeta.ok()) { - if (termId < termOfMeta.value()) { + auto termFromMeta = env_->metaClient_->getTermFromCache(spaceId, partId); + if (termFromMeta.ok()) { + if (termId < termFromMeta.value()) { LOG(WARNING) << "checkTerm() failed: " << "spaceId=" << spaceId << ", partId=" << partId << ", in-coming term=" << termId - << ", term in meta cache=" << termOfMeta.value(); + << ", term in meta cache=" << termFromMeta.value(); return false; } } @@ -89,75 +142,31 @@ bool TransactionManager::checkTermFromCache(GraphSpaceID spaceId, return true; } -void TransactionManager::resumeThread() { - SCOPE_EXIT { - resumeThread_->addDelayTask( - FLAGS_resume_interval_secs * 1000, &TransactionManager::resumeThread, this); - }; - ChainResumeProcessor proc(env_); - proc.process(); -} - -bool TransactionManager::start() { - if (!resumeThread_->start()) { - LOG(ERROR) << "resume thread start failed"; - return false; - } - resumeThread_->addDelayTask( - FLAGS_resume_interval_secs * 1000, &TransactionManager::resumeThread, this); - return true; -} - -void TransactionManager::stop() { - exec_->stop(); - resumeThread_->stop(); - resumeThread_->wait(); -} - -std::string TransactionManager::makeLockKey(GraphSpaceID spaceId, const 
std::string& edge) { - std::string lockKey; - lockKey.append(reinterpret_cast(&spaceId), sizeof(GraphSpaceID)).append(edge); - return lockKey; -} - -std::string TransactionManager::getEdgeKey(const std::string& lockKey) { - std::string edgeKey(lockKey.c_str() + sizeof(GraphSpaceID)); - return edgeKey; -} - -void TransactionManager::addPrime(GraphSpaceID spaceId, const std::string& edge, ResumeType type) { - VLOG(1) << "addPrime() space=" << spaceId << ", hex=" << folly::hexlify(edge) +void TransactionManager::addPrime(GraphSpaceID spaceId, + PartitionID partId, + TermID termId, + const std::string& egKey, + ResumeType type) { + VLOG(1) << "addPrime() space=" << spaceId << ", hex=" << folly::hexlify(egKey) << ", ResumeType=" << static_cast(type); - auto key = makeLockKey(spaceId, edge); - dangleEdges_.insert(std::make_pair(key, type)); -} - -void TransactionManager::delPrime(GraphSpaceID spaceId, const std::string& edge) { - VLOG(1) << "delPrime() space=" << spaceId << ", hex=" << folly::hexlify(edge) << ", readable " - << ConsistUtil::readableKey(8, edge); - auto key = makeLockKey(spaceId, edge); - dangleEdges_.erase(key); - - auto partId = NebulaKeyUtils::getPart(edge); - auto* lk = getLockCore(spaceId, partId, false); - lk->unlock(edge); -} - -void TransactionManager::scanAll() { - LOG(INFO) << "scanAll()"; - std::unordered_map> leaders; - if (env_->kvstore_->allLeader(leaders) == 0) { - LOG(INFO) << "no leader found, skip any resume process"; + auto* proc = ChainProcessorFactory::make(env_, spaceId, termId, egKey, type); + if (proc == nullptr) { + VLOG(1) << "delPrime() space=" << spaceId << ", hex=" << folly::hexlify(egKey); + auto lk = getLockCore(spaceId, partId, termId, false); + lk->unlock(egKey); + // delPrime(spaceId, partId, termId, egKey); return; } - for (auto& leader : leaders) { - auto spaceId = leader.first; - for (auto& partInfo : leader.second) { - auto partId = partInfo.get_part_id(); - scanPrimes(spaceId, partId); + auto fut = 
proc->getFinished(); + std::move(fut).thenValue([=](auto&& code) { + if (code == Code::SUCCEEDED) { + VLOG(1) << "delPrime() space=" << spaceId << ", hex=" << folly::hexlify(egKey); + auto lk = getLockCore(spaceId, partId, termId, false); + lk->unlock(egKey); + // env_->txnMan_->delPrime(spaceId, partId, termId, egKey); } - } - LOG(INFO) << "finish scanAll()"; + }); + addChainTask(proc); } void TransactionManager::onNewPartAdded(std::shared_ptr& part) { @@ -175,75 +184,72 @@ void TransactionManager::onLeaderLostWrapper(const ::nebula::kvstore::Part::Call opt.spaceId, opt.partId, opt.term); - scannedParts_.erase(std::make_pair(opt.spaceId, opt.partId)); - dangleEdges_.clear(); + auto currTermKey = std::make_pair(opt.spaceId, opt.partId); + auto currTermIter = currTerm_.find(currTermKey); + if (currTermIter == currTerm_.end()) { + return; + } + auto memLockKey = std::make_tuple(opt.spaceId, opt.partId, currTermIter->second); + memLocks_.erase(memLockKey); } void TransactionManager::onLeaderElectedWrapper( const ::nebula::kvstore::Part::CallbackOptions& opt) { LOG(INFO) << folly::sformat( "leader get do scanPrimes space={}, part={}, term={}", opt.spaceId, opt.partId, opt.term); - scanPrimes(opt.spaceId, opt.partId); + scanPrimes(opt.spaceId, opt.partId, opt.term); } -void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId) { - LOG(INFO) << folly::sformat("{}(), spaceId={}, partId={}", __func__, spaceId, partId); +void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId, TermID termId) { + LOG(INFO) << folly::sformat("{}, space={}, part={}, term={}", __func__, spaceId, partId, termId); std::unique_ptr iter; auto prefix = ConsistUtil::primePrefix(partId); auto rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter); if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { for (; iter->valid(); iter->next()) { - auto edgeKey = ConsistUtil::edgeKeyFromPrime(iter->key()); - VLOG(1) << "scanned edgekey: " << folly::hexlify(edgeKey) - << 
", readable: " << ConsistUtil::readableKey(8, edgeKey.str()); - auto lockKey = makeLockKey(spaceId, edgeKey.str()); - auto insSucceed = dangleEdges_.insert(std::make_pair(lockKey, ResumeType::RESUME_CHAIN)); - if (!insSucceed.second) { - LOG(ERROR) << "not supposed to insert fail: " << folly::hexlify(edgeKey); - } - auto* lk = getLockCore(spaceId, partId, false); - auto succeed = lk->try_lock(edgeKey.str()); + auto edgeKey = ConsistUtil::edgeKeyFromPrime(iter->key()).str(); + VLOG(1) << "scanned prime edge: " << folly::hexlify(edgeKey); + auto lk = getLockCore(spaceId, partId, termId, false); + auto succeed = lk->try_lock(edgeKey); if (!succeed) { - LOG(ERROR) << "not supposed to lock fail: " << folly::hexlify(edgeKey); + LOG(ERROR) << "not supposed to lock fail: " + << ", spaceId " << spaceId << ", partId " << partId << ", termId " << termId + << folly::hexlify(edgeKey); } + addPrime(spaceId, partId, termId, edgeKey, ResumeType::RESUME_CHAIN); } } else { VLOG(1) << "primePrefix() " << apache::thrift::util::enumNameSafe(rc); - if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - return; - } } prefix = ConsistUtil::doublePrimePrefix(partId); rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter); if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { for (; iter->valid(); iter->next()) { - auto edgeKey = ConsistUtil::edgeKeyFromDoublePrime(iter->key()); - auto lockKey = makeLockKey(spaceId, edgeKey.str()); - auto insSucceed = dangleEdges_.insert(std::make_pair(lockKey, ResumeType::RESUME_REMOTE)); - if (!insSucceed.second) { - LOG(ERROR) << "not supposed to insert fail: " << folly::hexlify(edgeKey); - } - auto* lk = getLockCore(spaceId, partId, false); - auto succeed = lk->try_lock(edgeKey.str()); + auto edgeKey = ConsistUtil::edgeKeyFromDoublePrime(iter->key()).str(); + VLOG(1) << "scanned double prime edge: " << folly::hexlify(edgeKey); + auto lk = getLockCore(spaceId, partId, termId, false); + auto succeed = lk->try_lock(edgeKey); if (!succeed) { - LOG(ERROR) 
<< "not supposed to lock fail: " << folly::hexlify(edgeKey); + LOG(ERROR) << "not supposed to lock fail: " + << ", space " << spaceId << ", partId " << partId << ", termId " << termId + << folly::hexlify(edgeKey); } + addPrime(spaceId, partId, termId, edgeKey, ResumeType::RESUME_REMOTE); } } else { VLOG(1) << "doublePrimePrefix() " << apache::thrift::util::enumNameSafe(rc); - if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - return; - } } - auto partOfSpace = std::make_pair(spaceId, partId); - auto insRet = scannedParts_.insert(std::make_pair(partOfSpace, 0)); - LOG(INFO) << "insert space=" << spaceId << ", part=" << partId - << ", into white list suc=" << std::boolalpha << insRet.second; + + auto currTermKey = std::make_pair(spaceId, partId); + currTerm_.insert_or_assign(currTermKey, termId); + + LOG(INFO) << "set curr term spaceId = " << spaceId << ", partId = " << partId + << ", termId = " << termId; } -folly::ConcurrentHashMap* TransactionManager::getDangleEdges() { - return &dangleEdges_; +folly::EventBase* TransactionManager::getEventBase() { + return exec_->getEventBase(); } } // namespace storage diff --git a/src/storage/transaction/TransactionManager.h b/src/storage/transaction/TransactionManager.h index acfc2517506..28c0777449b 100644 --- a/src/storage/transaction/TransactionManager.h +++ b/src/storage/transaction/TransactionManager.h @@ -29,97 +29,122 @@ class TransactionManager { using LockGuard = MemoryLockGuard; using LockCore = MemoryLockCore; using UPtrLock = std::unique_ptr; + using SPtrLock = std::shared_ptr; public: explicit TransactionManager(storage::StorageEnv* env); ~TransactionManager() { stop(); - } - - void addChainTask(ChainBaseProcessor* proc) { - folly::async([=] { - proc->prepareLocal() - .via(exec_.get()) - .thenValue([=](auto&& code) { return proc->processRemote(code); }) - .thenValue([=](auto&& code) { return proc->processLocal(code); }) - .ensure([=]() { proc->finish(); }); - }); - } - - folly::Executor* getExecutor() { - 
return exec_.get(); + join(); } bool start(); void stop(); - LockCore* getLockCore(GraphSpaceID spaceId, PartitionID partId, bool checkWhiteList = true); + /** + * @brief wait until stop + */ + void join(); - InternalStorageClient* getInternalClient() { - return iClient_; - } + /** + * @brief add a new processor to do "chain" work, + * using the internal executor of transaction manager. + * + * @param proc + */ + void addChainTask(ChainBaseProcessor* proc); + + /** + * @brief Get the Lock Core object to set a memory lock for a key. + * + * @param spaceId + * @param partId + * @param termId + * @param checkWhiteList caller outside TransactionManager have to set this true. + * @return nullptr if failed. + */ + SPtrLock getLockCore(GraphSpaceID spaceId, + PartitionID partId, + TermID termId, + bool checkWhiteList = true); // get term of part from kvstore, may fail if this part is not exist - std::pair getTerm(GraphSpaceID spaceId, PartitionID partId); + std::pair getTermFromKVStore(GraphSpaceID spaceId, + PartitionID partId); // check get term from local term cache // this is used by Chain...RemoteProcessor, // to avoid an old leader request overrider a newer leader's bool checkTermFromCache(GraphSpaceID spaceId, PartitionID partId, TermID termId); - void reportFailed(); - // leave a record for (double)prime edge, to let resume processor there is one dangling edge - void addPrime(GraphSpaceID spaceId, const std::string& edgeKey, ResumeType type); + void addPrime(GraphSpaceID spaceId, + PartitionID partId, + TermID termId, + const std::string& edgeKey, + ResumeType type); + + // delete a prime record when recover succeeded. 
+ void delPrime(GraphSpaceID spaceId, + PartitionID partId, + TermID termId, + const std::string& edgeKey); - void delPrime(GraphSpaceID spaceId, const std::string& edgeKey); - - bool checkUnfinishedEdge(GraphSpaceID spaceId, const folly::StringPiece& key); + /** + * @brief need to do a scan to let all prime(double prime) set a memory lock, + * before a partition start to serve. + * otherwise, if a new request comes, it will overwrite the existing lock. + * @param spaceId + * @param partId + */ + void scanPrimes(GraphSpaceID spaceId, PartitionID partId, TermID termId); - folly::ConcurrentHashMap* getDangleEdges(); + /** + * @brief Get the an Event Base object from its internal executor + * + * @return folly::EventBase* + */ + folly::EventBase* getEventBase(); - void scanPrimes(GraphSpaceID spaceId, PartitionID partId); + /** + * @brief stat thread, used for debug + */ + void monitorPoolStat(folly::ThreadPoolExecutor* pool, const std::string& msg); + void bgPrintPoolStat(); + std::string dumpPoolStat(folly::ThreadPoolExecutor* pool, const std::string& msg); - void scanAll(); + std::thread statThread_; + bool stop_{false}; + std::vector> monPoolStats_; protected: - void resumeThread(); - - std::string makeLockKey(GraphSpaceID spaceId, const std::string& edge); - - std::string getEdgeKey(const std::string& lockKey); - // this is a callback register to NebulaStore on new part added. 
void onNewPartAdded(std::shared_ptr& part); // this is a callback register to Part::onElected void onLeaderElectedWrapper(const ::nebula::kvstore::Part::CallbackOptions& options); + // this is a callback register to Part::onLostLeadership void onLeaderLostWrapper(const ::nebula::kvstore::Part::CallbackOptions& options); protected: - using PartUUID = std::pair; + using SpacePart = std::pair; StorageEnv* env_{nullptr}; std::shared_ptr exec_; - InternalStorageClient* iClient_; - folly::ConcurrentHashMap memLocks_; - folly::ConcurrentHashMap cachedTerms_; - std::unique_ptr resumeThread_; - /** - * edges need to recover will put into this, - * resume processor will get edge from this then do resume. - * */ - folly::ConcurrentHashMap dangleEdges_; + folly::ConcurrentHashMap cachedTerms_; + + using MemLockKey = std::tuple; + folly::ConcurrentHashMap memLocks_; /** * @brief every raft part need to do a scan, * only scanned part allowed to insert edges */ - folly::ConcurrentHashMap, int> scannedParts_; + folly::ConcurrentHashMap, TermID> currTerm_; }; } // namespace storage