Skip to content

Commit

Permalink
Return variable if not empty (#1233)
Browse files Browse the repository at this point in the history
* Add return sentence.

* Add test.

* Address comment and add more tests.

* Rebase and fix conflict.

* Update.
  • Loading branch information
CPWstatic authored and dutor committed Nov 21, 2019
1 parent 5ace754 commit 317203f
Show file tree
Hide file tree
Showing 51 changed files with 435 additions and 68 deletions.
2 changes: 1 addition & 1 deletion src/graph/AlterEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void AlterEdgeExecutor::execute() {
}

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/AlterTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void AlterTagExecutor::execute() {
}

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
4 changes: 2 additions & 2 deletions src/graph/AssignmentExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ Status AssignmentExecutor::prepare() {
DCHECK(onError_);
onError_(std::move(s));
};
auto onFinish = [this] () {
auto onFinish = [this] (Executor::ProcessControl ctr) {
DCHECK(onFinish_);
onFinish_();
onFinish_(ctr);
};
auto onResult = [this] (std::unique_ptr<InterimResult> result) {
ectx()->variableHolder()->add(*var_, std::move(result));
Expand Down
6 changes: 3 additions & 3 deletions src/graph/BalanceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void BalanceExecutor::balanceLeader() {
return;
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down Expand Up @@ -101,7 +101,7 @@ void BalanceExecutor::balanceData(bool isStop) {
resp_->set_rows(std::move(rows));

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down Expand Up @@ -172,7 +172,7 @@ void BalanceExecutor::showBalancePlan() {
rows.back().set_columns(std::move(row));
resp_->set_rows(std::move(rows));
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
1 change: 1 addition & 0 deletions src/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ nebula_add_library(
FindPathExecutor.cpp
LimitExecutor.cpp
GroupByExecutor.cpp
ReturnExecutor.cpp
)

nebula_add_library(
Expand Down
6 changes: 3 additions & 3 deletions src/graph/ConfigExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void ConfigExecutor::showVariables() {
resp_->set_rows(std::move(rows));

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down Expand Up @@ -154,7 +154,7 @@ void ConfigExecutor::setVariables() {
resp_ = std::make_unique<cpp2::ExecutionResponse>();

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down Expand Up @@ -209,7 +209,7 @@ void ConfigExecutor::getVariables() {
resp_->set_rows(std::move(rows));

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/CreateEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void CreateEdgeExecutor::execute() {
}

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/CreateSpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void CreateSpaceExecutor::execute() {
return;
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/CreateTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void CreateTagExecutor::execute() {
}

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/DeleteVertexExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void DeleteVertexExecutor::deleteVertex() {
return;
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
return;
};

Expand Down
2 changes: 1 addition & 1 deletion src/graph/DescribeEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void DescribeEdgeExecutor::execute() {

resp_->set_rows(std::move(rows));
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/DescribeSpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void DescribeSpaceExecutor::execute() {
rows.back().set_columns(std::move(row));
resp_->set_rows(std::move(rows));
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/DescribeTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void DescribeTagExecutor::execute() {

resp_->set_rows(std::move(rows));
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/DownloadExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void DownloadExecutor::execute() {
}
resp_ = std::make_unique<cpp2::ExecutionResponse>();
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/DropEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void DropEdgeExecutor::execute() {
return;
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/DropSpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void DropSpaceExecutor::execute() {
ectx()->rctx()->session()->setSpace("", -1);
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/DropTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void DropTagExecutor::execute() {
return;
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
3 changes: 2 additions & 1 deletion src/graph/ExecutionPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ void ExecutionPlan::execute() {
}

// Prepared
auto onFinish = [this] () {
auto onFinish = [this] (Executor::ProcessControl ctr) {
UNUSED(ctr);
this->onFinish();
};
auto onError = [this] (Status s) {
Expand Down
4 changes: 4 additions & 0 deletions src/graph/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "graph/FindPathExecutor.h"
#include "graph/LimitExecutor.h"
#include "graph/GroupByExecutor.h"
#include "graph/ReturnExecutor.h"

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -159,6 +160,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) {
case Sentence::Kind::kLimit:
executor = std::make_unique<LimitExecutor>(sentence, ectx());
break;
case Sentence::Kind::kReturn:
executor = std::make_unique<ReturnExecutor>(sentence, ectx());
break;
case Sentence::Kind::kUnknown:
LOG(ERROR) << "Sentence kind unknown";
return nullptr;
Expand Down
11 changes: 8 additions & 3 deletions src/graph/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ class Executor : public cpp::NonCopyable, public cpp::NonMovable {

virtual const char* name() const = 0;

enum ProcessControl : uint8_t {
kNext = 0,
kReturn,
};

/**
* Set callback to be invoked when this executor is finished(normally).
*/
void setOnFinish(std::function<void()> onFinish) {
void setOnFinish(std::function<void(ProcessControl)> onFinish) {
onFinish_ = onFinish;
}
/**
Expand Down Expand Up @@ -102,8 +107,8 @@ class Executor : public cpp::NonCopyable, public cpp::NonMovable {
}

protected:
ExecutionContext *ectx_;
std::function<void()> onFinish_;
ExecutionContext *ectx_;
std::function<void(ProcessControl)> onFinish_;
std::function<void(Status)> onError_;
};

Expand Down
4 changes: 2 additions & 2 deletions src/graph/FetchExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void FetchExecutor::onEmptyInputs() {
} else if (resp_ == nullptr) {
resp_ = std::make_unique<cpp2::ExecutionResponse>();
}
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
}

Status FetchExecutor::getOutputSchema(
Expand Down Expand Up @@ -153,7 +153,7 @@ void FetchExecutor::finishExecution(std::unique_ptr<RowSetWriter> rsWriter) {
}
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
}
} // namespace graph
} // namespace nebula
6 changes: 3 additions & 3 deletions src/graph/FindPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void FindPathExecutor::execute() {
void FindPathExecutor::getNeighborsAndFindPath() {
// We meet the dead end.
if (fromVids_.empty() || toVids_.empty()) {
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
return;
}

Expand Down Expand Up @@ -270,7 +270,7 @@ void FindPathExecutor::findPath() {
// if frontiersF meets frontiersT, we found an even path
if (!intersect.empty()) {
if (shortest_ && targetNotFound_.empty()) {
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
return;
}
for (auto intersectId : intersect) {
Expand All @@ -280,7 +280,7 @@ void FindPathExecutor::findPath() {

if (isFinalStep() ||
(shortest_ && targetNotFound_.empty())) {
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
return;
} else {
VLOG(2) << "Current step:" << currentStep_;
Expand Down
4 changes: 2 additions & 2 deletions src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ void GoExecutor::finishExecution(RpcResponse &&rpcResp) {
}
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
}

StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getStepOutProps() {
Expand Down Expand Up @@ -796,7 +796,7 @@ void GoExecutor::onEmptyInputs() {
} else if (resp_ == nullptr) {
resp_ = std::make_unique<cpp2::ExecutionResponse>();
}
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
}


Expand Down
4 changes: 2 additions & 2 deletions src/graph/GroupByExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void GroupByExecutor::execute() {

if (rows_.empty()) {
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
return;
}

Expand Down Expand Up @@ -219,7 +219,7 @@ void GroupByExecutor::execute() {
}

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
}


Expand Down
2 changes: 1 addition & 1 deletion src/graph/IngestExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void IngestExecutor::execute() {
}
resp_ = std::make_unique<cpp2::ExecutionResponse>();
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/InsertEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ void InsertEdgeExecutor::execute() {
return;
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/InsertVertexExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ void InsertVertexExecutor::execute() {
return;
}
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
};

auto error = [this] (auto &&e) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/InterimResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class InterimResult final {
return rsReader_->schema();
}

std::vector<std::string> getColNames() {
std::vector<std::string> getColNames() const {
// Once getColNames called, colNames_ would be invalid
return std::move(colNames_);
}
Expand Down
9 changes: 5 additions & 4 deletions src/graph/LimitExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ void LimitExecutor::execute() {
FLOG_INFO("Executing Limit: %s", sentence_->toString().c_str());
if (inputs_ == nullptr || count_ == 0) {
DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
return;
}

auto ret = inputs_->getRows();
if (!ret.ok()) {
DCHECK(onFinish_);
onFinish_();
LOG(ERROR) << "Get rows failed: " << ret.status();
DCHECK(onError_);
onError_(std::move(ret).status());
return;
}
auto inRows = std::move(ret).value();
Expand All @@ -61,7 +62,7 @@ void LimitExecutor::execute() {
}

DCHECK(onFinish_);
onFinish_();
onFinish_(Executor::ProcessControl::kNext);
}


Expand Down
Loading

0 comments on commit 317203f

Please sign in to comment.