Skip to content

Commit

Permalink
Enable MULTIUPDATE and LOOKUP | UPDATE (#5953)
Browse files Browse the repository at this point in the history
* enable multiupdate and lookup & update

* deleted keyword LOOKUP|UPDATE and replace MULTIUPDATE with UPDATE

* add tck tests

* combined update, multiupdate & refupdate into one routine

* first revision after code review

* second round revision

* third revision

* edit error message

---------

Co-authored-by: yuxuan.wang <[email protected]>
  • Loading branch information
JackChuengQAQ and Salieri-004 authored Oct 29, 2024
1 parent 6c09e83 commit b4f83eb
Show file tree
Hide file tree
Showing 16 changed files with 1,046 additions and 195 deletions.
14 changes: 7 additions & 7 deletions src/graph/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ folly::Future<Status> DeleteVerticesExecutor::deleteVertices() {
continue;
}
if (!SchemaUtil::isValidVid(val, *spaceInfo.spaceDesc.vid_type_ref())) {
std::stringstream ss;
ss << "Wrong vid type `" << val.type() << "', value `" << val.toString() << "'";
return Status::Error(ss.str());
auto errorMsg = fmt::format(
"Wrong vid type `{}', value `{}'", Value::toString(val.type()), val.toString());
return Status::Error(errorMsg);
}
vertices.emplace_back(std::move(val));
}
Expand Down Expand Up @@ -103,9 +103,9 @@ folly::Future<Status> DeleteTagsExecutor::deleteTags() {
continue;
}
if (!SchemaUtil::isValidVid(val, *spaceInfo.spaceDesc.vid_type_ref())) {
std::stringstream ss;
ss << "Wrong vid type `" << val.type() << "', value `" << val.toString() << "'";
return Status::Error(ss.str());
auto errorMsg = fmt::format(
"Wrong vid type `{}', value `{}'", Value::toString(val.type()), val.toString());
return Status::Error(errorMsg);
}
delTag.id_ref() = val;
delTag.tags_ref() = dtNode->tagIds();
Expand Down Expand Up @@ -147,7 +147,7 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {
DCHECK(!inputVar.empty());
auto& inputResult = ectx_->getResult(inputVar);
auto iter = inputResult.iter();
edgeKeys.reserve(iter->size());
edgeKeys.reserve(iter->size() * 2);
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
storage::cpp2::EdgeKey edgeKey;
Expand Down
282 changes: 211 additions & 71 deletions src/graph/executor/mutate/UpdateExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using nebula::storage::StorageClient;
namespace nebula {
namespace graph {

StatusOr<DataSet> UpdateBaseExecutor::handleResult(DataSet &&data) {
Status UpdateBaseExecutor::handleMultiResult(DataSet &result, DataSet &&data) {
if (data.colNames.size() <= 1) {
if (yieldNames_.empty()) {
return Status::OK();
Expand All @@ -26,115 +26,255 @@ StatusOr<DataSet> UpdateBaseExecutor::handleResult(DataSet &&data) {
<< data.colNames.size() - 1;
return Status::Error("Wrong return prop size");
}
DataSet result;
result.colNames = std::move(yieldNames_);
result.colNames = yieldNames_;
for (auto &row : data.rows) {
std::vector<Value> columns;
for (auto i = 1u; i < row.values.size(); i++) {
columns.emplace_back(std::move(row.values[i]));
}
result.rows.emplace_back(std::move(columns));
}
return result;
return Status::OK();
}

folly::Future<Status> UpdateVertexExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto *uvNode = asNode<UpdateVertex>(node());
yieldNames_ = uvNode->getYieldNames();
auto *urvNode = asNode<UpdateVertex>(node());

auto vidRef = urvNode->getVidRef();
std::vector<Value> vertices;
const auto &spaceInfo = qctx()->rctx()->session()->space();

if (vidRef != nullptr) {
auto inputVar = urvNode->inputVar();
DCHECK(!inputVar.empty());
auto &inputResult = ectx_->getResult(inputVar);
auto iter = inputResult.iter();
vertices.reserve(iter->size());
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
auto val = Expression::eval(vidRef, ctx(iter.get()));
if (val.isNull() || val.empty()) {
continue;
}
if (!SchemaUtil::isValidVid(val, *spaceInfo.spaceDesc.vid_type_ref())) {
auto errorMsg = fmt::format(
"Wrong vid type `{}', value `{}'", Value::toString(val.type()), val.toString());
return Status::Error(errorMsg);
}
vertices.emplace_back(std::move(val));
}
}

if (vertices.empty()) {
return Status::OK();
}

std::vector<folly::Future<StatusOr<storage::cpp2::UpdateResponse>>> futures;
futures.reserve(vertices.size());

yieldNames_ = urvNode->getYieldNames();
time::Duration updateVertTime;
auto plan = qctx()->plan();
auto sess = qctx()->rctx()->session();
StorageClient::CommonRequestParam param(
uvNode->getSpaceId(), sess->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->updateVertex(param,
uvNode->getVId(),
uvNode->getTagId(),
uvNode->getUpdatedProps(),
uvNode->getInsertable(),
uvNode->getReturnProps(),
uvNode->getCondition())
urvNode->getSpaceId(), sess->id(), plan->id(), plan->isProfileEnabled());

for (auto &vId : vertices) {
futures.emplace_back(qctx()
->getStorageClient()
->updateVertex(param,
vId,
urvNode->getTagId(),
urvNode->getUpdatedProps(),
urvNode->getInsertable(),
urvNode->getReturnProps(),
urvNode->getCondition())
.via(runner()));
}

return folly::collectAll(futures)
.via(runner())
.ensure([updateVertTime]() {
VLOG(1) << "Update vertice time: " << updateVertTime.elapsedInUSec() << "us";
VLOG(1) << "updateVertTime: " << updateVertTime.elapsedInUSec() << "us";
})
.thenValue([this](StatusOr<storage::cpp2::UpdateResponse> resp) {
.thenValue([this](std::vector<folly::Try<StatusOr<storage::cpp2::UpdateResponse>>> results) {
memory::MemoryCheckGuard guard;
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(WARNING) << "Update vertices fail: " << resp.status();
return resp.status();
}
auto value = std::move(resp).value();
for (auto &code : value.get_result().get_failed_parts()) {
NG_RETURN_IF_ERROR(handleErrorCode(code.get_code(), code.get_part_id()));
}
if (value.props_ref().has_value()) {
auto status = handleResult(std::move(*value.props_ref()));
if (!status.ok()) {
return status.status();
DataSet finalResult;
for (auto &result : results) {
if (result.hasException()) {
LOG(WARNING) << "Update vertex request threw an exception.";
return Status::Error("Exception occurred during update.");
}
return finish(ResultBuilder()
.value(std::move(status).value())
.iter(Iterator::Kind::kDefault)
.build());

if (!result.value().ok()) {
LOG(WARNING) << "Update vertex failed: " << result.value().status();
return result.value().status();
}

auto value = std::move(result.value()).value();
for (auto &code : value.get_result().get_failed_parts()) {
NG_RETURN_IF_ERROR(handleErrorCode(code.get_code(), code.get_part_id()));
}

if (value.props_ref().has_value()) {
auto status = handleMultiResult(finalResult, std::move(*value.props_ref()));
if (!status.ok()) {
return status;
}
}
}

if (finalResult.colNames.empty()) {
return Status::OK();
} else {
return finish(
ResultBuilder().value(std::move(finalResult)).iter(Iterator::Kind::kDefault).build());
}
return Status::OK();
});
}

folly::Future<Status> UpdateEdgeExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto *ueNode = asNode<UpdateEdge>(node());
storage::cpp2::EdgeKey edgeKey;
edgeKey.src_ref() = ueNode->getSrcId();
edgeKey.ranking_ref() = ueNode->getRank();
edgeKey.edge_type_ref() = ueNode->getEdgeType();
edgeKey.dst_ref() = ueNode->getDstId();
yieldNames_ = ueNode->getYieldNames();

auto *ureNode = asNode<UpdateEdge>(node());
auto *edgeKeyRef = DCHECK_NOTNULL(ureNode->getEdgeKeyRef());
auto edgeType = ureNode->getEdgeType();
time::Duration updateEdgeTime;

yieldNames_ = ureNode->getYieldNames();

const auto &spaceInfo = qctx()->rctx()->session()->space();
auto inputVar = ureNode->inputVar();
DCHECK(!inputVar.empty());
auto &inputResult = ectx_->getResult(inputVar);
auto iter = inputResult.iter();

std::vector<storage::cpp2::EdgeKey> edgeKeys;
std::vector<storage::cpp2::EdgeKey> reverse_edgeKeys;
edgeKeys.reserve(iter->size());
reverse_edgeKeys.reserve(iter->size());

QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
auto srcId = Expression::eval(edgeKeyRef->srcid(), ctx(iter.get()));
if (!SchemaUtil::isValidVid(srcId, *spaceInfo.spaceDesc.vid_type_ref())) {
std::stringstream ss;
ss << "Wrong srcId type `" << srcId.type() << "`, value `" << srcId.toString() << "'";
return Status::Error(ss.str());
}
auto dstId = Expression::eval(edgeKeyRef->dstid(), ctx(iter.get()));
if (!SchemaUtil::isValidVid(dstId, *spaceInfo.spaceDesc.vid_type_ref())) {
std::stringstream ss;
ss << "Wrong dstId type `" << dstId.type() << "', value `" << dstId.toString() << "'";
return Status::Error(ss.str());
}
auto rank = Expression::eval(edgeKeyRef->rank(), ctx(iter.get()));
if (!rank.isInt()) {
std::stringstream ss;
ss << "Wrong rank type `" << rank.type() << "', value `" << rank.toString() << "'";
return Status::Error(ss.str());
}
DCHECK(edgeKeyRef->type());
auto type = Expression::eval(edgeKeyRef->type(), ctx(iter.get()));
if (!type.isInt()) {
std::stringstream ss;
ss << "Wrong edge type `" << type.type() << "', value `" << type.toString() << "'";
return Status::Error(ss.str());
}
storage::cpp2::EdgeKey edgeKey;
edgeKey.src_ref() = srcId;
edgeKey.dst_ref() = dstId;
edgeKey.ranking_ref() = rank.getInt();
edgeKey.edge_type_ref() = edgeType;

storage::cpp2::EdgeKey reverse_edgeKey;
reverse_edgeKey.src_ref() = std::move(dstId);
reverse_edgeKey.dst_ref() = std::move(srcId);
reverse_edgeKey.ranking_ref() = rank.getInt();
reverse_edgeKey.edge_type_ref() = -edgeType;

edgeKeys.emplace_back(std::move(edgeKey));
reverse_edgeKeys.emplace_back(std::move(reverse_edgeKey));
}

if (edgeKeys.empty()) {
return Status::OK();
}

auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
ueNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
ureNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = false;
return qctx()
->getStorageClient()
->updateEdge(param,
edgeKey,
ueNode->getUpdatedProps(),
ueNode->getInsertable(),
ueNode->getReturnProps(),
ueNode->getCondition())

std::vector<folly::Future<StatusOr<storage::cpp2::UpdateResponse>>> futures;
futures.reserve(edgeKeys.size() + reverse_edgeKeys.size());

for (auto &edgeKey : edgeKeys) {
futures.emplace_back(qctx()
->getStorageClient()
->updateEdge(param,
edgeKey,
ureNode->getUpdatedProps(),
ureNode->getInsertable(),
{},
ureNode->getCondition())
.via(runner()));
}

for (auto &edgeKey : reverse_edgeKeys) {
futures.emplace_back(qctx()
->getStorageClient()
->updateEdge(param,
edgeKey,
ureNode->getUpdatedProps(),
ureNode->getInsertable(),
ureNode->getReturnProps(),
ureNode->getCondition())
.via(runner()));
}

return folly::collectAll(futures)
.via(runner())
.ensure([updateEdgeTime]() {
VLOG(1) << "Update edge time: " << updateEdgeTime.elapsedInUSec() << "us";
VLOG(1) << "updateEdgeTime: " << updateEdgeTime.elapsedInUSec() << "us";
})
.thenValue([this](StatusOr<storage::cpp2::UpdateResponse> resp) {
.thenValue([this](std::vector<folly::Try<StatusOr<storage::cpp2::UpdateResponse>>> results) {
memory::MemoryCheckGuard guard;
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(WARNING) << "Update edge failed: " << resp.status();
return resp.status();
}
auto value = std::move(resp).value();
for (auto &code : value.get_result().get_failed_parts()) {
NG_RETURN_IF_ERROR(handleErrorCode(code.get_code(), code.get_part_id()));
}
if (value.props_ref().has_value()) {
auto status = handleResult(std::move(*value.props_ref()));
if (!status.ok()) {
return status.status();
DataSet finalResult;
for (auto &result : results) {
if (result.hasException()) {
LOG(WARNING) << "Update edge request threw an exception.";
return Status::Error("Exception occurred during update.");
}

if (!result.value().ok()) {
LOG(WARNING) << "Update edge failed: " << result.value().status();
return result.value().status();
}

auto value = std::move(result.value()).value();
for (auto &code : value.get_result().get_failed_parts()) {
NG_RETURN_IF_ERROR(handleErrorCode(code.get_code(), code.get_part_id()));
}

if (value.props_ref().has_value() && value.props_ref()->colNames.size() > 1) {
auto status = handleMultiResult(finalResult, std::move(*value.props_ref()));
if (!status.ok()) {
return status;
}
}
return finish(ResultBuilder()
.value(std::move(status).value())
.iter(Iterator::Kind::kDefault)
.build());
}
return Status::OK();

if (finalResult.colNames.empty()) {
return Status::OK();
} else {
return finish(
ResultBuilder().value(std::move(finalResult)).iter(Iterator::Kind::kDefault).build());
}
});
}

} // namespace graph
} // namespace nebula
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/UpdateExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class UpdateBaseExecutor : public StorageAccessExecutor {
virtual ~UpdateBaseExecutor() {}

protected:
StatusOr<DataSet> handleResult(DataSet &&data);
Status handleMultiResult(DataSet &result, DataSet &&data);

protected:
std::vector<std::string> yieldNames_;
Expand Down
4 changes: 1 addition & 3 deletions src/graph/planner/plan/Mutate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ std::unique_ptr<PlanNodeDescription> UpdateVertex::explain() const {

std::unique_ptr<PlanNodeDescription> UpdateEdge::explain() const {
auto desc = Update::explain();
addDescription("srcId", srcId_.toString(), desc.get());
addDescription("dstId", dstId_.toString(), desc.get());
addDescription("rank", folly::to<std::string>(rank_), desc.get());
addDescription("edgeKeyRef", edgeKeyRef_ ? edgeKeyRef_->toString() : "", desc.get());
addDescription("edgeType", folly::to<std::string>(edgeType_), desc.get());
return desc;
}
Expand Down
Loading

0 comments on commit b4f83eb

Please sign in to comment.