diff --git a/src/storage/CommonUtils.h b/src/storage/CommonUtils.h index 93245101d..daee493eb 100644 --- a/src/storage/CommonUtils.h +++ b/src/storage/CommonUtils.h @@ -109,6 +109,23 @@ class IndexCountWrapper { count_->fetch_add(1, std::memory_order_release); } + IndexCountWrapper(const IndexCountWrapper&) = delete; + + IndexCountWrapper(IndexCountWrapper&& icw) noexcept + : count_(icw.count_) { + count_->fetch_add(1, std::memory_order_release); + } + + IndexCountWrapper& operator=(const IndexCountWrapper&) = delete; + + IndexCountWrapper& operator=(IndexCountWrapper&& icw) noexcept { + if (this != &icw) { + count_ = icw.count_; + count_->fetch_add(1, std::memory_order_release); + } + return *this; + } + ~IndexCountWrapper() { count_->fetch_sub(1, std::memory_order_release); } diff --git a/src/storage/exec/UpdateNode.h b/src/storage/exec/UpdateNode.h index 519d87743..3d90eed42 100644 --- a/src/storage/exec/UpdateNode.h +++ b/src/storage/exec/UpdateNode.h @@ -461,9 +461,9 @@ class UpdateEdgeNode : public UpdateNode { kvstore::ResultCode execute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override { CHECK_NOTNULL(planContext_->env_->kvstore_); auto ret = kvstore::ResultCode::SUCCEEDED; - // folly::Function(void)> + IndexCountWrapper wrapper(planContext_->env_); + auto op = [&partId, &edgeKey, this]() -> folly::Optional { - IndexCountWrapper wrapper(planContext_->env_); this->exeResult_ = RelNode::execute(partId, edgeKey); if (this->exeResult_ == kvstore::ResultCode::SUCCEEDED) { if (edgeKey.edge_type != this->edgeType_) { diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index c2ddf76b2..89e6bcc7b 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -128,6 +128,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { std::vector dummyLock; dummyLock.reserve(newEdges.size()); cpp2::ErrorCode code = cpp2::ErrorCode::SUCCEEDED; + for (auto& newEdge : newEdges) { auto edgeKey = newEdge.key; VLOG(3) << "PartitionID: " << partId << ", VertexID: " << edgeKey.src @@ -267,8 +268,9 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { continue; } env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch), - [l = std::move(lg), partId, this](kvstore::ResultCode kvRet) { + [l = std::move(lg), icw = std::move(wrapper), partId, this](kvstore::ResultCode kvRet) { UNUSED(l); + UNUSED(icw); handleAsync(spaceId_, partId, kvRet); }); } diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index a0937a6b9..94a05747a 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -135,6 +135,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re std::vector dummyLock; dummyLock.reserve(vertices.size()); cpp2::ErrorCode code = cpp2::ErrorCode::SUCCEEDED; + for (auto& vertex : vertices) { auto vid = vertex.get_id().getStr(); const auto& newTags = vertex.get_tags(); @@ -278,8 +279,9 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re continue; } env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch), - [l = std::move(lg), partId, this](kvstore::ResultCode kvRet) { + [l = std::move(lg), icw = std::move(wrapper), partId, this](kvstore::ResultCode kvRet) { UNUSED(l); + UNUSED(icw); handleAsync(spaceId_, partId, kvRet); }); } diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index 4b9b4124c..1a9751b33 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -78,9 +78,11 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { } } else { for (auto& part : partEdges) { + IndexCountWrapper wrapper(env_); auto partId = part.first; std::vector dummyLock; dummyLock.reserve(part.second.size()); + for (const auto& edgeKey : part.second) { dummyLock.emplace_back(std::make_tuple(spaceId_, partId, @@ -109,8 +111,10 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { continue; } env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)), - [l = std::move(lg), partId, this](kvstore::ResultCode code) { + [l = std::move(lg), icw = std::move(wrapper), partId, this] ( + kvstore::ResultCode code) { UNUSED(l); + UNUSED(icw); handleAsync(spaceId_, partId, code); }); } @@ -120,7 +124,6 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { ErrorOr DeleteEdgesProcessor::deleteEdges(PartitionID partId, const std::vector& edges) { - IndexCountWrapper wrapper(env_); std::unique_ptr batchHolder = std::make_unique(); for (auto& edge : edges) { auto type = edge.edge_type; diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index 086a9f0df..e150d3beb 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -92,6 +92,7 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { } } else { for (auto& pv : partVertices) { + IndexCountWrapper wrapper(env_); auto partId = pv.first; std::vector dummyLock; auto batch = deleteVertices(partId, std::move(pv).second, dummyLock); @@ -112,8 +113,10 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { continue; } env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)), - [l = std::move(lg), partId, this](kvstore::ResultCode code) { + [l = std::move(lg), icw = std::move(wrapper), partId, this] ( + kvstore::ResultCode code) { UNUSED(l); + UNUSED(icw); handleAsync(spaceId_, partId, code); }); } @@ -125,7 +128,6 @@ ErrorOr DeleteVerticesProcessor::deleteVertices(PartitionID partId, const std::vector& vertices, std::vector& target) { - IndexCountWrapper wrapper(env_); target.reserve(vertices.size()); std::unique_ptr batchHolder = std::make_unique(); for (auto& vertex : vertices) {