Skip to content

Commit

Permalink
Cherry pick 3.6 (0701-0720) (#5649)
Browse files Browse the repository at this point in the history
* fix traverse build path memory tracker (#5619)

* Optimize the write performance when host is down (#5571)

* Optimize the write performance when host is down

* fix the comments

---------

Co-authored-by: Sophie <[email protected]>

* Fix too many logs print when listing sessions (#5618)

* Set min_level_for_custom_filter as 0 by default to delete expired d… (#5622)

Reset min_level_for_custom_filter as 0 by default to delete expired data during compaction

* Revert "Revert "Change ccache to sccache"" (#5623)

* Revert "Revert "Change ccache to sccache" (#5613)"

This reverts commit 08a5d90.

* Update pull_request.yml

* Revert  "Change ccache to sccache" (#5627)

Revert "Revert "Revert "Change ccache to sccache"" (#5623)"

This reverts commit c1b433d.

* fix all path memory tracker (#5621)

* fix all path memory tracker

* fix error

* Update pull_request.yml

enable sccache debug log

* Update pull_request.yml

add ninja -v

* Update pull_request.yml

* Update pull_request.yml

* Update pull_request.yml

* Update pull_request.yml

---------

Co-authored-by: George <[email protected]>
Co-authored-by: Sophie <[email protected]>

* Fix edge all predicate embedding when generating path (#5631)

* Fix edge all predicate embedding when generate path

* fmt

* Enhancement/eliminate invalid filter (#5634)

* Fix crash double free of expr.

* Change issue id.

* Elimintate invalid property filter.

* support find circular (#5636)

Co-authored-by: Sophie <[email protected]>

* fix allpath memory tracker (#5640)

fix allpath memory traker

* fix_delete_validate (#5645)

add test case

---------

Co-authored-by: jimingquan <[email protected]>
Co-authored-by: Ryan <[email protected]>
Co-authored-by: Songqing Zhang <[email protected]>
Co-authored-by: George <[email protected]>
Co-authored-by: kyle.cao <[email protected]>
Co-authored-by: shylock <[email protected]>
  • Loading branch information
7 people authored Jul 20, 2023
1 parent c113ffa commit de9b3ed
Show file tree
Hide file tree
Showing 41 changed files with 1,025 additions and 55 deletions.
6 changes: 4 additions & 2 deletions src/common/expression/PredicateExpression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ const Value& PredicateExpression::evalExists(ExpressionContext& ctx) {
DCHECK(collection_->kind() == Expression::Kind::kAttribute ||
collection_->kind() == Expression::Kind::kSubscript ||
collection_->kind() == Expression::Kind::kLabelTagProperty ||
collection_->kind() == Expression::Kind::kTagProperty)
collection_->kind() == Expression::Kind::kTagProperty ||
collection_->kind() == Expression::Kind::kConstant)
<< "actual kind: " << collection_->kind() << ", toString: " << toString();

if (collection_->kind() == Expression::Kind::kLabelTagProperty ||
collection_->kind() == Expression::Kind::kTagProperty) {
collection_->kind() == Expression::Kind::kTagProperty ||
collection_->kind() == Expression::Kind::kConstant) {
auto v = collection_->eval(ctx);
result_ = (!v.isNull()) && (!v.empty());
return result_;
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ nebula_add_library(
query/AppendVerticesExecutor.cpp
query/RollUpApplyExecutor.cpp
query/PatternApplyExecutor.cpp
query/ValueExecutor.cpp
algo/BFSShortestPathExecutor.cpp
algo/MultiShortestPathExecutor.cpp
algo/AllPathsExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
#include "graph/executor/query/UnionAllVersionVarExecutor.h"
#include "graph/executor/query/UnionExecutor.h"
#include "graph/executor/query/UnwindExecutor.h"
#include "graph/executor/query/ValueExecutor.h"
#include "graph/planner/plan/Admin.h"
#include "graph/planner/plan/Logic.h"
#include "graph/planner/plan/Maintain.h"
Expand Down Expand Up @@ -257,6 +258,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kDedup: {
return pool->makeAndAdd<DedupExecutor>(node, qctx);
}
case PlanNode::Kind::kValue: {
return pool->makeAndAdd<ValueExecutor>(node, qctx);
}
case PlanNode::Kind::kAssign: {
return pool->makeAndAdd<AssignExecutor>(node, qctx);
}
Expand Down
103 changes: 81 additions & 22 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
return future.via(runner())
.ensure([this, buildPathTime]() { addState("build_path_time", buildPathTime); })
.thenValue([this](auto&& resp) {
UNUSED(resp);
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
}
if (!withProp_ || emptyPropVids_.empty()) {
finish(ResultBuilder().value(Value(std::move(result_))).build());
return folly::makeFuture<Status>(Status::OK());
Expand All @@ -222,18 +224,37 @@ folly::Future<Status> AllPathsExecutor::buildResult() {

folly::Future<Status> AllPathsExecutor::buildPathMultiJobs() {
std::vector<folly::Future<std::vector<NPath*>>> futures;
futures.emplace_back(
folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, false); }));
futures.emplace_back(
folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, true); }));
auto leftFuture = folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, false); })
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return std::vector<NPath*>();
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception&) { return std::vector<NPath*>(); });
auto rightFuture = folly::via(runner(), [this]() { return doBuildPath(1, 0, 0, nullptr, true); })
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return std::vector<NPath*>();
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception&) { return std::vector<NPath*>(); });
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

time::Duration conjunctPathTime;
return folly::collect(futures)
return folly::collectAll(futures)
.via(runner())
.thenValue([this](std::vector<std::vector<NPath*>>&& paths) {
.thenValue([this](std::vector<folly::Try<std::vector<NPath*>>>&& paths) {
if (memoryExceeded_.load(std::memory_order_acquire) == true) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
}
memory::MemoryCheckGuard guard;
auto& leftPaths = paths.front();
auto& rightPaths = paths.back();
auto& leftPathsValues = paths.front();
auto& rightPathsValues = paths.back();
auto leftPaths = std::move(leftPathsValues).value();
auto rightPaths = std::move(rightPathsValues).value();

if (leftSteps_ == 0) {
buildOneWayPath(rightPaths, false);
Expand All @@ -259,6 +280,7 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
size_t end,
std::shared_ptr<std::vector<NPath*>> pathsPtr,
bool reverse) {
memory::MemoryCheckGuard guard;
auto maxStep = reverse ? rightSteps_ : leftSteps_;
if (step > maxStep) {
return folly::makeFuture<std::vector<NPath*>>(std::vector<NPath*>());
Expand All @@ -272,6 +294,9 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
if (step == 1) {
auto& initVids = reverse ? rightInitVids_ : leftInitVids_;
for (auto& vid : initVids) {
if (memoryExceeded_.load(std::memory_order_acquire) == true) {
return folly::makeFuture<std::vector<NPath*>>(std::vector<NPath*>());
}
auto vidIter = adjList.find(vid);
if (vidIter == adjList.end()) {
continue;
Expand All @@ -288,6 +313,9 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
}
} else {
for (auto i = start; i < end; ++i) {
if (memoryExceeded_.load(std::memory_order_acquire) == true) {
return folly::makeFuture<std::vector<NPath*>>(std::vector<NPath*>());
}
auto path = (*pathsPtr)[i];
auto& edgeValue = path->edge;
DCHECK(edgeValue.isEdge());
Expand All @@ -314,28 +342,46 @@ folly::Future<std::vector<AllPathsExecutor::NPath*>> AllPathsExecutor::doBuildPa
}

auto newPathsSize = newPathsPtr->size();
if (newPathsSize == 0) {
if (newPathsSize == 0 || memoryExceeded_.load(std::memory_order_acquire) == true) {
return folly::makeFuture<std::vector<NPath*>>(std::vector<NPath*>());
}
std::vector<folly::Future<std::vector<NPath*>>> futures;
if (newPathsSize < FLAGS_path_batch_size) {
futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr, reverse]() {
return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr, reverse);
}));
auto future = folly::via(runner(),
[this, step, newPathsSize, newPathsPtr, reverse]() {
return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr, reverse);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return std::vector<NPath*>();
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception&) { return std::vector<NPath*>(); });
futures.emplace_back(std::move(future));
} else {
for (size_t _start = 0; _start < newPathsSize; _start += FLAGS_path_batch_size) {
auto tmp = _start + FLAGS_path_batch_size;
auto _end = tmp > newPathsSize ? newPathsSize : tmp;
futures.emplace_back(folly::via(runner(), [this, step, _start, _end, newPathsPtr, reverse]() {
return doBuildPath(step + 1, _start, _end, newPathsPtr, reverse);
}));
auto future = folly::via(runner(),
[this, step, _start, _end, newPathsPtr, reverse]() {
return doBuildPath(step + 1, _start, _end, newPathsPtr, reverse);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return std::vector<NPath*>();
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception&) { return std::vector<NPath*>(); });
futures.emplace_back(std::move(future));
}
}
return folly::collect(futures).via(runner()).thenValue(
[currentStepResult = newPathsPtr](std::vector<std::vector<NPath*>>&& paths) {
memory::MemoryCheckGuard guard;
return folly::collectAll(futures).via(runner()).thenValue(
[currentStepResult = newPathsPtr](std::vector<folly::Try<std::vector<NPath*>>>&& paths) {
std::vector<NPath*> result = std::move(*currentStepResult);
for (auto& path : paths) {
for (auto& pathValue : paths) {
auto path = std::move(pathValue).value();
if (path.empty()) {
continue;
}
Expand Down Expand Up @@ -485,8 +531,9 @@ folly::Future<Status> AllPathsExecutor::conjunctPath(std::vector<NPath*>& leftPa
runner(), [this, start, end, reverse]() { return probe(start, end, reverse); }));
}
}
return folly::collect(futures).via(runner()).thenValue(
[this, path = std::move(oneWayPath)](std::vector<std::vector<Row>>&& resps) {
return folly::collect(futures)
.via(runner())
.thenValue([this, path = std::move(oneWayPath)](std::vector<std::vector<Row>>&& resps) {
memory::MemoryCheckGuard guard;
result_.rows = std::move(path);
for (auto& rows : resps) {
Expand All @@ -510,6 +557,14 @@ folly::Future<Status> AllPathsExecutor::conjunctPath(std::vector<NPath*>& leftPa
result_.rows.resize(limit_);
}
return Status::OK();
})
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand All @@ -528,6 +583,7 @@ void AllPathsExecutor::buildHashTable(std::vector<NPath*>& paths, bool reverse)
}

std::vector<Row> AllPathsExecutor::probe(size_t start, size_t end, bool reverse) {
memory::MemoryCheckGuard guard;
auto buildPath = [](std::vector<Value>& leftPath,
const Value& intersectVertex,
std::vector<Value>& rightPath) {
Expand All @@ -545,6 +601,9 @@ std::vector<Row> AllPathsExecutor::probe(size_t start, size_t end, bool reverse)
std::vector<Row> result;
Row emptyPropVerticesRow;
for (size_t i = start; i < end; ++i) {
if (memoryExceeded_.load(std::memory_order_acquire) == true) {
break;
}
auto& probePath = probePaths_[i];
auto& edgeVal = probePath->edge;
const auto& intersectVid = edgeVal.getEdge().dst;
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/algo/AllPathsExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class AllPathsExecutor final : public StorageAccessExecutor {
bool noLoop_{false};
size_t limit_{std::numeric_limits<size_t>::max()};
std::atomic<size_t> cnt_{0};
std::atomic<bool> memoryExceeded_{false};
size_t maxStep_{0};

size_t leftSteps_{0};
Expand Down
7 changes: 5 additions & 2 deletions src/graph/executor/algo/ShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ size_t ShortestPathExecutor::checkInput(HashSet& startVids, HashSet& endVids) {
auto iter = ectx_->getResult(pathNode_->inputVar()).iter();
const auto& metaVidType = *(qctx()->rctx()->session()->space().spaceDesc.vid_type_ref());
auto vidType = SchemaUtil::propTypeToValueType(metaVidType.get_type());
bool isZeroStep = pathNode_->stepRange().min() == 0;
for (; iter->valid(); iter->next()) {
auto start = iter->getColumn(0);
auto end = iter->getColumn(1);
Expand All @@ -52,8 +53,10 @@ size_t ShortestPathExecutor::checkInput(HashSet& startVids, HashSet& endVids) {
<< ", end type: " << end.type() << ", space vid type: " << vidType;
continue;
}
if (start == end) {
// continue or return error

// When the minimum number of steps is 0, and the starting node and the destination node
// are the same. the shortest path between the two nodes is 0
if (isZeroStep && start == end) {
continue;
}
startVids.emplace(std::move(start));
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/query/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ folly::Future<Status> TraverseExecutor::buildPathMultiJobs(size_t minStep, size_
std::vector<Row> TraverseExecutor::buildPath(const Value& initVertex,
size_t minStep,
size_t maxStep) {
memory::MemoryCheckGuard guard;
auto vidIter = adjList_.find(initVertex);
if (vidIter == adjList_.end()) {
return std::vector<Row>();
Expand Down
21 changes: 21 additions & 0 deletions src/graph/executor/query/ValueExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2023 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/executor/query/ValueExecutor.h"

#include "graph/context/Result.h"
#include "graph/planner/plan/Query.h"
#include "graph/service/GraphFlags.h"

namespace nebula {
namespace graph {

folly::Future<Status> ValueExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto value = asNode<ValueNode>(node())->value();
return finish(ResultBuilder().value(std::move(value)).build());
}

} // namespace graph
} // namespace nebula
22 changes: 22 additions & 0 deletions src/graph/executor/query/ValueExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2023 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#pragma once

#include "graph/executor/Executor.h"

// delete the corresponding iterator when the row in the dataset does not meet the conditions
// and save the filtered iterator to the result
namespace nebula {
namespace graph {

class ValueExecutor final : public Executor {
public:
ValueExecutor(const PlanNode *node, QueryContext *qctx) : Executor("ValueExecutor", node, qctx) {}

folly::Future<Status> execute() override;
};

} // namespace graph
} // namespace nebula
1 change: 1 addition & 0 deletions src/graph/optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ nebula_add_library(
rule/RemoveAppendVerticesBelowJoinRule.cpp
rule/EmbedEdgeAllPredIntoTraverseRule.cpp
rule/PushFilterThroughAppendVerticesRule.cpp
rule/EliminateFilterRule.cpp
)

nebula_add_subdirectory(test)
14 changes: 14 additions & 0 deletions src/graph/optimizer/OptRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ RuleSet &RuleSet::DefaultRules() {
return kDefaultRules;
}

RuleSet &RuleSet::QueryRules0() {
static RuleSet kQueryRules0("QueryRuleSet0");
return kQueryRules0;
}

RuleSet &RuleSet::QueryRules() {
static RuleSet kQueryRules("QueryRuleSet");
return kQueryRules;
Expand All @@ -231,6 +236,15 @@ RuleSet *RuleSet::addRule(const OptRule *rule) {
return this;
}

std::string RuleSet::toString() const {
std::stringstream ss;
ss << "RuleSet: " << name_ << std::endl;
for (auto rule : rules_) {
ss << rule->toString() << std::endl;
}
return ss.str();
}

void RuleSet::merge(const RuleSet &ruleset) {
for (auto rule : ruleset.rules()) {
addRule(rule);
Expand Down
3 changes: 3 additions & 0 deletions src/graph/optimizer/OptRule.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class OptRule {
class RuleSet final {
public:
static RuleSet &DefaultRules();
static RuleSet &QueryRules0();
static RuleSet &QueryRules();

RuleSet *addRule(const OptRule *rule);
Expand All @@ -138,6 +139,8 @@ class RuleSet final {
return rules_;
}

std::string toString() const;

private:
explicit RuleSet(const std::string &name);

Expand Down
Loading

0 comments on commit de9b3ed

Please sign in to comment.