Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

fix index data loss when insert data during rebuilding index #408

Merged
merged 2 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/storage/CommonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,23 @@ class IndexCountWrapper {
count_->fetch_add(1, std::memory_order_release);
}

IndexCountWrapper(const IndexCountWrapper&) = delete;

IndexCountWrapper(IndexCountWrapper&& icw) noexcept
: count_(icw.count_) {
count_->fetch_add(1, std::memory_order_release);
}

IndexCountWrapper& operator=(const IndexCountWrapper&) = delete;

IndexCountWrapper& operator=(IndexCountWrapper&& icw) noexcept {
if (this != &icw) {
count_ = icw.count_;
count_->fetch_add(1, std::memory_order_release);
}
return *this;
}

~IndexCountWrapper() {
count_->fetch_sub(1, std::memory_order_release);
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/exec/UpdateNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,9 @@ class UpdateEdgeNode : public UpdateNode<cpp2::EdgeKey> {
kvstore::ResultCode execute(PartitionID partId, const cpp2::EdgeKey& edgeKey) override {
CHECK_NOTNULL(planContext_->env_->kvstore_);
auto ret = kvstore::ResultCode::SUCCEEDED;
// folly::Function<folly::Optional<std::string>(void)>
IndexCountWrapper wrapper(planContext_->env_);

auto op = [&partId, &edgeKey, this]() -> folly::Optional<std::string> {
IndexCountWrapper wrapper(planContext_->env_);
this->exeResult_ = RelNode::execute(partId, edgeKey);
if (this->exeResult_ == kvstore::ResultCode::SUCCEEDED) {
if (edgeKey.edge_type != this->edgeType_) {
Expand Down
4 changes: 3 additions & 1 deletion src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
std::vector<EMLI> dummyLock;
dummyLock.reserve(newEdges.size());
cpp2::ErrorCode code = cpp2::ErrorCode::SUCCEEDED;

for (auto& newEdge : newEdges) {
auto edgeKey = newEdge.key;
VLOG(3) << "PartitionID: " << partId << ", VertexID: " << edgeKey.src
Expand Down Expand Up @@ -267,8 +268,9 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
continue;
}
env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch),
[l = std::move(lg), partId, this](kvstore::ResultCode kvRet) {
[l = std::move(lg), icw = std::move(wrapper), partId, this](kvstore::ResultCode kvRet) {
UNUSED(l);
UNUSED(icw);
handleAsync(spaceId_, partId, kvRet);
});
}
Expand Down
4 changes: 3 additions & 1 deletion src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
std::vector<VMLI> dummyLock;
dummyLock.reserve(vertices.size());
cpp2::ErrorCode code = cpp2::ErrorCode::SUCCEEDED;

for (auto& vertex : vertices) {
auto vid = vertex.get_id().getStr();
const auto& newTags = vertex.get_tags();
Expand Down Expand Up @@ -278,8 +279,9 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
continue;
}
env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch),
[l = std::move(lg), partId, this](kvstore::ResultCode kvRet) {
[l = std::move(lg), icw = std::move(wrapper), partId, this](kvstore::ResultCode kvRet) {
UNUSED(l);
UNUSED(icw);
handleAsync(spaceId_, partId, kvRet);
});
}
Expand Down
7 changes: 5 additions & 2 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
}
} else {
for (auto& part : partEdges) {
IndexCountWrapper wrapper(env_);
auto partId = part.first;
std::vector<EMLI> dummyLock;
dummyLock.reserve(part.second.size());

for (const auto& edgeKey : part.second) {
dummyLock.emplace_back(std::make_tuple(spaceId_,
partId,
Expand Down Expand Up @@ -109,8 +111,10 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
continue;
}
env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)),
[l = std::move(lg), partId, this](kvstore::ResultCode code) {
[l = std::move(lg), icw = std::move(wrapper), partId, this] (
kvstore::ResultCode code) {
UNUSED(l);
UNUSED(icw);
handleAsync(spaceId_, partId, code);
});
}
Expand All @@ -120,7 +124,6 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {

ErrorOr<kvstore::ResultCode, std::string>
DeleteEdgesProcessor::deleteEdges(PartitionID partId, const std::vector<cpp2::EdgeKey>& edges) {
IndexCountWrapper wrapper(env_);
std::unique_ptr<kvstore::BatchHolder> batchHolder = std::make_unique<kvstore::BatchHolder>();
for (auto& edge : edges) {
auto type = edge.edge_type;
Expand Down
6 changes: 4 additions & 2 deletions src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
}
} else {
for (auto& pv : partVertices) {
IndexCountWrapper wrapper(env_);
auto partId = pv.first;
std::vector<VMLI> dummyLock;
auto batch = deleteVertices(partId, std::move(pv).second, dummyLock);
Expand All @@ -112,8 +113,10 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
continue;
}
env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)),
[l = std::move(lg), partId, this](kvstore::ResultCode code) {
[l = std::move(lg), icw = std::move(wrapper), partId, this] (
kvstore::ResultCode code) {
UNUSED(l);
UNUSED(icw);
handleAsync(spaceId_, partId, code);
});
}
Expand All @@ -125,7 +128,6 @@ ErrorOr<kvstore::ResultCode, std::string>
DeleteVerticesProcessor::deleteVertices(PartitionID partId,
const std::vector<Value>& vertices,
std::vector<VMLI>& target) {
IndexCountWrapper wrapper(env_);
target.reserve(vertices.size());
std::unique_ptr<kvstore::BatchHolder> batchHolder = std::make_unique<kvstore::BatchHolder>();
for (auto& vertex : vertices) {
Expand Down