Skip to content

Commit

Permalink
fix extractfiltervisitor
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Sep 13, 2022
1 parent cb95ef1 commit d4f2524
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 30 deletions.
26 changes: 10 additions & 16 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,17 @@ namespace graph {
folly::Future<Status> SubgraphExecutor::execute() {
SCOPED_TIMER(&execTime_);
totalSteps_ = subgraph_->steps();
startVids_ = buildRequestDataSet();
if (startVids_.rows.empty()) {
auto iter = ectx_->getResult(subgraph_->inputVar()).iter();
auto res = buildRequestListByVidType(iter.get(), subgraph_->src(), true);
NG_RETURN_IF_ERROR(res);
vids_ = res.value();
if (vids_.empty()) {
DataSet emptyResult;
return finish(ResultBuilder().value(Value(std::move(emptyResult))).build());
}
return getNeighbors();
}

DataSet SubgraphExecutor::buildRequestDataSet() {
auto inputVar = subgraph_->inputVar();
auto iter = ectx_->getResult(inputVar).iter();
return buildRequestDataSetByVidType(iter.get(), subgraph_->src(), true);
}

folly::Future<Status> SubgraphExecutor::getNeighbors() {
time::Duration getNbrTime;
StorageClient* storageClient = qctx_->getStorageClient();
Expand All @@ -39,7 +36,7 @@ folly::Future<Status> SubgraphExecutor::getNeighbors() {
return storageClient
->getNeighbors(param,
{nebula::kVid},
std::move(startVids_.rows),
std::move(vids_),
{},
edgeDirection,
nullptr,
Expand All @@ -54,6 +51,7 @@ folly::Future<Status> SubgraphExecutor::getNeighbors() {
.via(runner())
.thenValue([this, getNbrTime](RpcResponse&& resp) mutable {
// addStats(resp, getNbrTime.elapsedInUSec());
vids_.clear();
return handleResponse(std::move(resp));
});
}
Expand All @@ -80,7 +78,6 @@ folly::Future<Status> SubgraphExecutor::handleResponse(RpcResponse&& resps) {
if (!subgraph_->oneMoreStep()) {
--steps;
}
startVids_.rows.clear();

if (!process(std::move(iter)) || ++currentStep_ > steps) {
return folly::makeFuture<Status>(Status::OK());
Expand All @@ -90,7 +87,6 @@ folly::Future<Status> SubgraphExecutor::handleResponse(RpcResponse&& resps) {
}

bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
auto& rows = startVids_.rows;
auto gnSize = iter->size();
if (gnSize == 0) {
return false;
Expand All @@ -99,7 +95,7 @@ bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
ResultBuilder builder;
builder.value(iter->valuePtr());

robin_hood::unordered_flat_map<Value, int64_t, std::hash<Value>> currentVids;
HashMap currentVids;
currentVids.reserve(gnSize);
historyVids_.reserve(historyVids_.size() + gnSize);
if (currentStep_ == 1) {
Expand Down Expand Up @@ -140,9 +136,7 @@ bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
}
if (currentVids.emplace(dst, currentStep_).second) {
// next vids for getNeighbor
Row row;
row.values.emplace_back(std::move(dst));
rows.emplace_back(std::move(row));
vids_.emplace_back(std::move(dst));
}
iter->next();
}
Expand All @@ -153,7 +147,7 @@ bool SubgraphExecutor::process(std::unique_ptr<GetNeighborsIter> iter) {
// update historyVids
historyVids_.insert(std::make_move_iterator(currentVids.begin()),
std::make_move_iterator(currentVids.end()));
if (rows.empty()) {
if (vids_.empty()) {
return false;
}
return true;
Expand Down
12 changes: 4 additions & 8 deletions src/graph/executor/algo/SubgraphExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,27 @@ using RpcResponse = storage::StorageRpcResponse<storage::cpp2::GetNeighborsRespo

class SubgraphExecutor : public StorageAccessExecutor {
public:
using HashMap = robin_hood::unordered_flat_map<Value, size_t, std::hash<Value>>;

SubgraphExecutor(const PlanNode* node, QueryContext* qctx)
: StorageAccessExecutor("SubgraphExecutor", node, qctx) {
subgraph_ = asNode<Subgraph>(node);
}

folly::Future<Status> execute() override;

DataSet buildRequestDataSet();

folly::Future<Status> getNeighbors();

bool process(std::unique_ptr<GetNeighborsIter> iter);

folly::Future<Status> handleResponse(RpcResponse&& resps);

private:
robin_hood::unordered_flat_map<Value, int64_t, std::hash<Value>> historyVids_;
HashMap historyVids_;
const Subgraph* subgraph_{nullptr};
size_t currentStep_{1};
size_t totalSteps_{1};
DataSet startVids_;
<<<<<<< HEAD
=======
std::unordered_map<Value, size_t> historyVids_;
>>>>>>> fix error
std::vector<Value> vids_;
};

} // namespace graph
Expand Down
23 changes: 18 additions & 5 deletions src/graph/validator/GetSubgraphValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ Status GetSubgraphValidator::validateBothInOutBound(BothInOutClause* out) {
return Status::OK();
}

static Expression* rewriteDstProp2SrcProp(const Expression* expr) {
ObjectPool* pool = expr->getObjPool();
auto matcher = [](const Expression* e) -> bool {
return e->kind() == Expression::Kind::kDstProperty;
};
auto rewriter = [pool](const Expression* e) -> Expression* {
auto dstExpr = static_cast<const DestPropertyExpression*>(e);
return SourcePropertyExpression::make(pool, dstExpr->sym(), dstExpr->prop());
};

return RewriteVisitor::transform(expr, std::move(matcher), std::move(rewriter));
}

// Check validity of filter expression, rewrites expression to fit its sementic,
// disable some invalid expression types, collect properties used in filter.
Status GetSubgraphValidator::validateWhere(WhereClause* where) {
Expand Down Expand Up @@ -132,18 +145,18 @@ Status GetSubgraphValidator::validateWhere(WhereClause* where) {

NG_RETURN_IF_ERROR(deduceProps(filter, subgraphCtx_->exprProps));

auto condition = filter->clone();
if (ExpressionUtils::findAny(expr, {Expression::Kind::kDstProperty})) {
auto visitor = ExtractFilterExprVisitor::makePushGetNeighbors(qctx_->objPool());
auto visitor = ExtractFilterExprVisitor::makePushGetVertices(qctx_->objPool());
filter->accept(&visitor);
if (!visitor.ok()) {
return Status::SemanticError("filter error");
}
auto remainedExpr = std::move(visitor).remainedExpr();
subgraphCtx_->filter = filter;
subgraphCtx_->edgeFilter = remainedExpr;
subgraphCtx_->edgeFilter = std::move(visitor).remainedExpr();
} else {
subgraphCtx_->filter = subgraphCtx_->edgeFilter = filter;
subgraphCtx_->edgeFilter = condition;
}
subgraphCtx_->filter = rewriteDstProp2SrcProp(condition);
return Status::OK();
}

Expand Down
10 changes: 9 additions & 1 deletion src/graph/visitor/ExtractFilterExprVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,15 @@ void ExtractFilterExprVisitor::visit(VariablePropertyExpression *) {
}

void ExtractFilterExprVisitor::visit(DestPropertyExpression *) {
canBePushed_ = false;
switch (pushType_) {
case PushType::kGetNeighbors:
case PushType::kGetEdges:
canBePushed_ = false;
break;
case PushType::kGetVertices:
canBePushed_ = true;
break;
}
}

void ExtractFilterExprVisitor::visit(SourcePropertyExpression *) {
Expand Down

0 comments on commit d4f2524

Please sign in to comment.