Skip to content

Commit

Permalink
simple case m to n
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed May 10, 2023
1 parent 5419140 commit 5fda95a
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 4 deletions.
61 changes: 61 additions & 0 deletions src/graph/executor/query/ExpandAllExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::GetDstBySrcResponse;
using nebula::storage::cpp2::GetNeighborsResponse;

namespace nebula {
Expand Down Expand Up @@ -60,9 +61,69 @@ folly::Future<Status> ExpandAllExecutor::execute() {
if (nextStepVids_.empty()) {
return finish(ResultBuilder().value(Value(std::move(result_))).build());
}
if (vertexColumns_ == nullptr && edgeColumns_ == nullptr) {
return GetDstBySrc();
}
return getNeighbors();
}

folly::Future<Status> ExpandAllExecutor::GetDstBySrc() {
currentStep_++;
time::Duration getDstTime;
StorageClient* storageClient = qctx_->getStorageClient();
StorageClient::CommonRequestParam param(expand_->space(),
qctx_->rctx()->session()->id(),
qctx_->plan()->id(),
qctx_->plan()->isProfileEnabled());
std::vector<Value> vids(nextStepVids_.size());
std::move(nextStepVids_.begin(), nextStepVids_.end(), vids.begin());
return storageClient->getDstBySrc(param, std::move(vids), expand_->edgeTypes())
.via(runner())
.ensure([this, getDstTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getDstTime.elapsedInUSec()));
})
.thenValue([this](StorageRpcResponse<GetDstBySrcResponse>&& resps) {
memory::MemoryCheckGuard guard;
nextStepVids_.clear();
SCOPED_TIMER(&execTime_);
auto& hostLatency = resps.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
auto& result = resps.responses()[i];
if (result.dsts_ref().has_value()) {
size = (*result.dsts_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("step{} resp [{}]", currentStep_, i),
folly::toPrettyJson(info));
}
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
return folly::makeFuture<Status>(result.status());
}
auto& responses = resps.responses();
if (currentStep_ <= maxSteps_) {
for (auto& resp : responses) {
auto* dataset = resp.get_dsts();
if (dataset == nullptr) continue;
for (auto& row : dataset->rows) {
nextStepVids_.insert(row.values.begin(), row.values.end());
}
// add the dataset of each step to result_
result_.append(std::move(*dataset));
}
if (nextStepVids_.empty()) {
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
}
return GetDstBySrc();
}
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
});
}

folly::Future<Status> ExpandAllExecutor::getNeighbors() {
currentStep_++;
StorageClient* storageClient = qctx_->getStorageClient();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/query/ExpandAllExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class ExpandAllExecutor final : public StorageAccessExecutor {

folly::Future<Status> getNeighbors();

folly::Future<Status> GetDstBySrc();

void getNeighborsFromCache(std::unordered_map<Value, std::unordered_set<Value>>& dst2VidsMap,
std::unordered_set<Value>& visitedVids,
std::vector<int64_t>& samples);
Expand Down
26 changes: 23 additions & 3 deletions src/graph/planner/ngql/GoPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,38 @@ PlanNode* GoPlanner::buildJoinDstPlan(PlanNode* dep) {

SubPlan GoPlanner::doSimplePlan() {
auto qctx = goCtx_->qctx;
size_t step = goCtx_->steps.mSteps();
size_t minStep = goCtx_->steps.mSteps();
size_t maxStep = goCtx_->steps.nSteps();

auto* expand = Expand::make(qctx,
startNode_,
goCtx_->space.id,
false, // random
step,
minStep == 0 ? minStep : minStep - 1,
buildEdgeProps(true));
expand->setEdgeTypes(buildEdgeTypes());
expand->setColNames({"_expand_vid"});
expand->setInputVar(goCtx_->vidsVar);

auto* dedup = Dedup::make(qctx, expand);
auto dep = expand;
if (minStep != maxStep) {
// simple m to n case
// go m to n steps from 'xxx' over edge yield distinct edge._dst
dep = ExpandAll::make(qctx,
dep,
goCtx_->space.id,
false, // random
minStep,
maxStep,
buildEdgeProps(true),
nullptr,
nullptr,
nullptr);
dep->setEdgeTypes(buildEdgeTypes());
dep->setColNames({"_expandall_vid"});
}

auto* dedup = Dedup::make(qctx, dep);

auto pool = qctx->objPool();
auto* newYieldExpr = pool->makeAndAdd<YieldColumns>();
Expand Down
2 changes: 1 addition & 1 deletion src/graph/validator/GoValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ bool GoValidator::checkDstPropOrVertexExist(const Expression* expr) {
}

bool GoValidator::isSimpleCase() {
if (!goCtx_->limits.empty() || !goCtx_->distinct || goCtx_->filter || goCtx_->steps.isMToN() ||
if (!goCtx_->limits.empty() || !goCtx_->distinct || goCtx_->filter ||
goCtx_->from.fromType != FromType::kInstantExpr) {
return false;
}
Expand Down
1 change: 1 addition & 0 deletions tests/tck/features/path/AllPath.feature
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
@jmq
Feature: All Path

Background:
Expand Down

0 comments on commit 5fda95a

Please sign in to comment.