Skip to content

Commit

Permalink
simple case m to n
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed May 10, 2023
1 parent 5419140 commit aad2d87
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 12 deletions.
70 changes: 70 additions & 0 deletions src/graph/executor/query/ExpandAllExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

using nebula::storage::StorageClient;
using nebula::storage::StorageRpcResponse;
using nebula::storage::cpp2::GetDstBySrcResponse;
using nebula::storage::cpp2::GetNeighborsResponse;

namespace nebula {
Expand Down Expand Up @@ -60,9 +61,78 @@ folly::Future<Status> ExpandAllExecutor::execute() {
if (nextStepVids_.empty()) {
return finish(ResultBuilder().value(Value(std::move(result_))).build());
}
if (vertexColumns_ == nullptr && edgeColumns_ == nullptr) {
return GetDstBySrc();
}
return getNeighbors();
}

folly::Future<Status> ExpandAllExecutor::GetDstBySrc() {
currentStep_++;
time::Duration getDstTime;
StorageClient* storageClient = qctx_->getStorageClient();
StorageClient::CommonRequestParam param(expand_->space(),
qctx_->rctx()->session()->id(),
qctx_->plan()->id(),
qctx_->plan()->isProfileEnabled());
std::vector<Value> vids(nextStepVids_.size());
std::move(nextStepVids_.begin(), nextStepVids_.end(), vids.begin());
return storageClient->getDstBySrc(param, std::move(vids), expand_->edgeTypes())
.via(runner())
.ensure([this, getDstTime]() {
SCOPED_TIMER(&execTime_);
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getDstTime.elapsedInUSec()));
})
.thenValue([this](StorageRpcResponse<GetDstBySrcResponse>&& resps) {
memory::MemoryCheckGuard guard;
nextStepVids_.clear();
SCOPED_TIMER(&execTime_);
auto& hostLatency = resps.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
auto& result = resps.responses()[i];
if (result.dsts_ref().has_value()) {
size = (*result.dsts_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size);
otherStats_.emplace(folly::sformat("step{} resp [{}]", currentStep_, i),
folly::toPrettyJson(info));
}
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
return folly::makeFuture<Status>(result.status());
}
auto& responses = resps.responses();
if (currentStep_ <= maxSteps_) {
for (auto& resp : responses) {
auto* dataset = resp.get_dsts();
if (dataset == nullptr) continue;
for (auto& row : dataset->rows) {
nextStepVids_.insert(row.values.begin(), row.values.end());
// add the dataset of each step to result_
result_.rows.emplace_back(row);
}
}
if (nextStepVids_.empty()) {
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
}
return GetDstBySrc();
} else {
for (auto& resp : responses) {
auto* dataset = resp.get_dsts();
if (dataset == nullptr) continue;
for (auto& row : dataset->rows) {
// add the dataset of each step to result_
result_.rows.emplace_back(row);
}
}
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
}
});
}

folly::Future<Status> ExpandAllExecutor::getNeighbors() {
currentStep_++;
StorageClient* storageClient = qctx_->getStorageClient();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/query/ExpandAllExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class ExpandAllExecutor final : public StorageAccessExecutor {

folly::Future<Status> getNeighbors();

folly::Future<Status> GetDstBySrc();

void getNeighborsFromCache(std::unordered_map<Value, std::unordered_set<Value>>& dst2VidsMap,
std::unordered_set<Value>& visitedVids,
std::vector<int64_t>& samples);
Expand Down
30 changes: 27 additions & 3 deletions src/graph/planner/ngql/GoPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,42 @@ PlanNode* GoPlanner::buildJoinDstPlan(PlanNode* dep) {

SubPlan GoPlanner::doSimplePlan() {
auto qctx = goCtx_->qctx;
size_t step = goCtx_->steps.mSteps();
size_t minStep = goCtx_->steps.mSteps();
size_t maxStep = goCtx_->steps.nSteps();
size_t steps = minStep;
if (minStep != maxStep) {
steps = minStep == 0 ? minStep : minStep - 1;
}

auto* expand = Expand::make(qctx,
startNode_,
goCtx_->space.id,
false, // random
step,
steps,
buildEdgeProps(true));
expand->setEdgeTypes(buildEdgeTypes());
expand->setColNames({"_expand_vid"});
expand->setInputVar(goCtx_->vidsVar);

auto* dedup = Dedup::make(qctx, expand);
auto dep = expand;
if (minStep != maxStep) {
// simple m to n case
// go m to n steps from 'xxx' over edge yield distinct edge._dst
dep = ExpandAll::make(qctx,
dep,
goCtx_->space.id,
false, // random
minStep,
maxStep,
buildEdgeProps(true),
nullptr,
nullptr,
nullptr);
dep->setEdgeTypes(buildEdgeTypes());
dep->setColNames({"_expandall_vid"});
}

auto* dedup = Dedup::make(qctx, dep);

auto pool = qctx->objPool();
auto* newYieldExpr = pool->makeAndAdd<YieldColumns>();
Expand Down
2 changes: 1 addition & 1 deletion src/graph/validator/GoValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ bool GoValidator::checkDstPropOrVertexExist(const Expression* expr) {
}

bool GoValidator::isSimpleCase() {
if (!goCtx_->limits.empty() || !goCtx_->distinct || goCtx_->filter || goCtx_->steps.isMToN() ||
if (!goCtx_->limits.empty() || !goCtx_->distinct || goCtx_->filter ||
goCtx_->from.fromType != FromType::kInstantExpr) {
return false;
}
Expand Down
16 changes: 8 additions & 8 deletions tests/tck/features/go/SimpleCase.feature
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,11 @@ Feature: Simple case
And the execution plan should be:
| id | name | dependencies | operator info |
| 6 | Aggregate | 5 | |
| 5 | Dedup | 4 | |
| 4 | Project | 3 | |
| 5 | Project | 4 | |
| 4 | Dedup | 3 | |
| 3 | ExpandAll | 2 | |
| 2 | Expand | 1 | |
| 1 | Start | | |
| 2 | Expand | 0 | |
| 0 | Start | | |
When profiling query:
"""
GO 1 to 3 STEP FROM "Tony Parker" OVER like WHERE $$.player.age > 40 YIELD DISTINCT id($$), $$.player.age as age, $$.player.name | ORDER BY $-.age
Expand Down Expand Up @@ -374,11 +374,11 @@ Feature: Simple case
| 12 | Dedup | 11 | |
| 11 | Project | 10 | |
| 10 | HashInnerJoin | 5,9 | |
| 5 | Dedup | 4 | |
| 4 | Project | 3 | |
| 5 | Project | 4 | |
| 4 | Dedup | 3 | |
| 3 | ExpandAll | 2 | |
| 2 | Expand | 1 | |
| 1 | Start | | |
| 2 | Expand | 0 | |
| 0 | Start | | |
| 9 | ExpandAll | 8 | |
| 8 | Expand | 7 | |
| 7 | Argument | | |
Expand Down

0 comments on commit aad2d87

Please sign in to comment.