diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index f866dead2..29c45659c 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -153,24 +153,6 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { visited.reserve(newEdges.size()); for (auto& newEdge : newEdges) { auto edgeKey = *newEdge.key_ref(); - auto l = std::make_tuple(spaceId_, - partId, - (*edgeKey.src_ref()).getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - (*edgeKey.dst_ref()).getStr()); - if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { - if (!env_->edgesML_->try_lock(l)) { - LOG(ERROR) << folly::format("edge locked : src {}, type {}, rank {}, dst {}", - (*edgeKey.src_ref()).getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - (*edgeKey.dst_ref()).getStr()); - code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - break; - } - dummyLock.emplace_back(std::move(l)); - } VLOG(3) << "PartitionID: " << partId << ", VertexID: " << *edgeKey.src_ref() << ", EdgeType: " << *edgeKey.edge_type_ref() << ", EdgeRanking: " << *edgeKey.ranking_ref() << ", VertexID: " @@ -292,15 +274,32 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { break; } batchHolder->put(std::move(key), std::move(retEnc.value())); + dummyLock.emplace_back(std::make_tuple(spaceId_, + partId, + (*edgeKey.src_ref()).getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + (*edgeKey.dst_ref()).getStr())); } if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - env_->edgesML_->unlockBatch(dummyLock); handleAsync(spaceId_, partId, code); continue; } auto batch = encodeBatchValue(std::move(batchHolder)->getBatch()); DCHECK(!batch.empty()); - nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), false, false); + nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), true); + if (!lg) { + auto conflict = lg.conflictKey(); + LOG(ERROR) << "edge conflict " + << std::get<0>(conflict) << ":" + << std::get<1>(conflict) << ":" + << std::get<2>(conflict) << ":" + << std::get<3>(conflict) << ":" + << std::get<4>(conflict) << ":" + << std::get<5>(conflict); + handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR); + continue; + } env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch), [l = std::move(lg), icw = std::move(wrapper), partId, this] (nebula::cpp2::ErrorCode retCode) { diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index e434e9c73..b822a4fc1 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -167,16 +167,6 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re for (auto& newTag : newTags) { auto tagId = newTag.get_tag_id(); - auto l = std::make_tuple(spaceId_, partId, tagId, vid); - if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { - if (!env_->verticesML_->try_lock(l)) { - LOG(ERROR) << folly::format("The vertex locked : tag {}, vid {}", - tagId, vid); - code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - break; - } - dummyLock.emplace_back(std::move(l)); - } VLOG(3) << "PartitionID: " << partId << ", VertexID: " << vid << ", TagID: " << tagId; @@ -287,6 +277,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re * step 3 , Insert new vertex data */ batchHolder->put(std::move(key), std::move(retEnc.value())); + dummyLock.emplace_back(std::make_tuple(spaceId_, partId, tagId, vid)); if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) { vertexCache_->evict(std::make_pair(vid, tagId)); @@ -299,16 +290,22 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re } } if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - env_->verticesML_->unlockBatch(dummyLock); handleAsync(spaceId_, partId, code); continue; } auto batch = encodeBatchValue(std::move(batchHolder)->getBatch()); DCHECK(!batch.empty()); - nebula::MemoryLockGuard lg(env_->verticesML_.get(), - std::move(dummyLock), - false, - false); + nebula::MemoryLockGuard lg(env_->verticesML_.get(), std::move(dummyLock), true); + if (!lg) { + auto conflict = lg.conflictKey(); + LOG(ERROR) << "vertex conflict " + << std::get<0>(conflict) << ":" + << std::get<1>(conflict) << ":" + << std::get<2>(conflict) << ":" + << std::get<3>(conflict); + handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR); + continue; + } env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch), [l = std::move(lg), icw = std::move(wrapper), partId, this] ( nebula::cpp2::ErrorCode retCode) { diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index f6387f1d8..89d387bc5 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -86,42 +86,33 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { std::vector dummyLock; dummyLock.reserve(part.second.size()); - nebula::cpp2::ErrorCode err = nebula::cpp2::ErrorCode::SUCCEEDED; for (const auto& edgeKey : part.second) { - auto l = std::make_tuple(spaceId_, - partId, - (*edgeKey.src_ref()).getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - (*edgeKey.dst_ref()).getStr()); - if (!env_->edgesML_->try_lock(l)) { - LOG(ERROR) << folly::format("The edge locked : src {}, " - "type {}, tank {}, dst {}", - (*edgeKey.src_ref()).getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - (*edgeKey.dst_ref()).getStr()); - err = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - break; - } - dummyLock.emplace_back(std::move(l)); - } - if (err != nebula::cpp2::ErrorCode::SUCCEEDED) { - env_->edgesML_->unlockBatch(dummyLock); - handleAsync(spaceId_, partId, err); - continue; + dummyLock.emplace_back(std::make_tuple(spaceId_, + partId, + (*edgeKey.src_ref()).getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + (*edgeKey.dst_ref()).getStr())); } auto batch = deleteEdges(partId, std::move(part.second)); if (!nebula::ok(batch)) { - env_->edgesML_->unlockBatch(dummyLock); handleAsync(spaceId_, partId, nebula::error(batch)); continue; } DCHECK(!nebula::value(batch).empty()); - nebula::MemoryLockGuard lg(env_->edgesML_.get(), - std::move(dummyLock), - false, - false); + nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), true); + if (!lg) { + auto conflict = lg.conflictKey(); + LOG(ERROR) << "edge conflict " + << std::get<0>(conflict) << ":" + << std::get<1>(conflict) << ":" + << std::get<2>(conflict) << ":" + << std::get<3>(conflict) << ":" + << std::get<4>(conflict) << ":" + << std::get<5>(conflict); + handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR); + continue; + } env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)), [l = std::move(lg), icw = std::move(wrapper), partId, this] ( nebula::cpp2::ErrorCode code) { diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index 769af089e..9990987f3 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -96,15 +96,21 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { std::vector dummyLock; auto batch = deleteVertices(partId, std::move(pv).second, dummyLock); if (!nebula::ok(batch)) { - env_->verticesML_->unlockBatch(dummyLock); handleAsync(spaceId_, partId, nebula::error(batch)); continue; } DCHECK(!nebula::value(batch).empty()); - nebula::MemoryLockGuard lg(env_->verticesML_.get(), - std::move(dummyLock), - false, - false); + nebula::MemoryLockGuard lg(env_->verticesML_.get(), std::move(dummyLock), true); + if (!lg) { + auto conflict = lg.conflictKey(); + LOG(ERROR) << "vertex conflict " + << std::get<0>(conflict) << ":" + << std::get<1>(conflict) << ":" + << std::get<2>(conflict) << ":" + << std::get<3>(conflict); + handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR); + continue; + } env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)), [l = std::move(lg), icw = std::move(wrapper), partId, this] ( nebula::cpp2::ErrorCode code) { @@ -136,13 +142,6 @@ DeleteVerticesProcessor::deleteVertices(PartitionID partId, while (iter->valid()) { auto key = iter->key(); auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key); - auto l = std::make_tuple(spaceId_, partId, tagId, vertex.getStr()); - if (!env_->verticesML_->try_lock(l)) { - LOG(ERROR) << folly::format("The vertex locked : tag {}, vid {}", - tagId, vertex.getStr()); - return nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; - } - target.emplace_back(std::move(l)); RowReaderWrapper reader; for (auto& index : indexes_) { if (index->get_schema_id().get_tag_id() == tagId) { @@ -187,6 +186,7 @@ DeleteVerticesProcessor::deleteVertices(PartitionID partId, VLOG(3) << "Evict vertex cache for vertex ID " << vertex << ", tagId " << tagId; vertexCache_->evict(std::make_pair(vertex.getStr(), tagId)); } + target.emplace_back(std::make_tuple(spaceId_, partId, tagId, vertex.getStr())); batchHolder->remove(key.str()); iter->next(); } diff --git a/src/storage/test/MemoryLockTest.cpp b/src/storage/test/MemoryLockTest.cpp index 54add23e6..cb3251356 100644 --- a/src/storage/test/MemoryLockTest.cpp +++ b/src/storage/test/MemoryLockTest.cpp @@ -68,60 +68,6 @@ TEST_F(MemoryLockTest, MoveTest) { } } -TEST_F(MemoryLockTest, PrepTest) { - MemoryLockCore mlock; - { - EXPECT_TRUE(mlock.try_lock("1")); - EXPECT_TRUE(mlock.try_lock("2")); - EXPECT_FALSE(mlock.try_lock("1")); - EXPECT_FALSE(mlock.try_lock("2")); - std::vector keys{"1", "2"}; - auto* lk = new LockGuard(&mlock, keys, false, false); - EXPECT_TRUE(lk); - delete lk; - } - EXPECT_EQ(0, mlock.size()); -} - -TEST_F(MemoryLockTest, DedupTest) { - MemoryLockCore mlock; - { - std::vector keys{"1", "2", "1", "2"}; - auto* lk = new LockGuard(&mlock, keys, true, false); - EXPECT_TRUE(lk); - EXPECT_EQ(0, mlock.size()); - delete lk; - } - EXPECT_EQ(0, mlock.size()); - { - EXPECT_TRUE(mlock.try_lock("1")); - EXPECT_TRUE(mlock.try_lock("2")); - EXPECT_FALSE(mlock.try_lock("1")); - EXPECT_FALSE(mlock.try_lock("2")); - std::vector keys{"1", "2", "1", "2"}; - auto* lk = new LockGuard(&mlock, keys, true, false); - EXPECT_TRUE(lk); - EXPECT_EQ(2, mlock.size()); - delete lk; - } - EXPECT_EQ(0, mlock.size()); - { - std::vector keys{"1", "2", "1", "2"}; - auto* lk = new LockGuard(&mlock, keys, true, true); - EXPECT_TRUE(lk); - EXPECT_EQ(2, mlock.size()); - delete lk; - } - EXPECT_EQ(0, mlock.size()); - { - std::vector keys{"1", "2", "1", "2"}; - LockGuard lk(&mlock, keys, false, true); - EXPECT_FALSE(lk); - EXPECT_EQ(0, mlock.size()); - } - EXPECT_EQ(0, mlock.size()); -} - } // namespace storage } // namespace nebula diff --git a/src/utils/MemoryLockWrapper.h b/src/utils/MemoryLockWrapper.h index 5a3b85258..d18292c16 100644 --- a/src/utils/MemoryLockWrapper.h +++ b/src/utils/MemoryLockWrapper.h @@ -19,19 +19,14 @@ class MemoryLockGuard { MemoryLockGuard(MemoryLockCore* lock, const Key& key) : MemoryLockGuard(lock, std::vector{key}) {} - MemoryLockGuard(MemoryLockCore* lock, - const std::vector& keys, - bool dedup = false, - bool prepCheck = true) + MemoryLockGuard(MemoryLockCore* lock, const std::vector& keys, bool dedup = false) : lock_(lock), keys_(keys) { if (dedup) { std::sort(keys_.begin(), keys_.end()); - keys_.erase(unique(keys_.begin(), keys_.end()), keys_.end()); - } - if (prepCheck) { - std::tie(iter_, locked_) = lock_->lockBatch(keys_); + auto last = std::unique(keys_.begin(), keys_.end()); + std::tie(iter_, locked_) = lock_->lockBatch(keys_.begin(), last); } else { - locked_ = true; + std::tie(iter_, locked_) = lock_->lockBatch(keys_); } }