From e19f3177ffdae993d959441274d5aa6a4d6b2dba Mon Sep 17 00:00:00 2001 From: dangleptr <37216992+dangleptr@users.noreply.github.com> Date: Fri, 27 Mar 2020 22:50:45 +0800 Subject: [PATCH] Update the interface atomicOp to support empty string (#2002) Co-authored-by: heng Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> --- src/kvstore/raftex/RaftPart.cpp | 21 +++++++++------- src/kvstore/raftex/RaftPart.h | 2 +- src/kvstore/raftex/test/LogCASTest.cpp | 25 +++++++++++++++++++ src/kvstore/raftex/test/TestShard.cpp | 4 +-- src/kvstore/raftex/test/TestShard.h | 2 +- src/storage/mutate/AddEdgesProcessor.cpp | 2 +- src/storage/mutate/AddVerticesProcessor.cpp | 2 +- src/storage/mutate/DeleteEdgesProcessor.cpp | 11 ++++---- src/storage/mutate/DeleteEdgesProcessor.h | 6 ++--- .../mutate/DeleteVerticesProcessor.cpp | 14 +++++++---- src/storage/mutate/DeleteVerticesProcessor.h | 7 +++--- src/storage/mutate/UpdateEdgeProcessor.cpp | 4 +-- src/storage/mutate/UpdateVertexProcessor.cpp | 4 +-- 13 files changed, 69 insertions(+), 35 deletions(-) diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 39dcb8eae..d327701a4 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -42,12 +42,14 @@ using nebula::thrift::ThriftClientManager; using nebula::wal::FileBasedWal; using nebula::wal::FileBasedWalPolicy; +using OpProcessor = folly::Function(AtomicOp op)>; + class AppendLogsIterator final : public LogIterator { public: AppendLogsIterator(LogID firstLogId, TermID termId, RaftPart::LogCache logs, - folly::Function opCB) + OpProcessor opCB) : firstLogId_(firstLogId) , termId_(termId) , logId_(firstLogId) @@ -92,7 +94,7 @@ class AppendLogsIterator final : public LogIterator { // Process AtomicOp log CHECK(!!opCB_); opResult_ = opCB_(std::move(std::get<3>(tup))); - if (opResult_.size() > 0) { + if (opResult_.hasValue()) { // AtomicOp Succeeded return true; } else { @@ -145,7 +147,8 @@ class AppendLogsIterator final : public LogIterator { folly::StringPiece logMsg() const override { DCHECK(valid()); if (currLogType_ == LogType::ATOMIC_OP) { - return opResult_; + CHECK(opResult_.hasValue()); + return opResult_.value(); } else { return std::get<2>(logs_.at(idx_)); } @@ -180,12 +183,12 @@ class AppendLogsIterator final : public LogIterator { bool valid_{true}; LogType lastLogType_{LogType::NORMAL}; LogType currLogType_{LogType::NORMAL}; - std::string opResult_; + folly::Optional opResult_; LogID firstLogId_; TermID termId_; LogID logId_; RaftPart::LogCache logs_; - folly::Function opCB_; + OpProcessor opCB_; }; @@ -637,10 +640,10 @@ folly::Future RaftPart::appendLogAsync(ClusterID source, firstId, termId, std::move(swappedOutLogs), - [this] (AtomicOp opCB) -> std::string { + [this] (AtomicOp opCB) -> folly::Optional { CHECK(opCB != nullptr); auto opRet = opCB(); - if (opRet.empty()) { + if (!opRet.hasValue()) { // Failed sendingPromise_.setOneSingleValue(AppendLogResult::E_ATOMIC_OP_FAILURE); } @@ -921,9 +924,9 @@ void RaftPart::processAppendLogResponses( firstLogId, currTerm, std::move(logs_), - [this] (AtomicOp op) -> std::string { + [this] (AtomicOp op) -> folly::Optional { auto opRet = op(); - if (opRet.empty()) { + if (!opRet.hasValue()) { // Failed sendingPromise_.setOneSingleValue( AppendLogResult::E_ATOMIC_OP_FAILURE); diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 724b4e1ae..9ff19d7eb 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -68,7 +68,7 @@ class AppendLogsIterator; * otherwise it will return the new operation's encoded string whick should be applied atomically. * You could implement CAS, READ-MODIFY-WRITE operations though it. * */ -using AtomicOp = folly::Function; +using AtomicOp = folly::Function(void)>; class RaftPart : public std::enable_shared_from_this { friend class AppendLogsIterator; diff --git a/src/kvstore/raftex/test/LogCASTest.cpp b/src/kvstore/raftex/test/LogCASTest.cpp index ce7024cb2..2b9e255dc 100644 --- a/src/kvstore/raftex/test/LogCASTest.cpp +++ b/src/kvstore/raftex/test/LogCASTest.cpp @@ -217,6 +217,31 @@ TEST_F(LogCASTest, ZipCasTest) { checkConsensus(copies_, 0, 4, msgs); } +TEST_F(LogCASTest, EmptyTest) { + { + LOG(INFO) << "return empty string for atomic operation!"; + folly::Baton<> baton; + leader_->atomicOpAsync([log = std::move(log)] () mutable { + return ""; + }).then([&baton] (AppendLogResult res) { + ASSERT_EQ(AppendLogResult::SUCCEEDED, res); + baton.post(); + }); + baton.wait(); + } + { + LOG(INFO) << "return none string for atomic operation!"; + folly::Baton<> baton; + leader_->atomicOpAsync([log = std::move(log)] () mutable { + return folly::none; + }).then([&baton] (AppendLogResult res) { + ASSERT_EQ(AppendLogResult::E_ATOMIC_OP_FAILURE, res); + baton.post(); + }); + baton.wait(); + } +} + } // namespace raftex } // namespace nebula diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp index b561f887e..d5ab184d3 100644 --- a/src/kvstore/raftex/test/TestShard.cpp +++ b/src/kvstore/raftex/test/TestShard.cpp @@ -29,12 +29,12 @@ HostAddr decodeLearner(const folly::StringPiece& log) { return learner; } -std::string compareAndSet(const std::string& log) { +folly::Optional compareAndSet(const std::string& log) { switch (log[0]) { case 'T': return log.substr(1); default: - return std::string(); + return folly::none; } } diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index b1556071a..5d9d03bc7 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -28,7 +28,7 @@ std::string encodeLearner(const HostAddr& addr); HostAddr decodeLearner(const folly::StringPiece& log); -std::string compareAndSet(const std::string& log); +folly::Optional compareAndSet(const std::string& log); std::string encodeTransferLeader(const HostAddr& addr); diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index c799593be..633fa86f3 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -44,7 +44,7 @@ void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) { std::for_each(req.parts.begin(), req.parts.end(), [&](auto& partEdges) { auto partId = partEdges.first; auto atomic = [version, partId, edges = std::move(partEdges.second), this]() - -> std::string { + -> folly::Optional { return addEdges(version, partId, edges); }; auto callback = [partId, this](kvstore::ResultCode code) { diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index d3fe7765c..befa809f3 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -56,7 +56,7 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { std::for_each(partVertices.begin(), partVertices.end(), [&](auto &pv) { auto partId = pv.first; auto atomic = [version, partId, vertices = std::move(pv.second), this]() - -> std::string { + -> folly::Optional { return addVertices(version, partId, vertices); }; auto callback = [partId, this](kvstore::ResultCode code) { diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index 951dbf357..9295955b8 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -63,7 +63,7 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { std::for_each(req.parts.begin(), req.parts.end(), [spaceId, this](auto &partEdges) { auto partId = partEdges.first; auto atomic = [spaceId, partId, edges = std::move(partEdges.second), this]() - -> std::string { + -> folly::Optional { return deleteEdges(spaceId, partId, edges); }; auto callback = [spaceId, partId, this](kvstore::ResultCode code) { @@ -74,9 +74,10 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { } } -std::string DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId, - PartitionID partId, - const std::vector& edges) { +folly::Optional +DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId, + PartitionID partId, + const std::vector& edges) { std::unique_ptr batchHolder = std::make_unique(); for (auto& edge : edges) { auto type = edge.edge_type; @@ -89,7 +90,7 @@ std::string DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId, if (ret != kvstore::ResultCode::SUCCEEDED) { VLOG(3) << "Error! ret = " << static_cast(ret) << ", spaceId " << spaceId; - return ""; + return folly::none; } bool isLatestVE = true; while (iter->valid()) { diff --git a/src/storage/mutate/DeleteEdgesProcessor.h b/src/storage/mutate/DeleteEdgesProcessor.h index dc68e3613..fcf3fe32c 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.h +++ b/src/storage/mutate/DeleteEdgesProcessor.h @@ -32,9 +32,9 @@ class DeleteEdgesProcessor : public BaseProcessor { , indexMan_(indexMan) {} - std::string deleteEdges(GraphSpaceID spaceId, - PartitionID partId, - const std::vector& edges); + folly::Optional deleteEdges(GraphSpaceID spaceId, + PartitionID partId, + const std::vector& edges); private: meta::IndexManager* indexMan_{nullptr}; diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index 5a7dc85a8..5ebb823eb 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -62,7 +62,10 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { callingNum_ = req.parts.size(); std::for_each(req.parts.begin(), req.parts.end(), [spaceId, this](auto &pv) { auto partId = pv.first; - auto atomic = [spaceId, partId, v = std::move(pv.second), this]() -> std::string { + auto atomic = [spaceId, + partId, + v = std::move(pv.second), + this]() -> folly::Optional { return deleteVertices(spaceId, partId, v); }; @@ -75,9 +78,10 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { } } -std::string DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId, - PartitionID partId, - const std::vector& vertices) { +folly::Optional +DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId, + PartitionID partId, + const std::vector& vertices) { std::unique_ptr batchHolder = std::make_unique(); for (auto& vertex : vertices) { auto prefix = NebulaKeyUtils::vertexPrefix(partId, vertex); @@ -86,7 +90,7 @@ std::string DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId, if (ret != kvstore::ResultCode::SUCCEEDED) { VLOG(3) << "Error! ret = " << static_cast(ret) << ", spaceId " << spaceId; - return ""; + return folly::none; } TagID latestVVId = -1; while (iter->valid()) { diff --git a/src/storage/mutate/DeleteVerticesProcessor.h b/src/storage/mutate/DeleteVerticesProcessor.h index d2982805d..1ffcd146e 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.h +++ b/src/storage/mutate/DeleteVerticesProcessor.h @@ -36,9 +36,10 @@ class DeleteVerticesProcessor : public BaseProcessor { , indexMan_(indexMan) , vertexCache_(cache) {} - std::string deleteVertices(GraphSpaceID spaceId, - PartitionID partId, - const std::vector& vertices); + folly::Optional + deleteVertices(GraphSpaceID spaceId, + PartitionID partId, + const std::vector& vertices); private: meta::IndexManager* indexMan_{nullptr}; diff --git a/src/storage/mutate/UpdateEdgeProcessor.cpp b/src/storage/mutate/UpdateEdgeProcessor.cpp index e1d6d56d8..9f3997ae4 100644 --- a/src/storage/mutate/UpdateEdgeProcessor.cpp +++ b/src/storage/mutate/UpdateEdgeProcessor.cpp @@ -464,7 +464,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { << ", dst: " << edgeKey.get_dst() << ", ranking: " << edgeKey.get_ranking(); CHECK_NOTNULL(kvstore_); this->kvstore_->asyncAtomicOp(this->spaceId_, partId, - [partId, edgeKey, this] () -> std::string { + [partId, edgeKey, this] () -> folly::Optional { // TODO(shylock) the AtomicOP can't return various error // so put it in the processor filterResult_ = checkFilter(partId, edgeKey); @@ -477,7 +477,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { case FilterResult::E_ERROR: // fallthrough default: { - return ""; + return folly::none; } } }, diff --git a/src/storage/mutate/UpdateVertexProcessor.cpp b/src/storage/mutate/UpdateVertexProcessor.cpp index dadff823e..d60b05617 100644 --- a/src/storage/mutate/UpdateVertexProcessor.cpp +++ b/src/storage/mutate/UpdateVertexProcessor.cpp @@ -413,7 +413,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) { << ", partId: " << partId << ", vId: " << vId; CHECK_NOTNULL(kvstore_); this->kvstore_->asyncAtomicOp(this->spaceId_, partId, - [partId, vId, this] () -> std::string { + [partId, vId, this] () -> folly::Optional { // TODO(shylock) the AtomicOP can't return various error // so put it in the processor filterResult_ = checkFilter(partId, vId); @@ -426,7 +426,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) { case FilterResult::E_ERROR: // Fallthrough default: { - return ""; + return folly::none; } } },