Skip to content

Commit

Permalink
add unwind & check vidType when executing not validate (#4456)
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 authored and Sophie-Xie committed Sep 8, 2022
1 parent 035ab3c commit 57f048c
Show file tree
Hide file tree
Showing 28 changed files with 335 additions and 59 deletions.
36 changes: 23 additions & 13 deletions src/graph/executor/StorageAccessExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ struct Vid<std::string> {
};

template <typename VidType>
DataSet buildRequestDataSet(const SpaceInfo &space,
QueryExpressionContext &exprCtx,
Iterator *iter,
Expression *expr,
bool dedup) {
StatusOr<DataSet> buildRequestDataSet(const SpaceInfo &space,
QueryExpressionContext &exprCtx,
Iterator *iter,
Expression *expr,
bool dedup,
bool isCypher) {
DCHECK(iter && expr) << "iter=" << iter << ", expr=" << expr;
nebula::DataSet vertices({kVid});
auto s = iter->size();
Expand All @@ -54,11 +55,19 @@ DataSet buildRequestDataSet(const SpaceInfo &space,

for (; iter->valid(); iter->next()) {
auto vid = expr->eval(exprCtx(iter));
if (!SchemaUtil::isValidVid(vid, vidType)) {
LOG(WARNING) << "Mismatched vid type: " << vid.type()
<< ", space vid type: " << SchemaUtil::typeToString(vidType);
if (vid.empty()) {
continue;
}
if (!SchemaUtil::isValidVid(vid, vidType)) {
if (isCypher) {
continue;
}
std::stringstream ss;
ss << "`" << vid.toString() << "', the srcs should be type of "
<< apache::thrift::util::enumNameSafe(vidType.get_type()) << ", but was`" << vid.type()
<< "'";
return Status::Error(ss.str());
}
if (dedup && !uniqueSet.emplace(Vid<VidType>::value(vid)).second) {
continue;
}
Expand All @@ -73,16 +82,17 @@ bool StorageAccessExecutor::isIntVidType(const SpaceInfo &space) const {
return (*space.spaceDesc.vid_type_ref()).type == nebula::cpp2::PropertyType::INT64;
}

DataSet StorageAccessExecutor::buildRequestDataSetByVidType(Iterator *iter,
Expression *expr,
bool dedup) {
StatusOr<DataSet> StorageAccessExecutor::buildRequestDataSetByVidType(Iterator *iter,
Expression *expr,
bool dedup,
bool isCypher) {
const auto &space = qctx()->rctx()->session()->space();
QueryExpressionContext exprCtx(qctx()->ectx());

if (isIntVidType(space)) {
return internal::buildRequestDataSet<int64_t>(space, exprCtx, iter, expr, dedup);
return internal::buildRequestDataSet<int64_t>(space, exprCtx, iter, expr, dedup, isCypher);
}
return internal::buildRequestDataSet<std::string>(space, exprCtx, iter, expr, dedup);
return internal::buildRequestDataSet<std::string>(space, exprCtx, iter, expr, dedup, isCypher);
}

std::string StorageAccessExecutor::getStorageDetail(
Expand Down
5 changes: 4 additions & 1 deletion src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ class StorageAccessExecutor : public Executor {

bool isIntVidType(const SpaceInfo &space) const;

DataSet buildRequestDataSetByVidType(Iterator *iter, Expression *expr, bool dedup);
StatusOr<DataSet> buildRequestDataSetByVidType(Iterator *iter,
Expression *expr,
bool dedup,
bool isCypher = false);
};

} // namespace graph
Expand Down
10 changes: 8 additions & 2 deletions src/graph/executor/query/AppendVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ folly::Future<Status> AppendVerticesExecutor::execute() {
return appendVertices();
}

DataSet AppendVerticesExecutor::buildRequestDataSet(const AppendVertices *av) {
StatusOr<DataSet> AppendVerticesExecutor::buildRequestDataSet(const AppendVertices *av) {
if (av == nullptr) {
return nebula::DataSet({kVid});
}
auto valueIter = ectx_->getResult(av->inputVar()).iter();
return buildRequestDataSetByVidType(valueIter.get(), av->src(), av->dedup());
return buildRequestDataSetByVidType(valueIter.get(), av->src(), av->dedup(), true);
}

folly::Future<Status> AppendVerticesExecutor::appendVertices() {
Expand All @@ -31,8 +31,14 @@ folly::Future<Status> AppendVerticesExecutor::appendVertices() {
return handleNullProp(av);
}

<<<<<<< HEAD
DataSet vertices = buildRequestDataSet(av);
StorageClient *storageClient = qctx()->getStorageClient();
=======
auto res = buildRequestDataSet(av);
NG_RETURN_IF_ERROR(res);
auto vertices = std::move(res).value();
>>>>>>> fc434bbc4 (add unwind & check vidType when executing not validate (#4456))
if (vertices.rows.empty()) {
return finish(ResultBuilder().value(Value(DataSet(av->colNames()))).build());
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/AppendVerticesExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class AppendVerticesExecutor final : public GetPropExecutor {
folly::Future<Status> execute() override;

private:
DataSet buildRequestDataSet(const AppendVertices *gv);
StatusOr<DataSet> buildRequestDataSet(const AppendVertices *gv);

folly::Future<Status> appendVertices();

Expand Down
24 changes: 22 additions & 2 deletions src/graph/executor/query/GetEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "graph/executor/query/GetEdgesExecutor.h"

#include "graph/planner/plan/Query.h"
#include "graph/util/SchemaUtil.h"

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
Expand All @@ -17,19 +18,36 @@ folly::Future<Status> GetEdgesExecutor::execute() {
return getEdges();
}

DataSet GetEdgesExecutor::buildRequestDataSet(const GetEdges *ge) {
StatusOr<DataSet> GetEdgesExecutor::buildRequestDataSet(const GetEdges *ge) {
auto valueIter = ectx_->getResult(ge->inputVar()).iter();
QueryExpressionContext exprCtx(qctx()->ectx());

nebula::DataSet edges({kSrc, kType, kRank, kDst});
edges.rows.reserve(valueIter->size());
std::unordered_set<std::tuple<Value, Value, Value, Value>> uniqueEdges;
uniqueEdges.reserve(valueIter->size());

const auto &space = qctx()->rctx()->session()->space();
const auto &vidType = *(space.spaceDesc.vid_type_ref());
for (; valueIter->valid(); valueIter->next()) {
auto type = ge->type()->eval(exprCtx(valueIter.get()));
auto src = ge->src()->eval(exprCtx(valueIter.get()));
auto dst = ge->dst()->eval(exprCtx(valueIter.get()));
auto rank = ge->ranking()->eval(exprCtx(valueIter.get()));
if (!SchemaUtil::isValidVid(src, vidType)) {
std::stringstream ss;
ss << "`" << src.toString() << "', the src should be type of "
<< apache::thrift::util::enumNameSafe(vidType.get_type()) << ", but was`" << src.type()
<< "'";
return Status::Error(ss.str());
}
if (!SchemaUtil::isValidVid(dst, vidType)) {
std::stringstream ss;
ss << "`" << dst.toString() << "', the dst should be type of "
<< apache::thrift::util::enumNameSafe(vidType.get_type()) << ", but was`" << dst.type()
<< "'";
return Status::Error(ss.str());
}
type = type < 0 ? -type : type;
auto edgeKey = std::make_tuple(src, type, rank, dst);
if (ge->dedup() && !uniqueEdges.emplace(std::move(edgeKey)).second) {
Expand All @@ -52,7 +70,9 @@ folly::Future<Status> GetEdgesExecutor::getEdges() {
return Status::Error("ptr is nullptr");
}

auto edges = buildRequestDataSet(ge);
auto res = buildRequestDataSet(ge);
NG_RETURN_IF_ERROR(res);
auto edges = std::move(res).value();

if (edges.rows.empty()) {
// TODO: add test for empty input.
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetEdgesExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class GetEdgesExecutor final : public GetPropExecutor {
folly::Future<Status> execute() override;

private:
DataSet buildRequestDataSet(const GetEdges *ge);
StatusOr<DataSet> buildRequestDataSet(const GetEdges *ge);

folly::Future<Status> getEdges();
};
Expand Down
6 changes: 4 additions & 2 deletions src/graph/executor/query/GetNeighborsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ using nebula::storage::cpp2::GetNeighborsResponse;
namespace nebula {
namespace graph {

DataSet GetNeighborsExecutor::buildRequestDataSet() {
StatusOr<DataSet> GetNeighborsExecutor::buildRequestDataSet() {
SCOPED_TIMER(&execTime_);
auto inputVar = gn_->inputVar();
auto iter = ectx_->getResult(inputVar).iter();
return buildRequestDataSetByVidType(iter.get(), gn_->src(), gn_->dedup());
}

folly::Future<Status> GetNeighborsExecutor::execute() {
DataSet reqDs = buildRequestDataSet();
auto res = buildRequestDataSet();
NG_RETURN_IF_ERROR(res);
auto reqDs = std::move(res).value();
if (reqDs.rows.empty()) {
List emptyResult;
return finish(ResultBuilder()
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetNeighborsExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class GetNeighborsExecutor final : public StorageAccessExecutor {

folly::Future<Status> execute() override;

DataSet buildRequestDataSet();
StatusOr<DataSet> buildRequestDataSet();

private:
using RpcResponse = storage::StorageRpcResponse<storage::cpp2::GetNeighborsResponse>;
Expand Down
6 changes: 4 additions & 2 deletions src/graph/executor/query/GetVerticesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ folly::Future<Status> GetVerticesExecutor::getVertices() {
auto *gv = asNode<GetVertices>(node());
StorageClient *storageClient = qctx()->getStorageClient();

DataSet vertices = buildRequestDataSet(gv);
auto res = buildRequestDataSet(gv);
NG_RETURN_IF_ERROR(res);
auto vertices = std::move(res).value();
if (vertices.rows.empty()) {
// TODO: add test for empty input.
return finish(
Expand Down Expand Up @@ -55,7 +57,7 @@ folly::Future<Status> GetVerticesExecutor::getVertices() {
});
}

DataSet GetVerticesExecutor::buildRequestDataSet(const GetVertices *gv) {
StatusOr<DataSet> GetVerticesExecutor::buildRequestDataSet(const GetVertices *gv) {
if (gv == nullptr) {
return nebula::DataSet({kVid});
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/GetVerticesExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class GetVerticesExecutor final : public GetPropExecutor {
folly::Future<Status> execute() override;

private:
DataSet buildRequestDataSet(const GetVertices *gv);
StatusOr<DataSet> buildRequestDataSet(const GetVertices *gv);

folly::Future<Status> getVertices();
};
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/query/UnwindExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ folly::Future<Status> UnwindExecutor::execute() {
std::vector<Value> vals = extractList(list);
for (auto &v : vals) {
Row row;
if (!emptyInput) {
if (!unwind->fromPipe() && !emptyInput) {
row = *(iter->row());
}
row.values.emplace_back(std::move(v));
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/test/GetNeighborsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ TEST_F(GetNeighborsTest, BuildRequestDataSet) {
gn->setInputVar("input_gn");

auto gnExe = std::make_unique<GetNeighborsExecutor>(gn, qctx_.get());
auto reqDs = gnExe->buildRequestDataSet();
auto res = gnExe->buildRequestDataSet();
auto reqDs = std::move(res).value();

DataSet expected;
expected.colNames = {kVid};
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/plan/Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ std::unique_ptr<PlanNodeDescription> Unwind::explain() const {

PlanNode* Unwind::clone() const {
auto* newUnwind = Unwind::make(qctx_, nullptr);
newUnwind->setFromPipe(fromPipe_);
newUnwind->cloneMembers(*this);
return newUnwind;
}
Expand Down
9 changes: 9 additions & 0 deletions src/graph/planner/plan/Query.h
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,14 @@ class Unwind final : public SingleInputNode {
return alias_;
}

bool fromPipe() const {
return fromPipe_;
}

void setFromPipe(bool fromPipe) {
fromPipe_ = fromPipe;
}

PlanNode* clone() const override;
std::unique_ptr<PlanNodeDescription> explain() const override;

Expand All @@ -860,6 +868,7 @@ class Unwind final : public SingleInputNode {
private:
Expression* unwindExpr_{nullptr};
std::string alias_;
bool fromPipe_{false};
};

// Sort the given record set.
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/PermissionCheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ namespace graph {
case Sentence::Kind::kGetSubgraph:
case Sentence::Kind::kLimit:
case Sentence::Kind::kGroupBy:
case Sentence::Kind::kUnwind:
case Sentence::Kind::kReturn: {
return PermissionManager::canReadSchemaOrData(session, vctx);
}
Expand Down
1 change: 1 addition & 0 deletions src/graph/validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ nebula_add_library(
FindPathValidator.cpp
LookupValidator.cpp
MatchValidator.cpp
UnwindValidator.cpp
)

nebula_add_subdirectory(test)
24 changes: 13 additions & 11 deletions src/graph/validator/FetchEdgesValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,14 @@ Status FetchEdgesValidator::validateEdgeName() {

// Check validity of edge key(src, type, rank, dst)
// from Input/Variable expression specified in sentence
StatusOr<std::string> FetchEdgesValidator::validateEdgeRef(const Expression *expr,
Value::Type type) {
StatusOr<std::string> FetchEdgesValidator::validateEdgeRef(const Expression *expr) {
const auto &kind = expr->kind();
if (kind != Expression::Kind::kInputProperty && kind != EdgeExpression::Kind::kVarProperty) {
return Status::SemanticError("`%s', only input and variable expression is acceptable",
expr->toString().c_str());
}
auto exprType = deduceExprType(expr);
NG_RETURN_IF_ERROR(exprType);
if (exprType.value() != type) {
std::stringstream ss;
ss << "`" << expr->toString() << "' should be type of " << type << ", but was "
<< exprType.value();
return Status::SemanticError(ss.str());
}
if (kind == Expression::Kind::kInputProperty) {
return inputVarName_;
}
Expand All @@ -72,13 +65,22 @@ Status FetchEdgesValidator::validateEdgeKey() {
std::string inputVarName;
if (sentence->isRef()) { // edge keys from Input/Variable
auto *srcExpr = sentence->ref()->srcid();
auto result = validateEdgeRef(srcExpr, vidType_);
auto result = validateEdgeRef(srcExpr);
NG_RETURN_IF_ERROR(result);
inputVarName = std::move(result).value();

auto *rankExpr = sentence->ref()->rank();
if (rankExpr->kind() != Expression::Kind::kConstant) {
result = validateEdgeRef(rankExpr, Value::Type::INT);
auto rankType = deduceExprType(rankExpr);
NG_RETURN_IF_ERROR(rankType);
if (rankType.value() != Value::Type::INT) {
std::stringstream ss;
ss << "`" << rankExpr->toString() << "' should be type of INT, but was "
<< rankType.value();
return Status::SemanticError(ss.str());
}

result = validateEdgeRef(rankExpr);
NG_RETURN_IF_ERROR(result);
if (inputVarName != result.value()) {
return Status::SemanticError(
Expand All @@ -88,7 +90,7 @@ Status FetchEdgesValidator::validateEdgeKey() {
}

auto *dstExpr = sentence->ref()->dstid();
result = validateEdgeRef(dstExpr, vidType_);
result = validateEdgeRef(dstExpr);
NG_RETURN_IF_ERROR(result);
if (inputVarName != result.value()) {
return Status::SemanticError(
Expand Down
2 changes: 1 addition & 1 deletion src/graph/validator/FetchEdgesValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class FetchEdgesValidator final : public Validator {

Status validateEdgeName();

StatusOr<std::string> validateEdgeRef(const Expression* expr, Value::Type type);
StatusOr<std::string> validateEdgeRef(const Expression* expr);

Status validateEdgeKey();

Expand Down
Loading

0 comments on commit 57f048c

Please sign in to comment.