Skip to content

Commit

Permalink
Merge pull request #3 from F-Guardian/bzl-hparg
Browse files Browse the repository at this point in the history
stream execute test v1
  • Loading branch information
zhaojunnana authored Sep 4, 2023
2 parents 8bc5cfc + 44410ed commit 559b184
Show file tree
Hide file tree
Showing 13 changed files with 607 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
nebula_add_library(
executor_obj OBJECT
Executor.cpp
StreamExecutor.cpp
StorageAccessExecutor.cpp
stream/MockStartStreamExecutor.cpp
stream/MockTransportStreamExecutor.cpp
stream/StreamCollectExecutor.cpp
logic/LoopExecutor.cpp
logic/PassThroughExecutor.cpp
logic/StartExecutor.cpp
Expand Down
149 changes: 149 additions & 0 deletions src/graph/executor/StreamExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/executor/StreamExecutor.h"
#include <iostream>
#include <utility>
#include "common/base/Status.h"
#include "graph/executor/stream/MockStartStreamExecutor.h"
#include "graph/executor/stream/MockTransportStreamExecutor.h"
#include "graph/executor/stream/StreamCollectExecutor.h"
#include "graph/planner/plan/PlanNode.h"

namespace nebula {
namespace graph {

// RoundResult
std::shared_ptr<DataSet> RoundResult::getOutputData() {
return output_;
}

bool RoundResult::hasNextRound() {
return hasNextRound_;
}

int RoundResult::getOffset() {
return offset_;
}

// static
StreamExecutor *StreamExecutor::createStream(const PlanNode *node, QueryContext *qctx) {
std::unordered_map<int64_t, StreamExecutor *> visited;
return makeStreamExecutor(node, qctx, &visited);
}

// static
StreamExecutor *StreamExecutor::makeStreamExecutor(const PlanNode *node, QueryContext *qctx,
std::unordered_map<int64_t, StreamExecutor *> *visited) {
DCHECK(qctx != nullptr);
DCHECK(node != nullptr);
auto iter = visited->find(node->id());
if (iter != visited->end()) {
return iter->second;
}

StreamExecutor *exec = makeStreamExecutor(qctx, node);

for (size_t i = 0; i < node->numDeps(); ++i) {
exec->dependsOn(makeStreamExecutor(node->dep(i), qctx, visited));
}

visited->insert({node->id(), exec});
return exec;
}

StreamExecutor *StreamExecutor::makeStreamExecutor(QueryContext *qctx, const PlanNode *node) {
auto pool = qctx->objPool();
// auto &spaceName = qctx->rctx() ? qctx->rctx()->session()->spaceName() : "";
switch (node->kind()) {
case PlanNode::Kind::kStart: {
return pool->makeAndAdd<MockStartStreamExecutor>(node, qctx);
}
case PlanNode::Kind::kIndexScan:
case PlanNode::Kind::kEdgeIndexFullScan:
case PlanNode::Kind::kEdgeIndexPrefixScan:
case PlanNode::Kind::kEdgeIndexRangeScan:
case PlanNode::Kind::kTagIndexFullScan:
case PlanNode::Kind::kTagIndexPrefixScan:
case PlanNode::Kind::kTagIndexRangeScan:
case PlanNode::Kind::kTraverse:
case PlanNode::Kind::kAppendVertices:
case PlanNode::Kind::kLimit: {
return pool->makeAndAdd<MockTransportStreamExecutor>(node, qctx);
}
case PlanNode::Kind::kProject: {
return pool->makeAndAdd<StreamCollectExecutor>(node, qctx);
}
case PlanNode::Kind::kUnknown: {
DLOG(FATAL) << "Unknown plan node kind " << static_cast<int32_t>(node->kind());
break;
}
default: {
DLOG(FATAL) << "Unsupported node kind yet " << static_cast<int32_t>(node->kind());
break;
}
}
return nullptr;
}

bool StreamExecutor::upStreamFinished() {
auto upStream = static_cast<int32_t>(depends_.size());
return upStreamFinishCount == upStream;
}

int32_t StreamExecutor::markSubmitTask() {
return ++taskCount;
}

int32_t StreamExecutor::markFinishTask(bool hasNextRound) {
int32_t currentTaskCount = --taskCount;
bool upStreamFinished = this->upStreamFinished();
DLOG(INFO) << "markFinishTask " << id()
<< " with upStreamFinished:" << upStreamFinished
<< ", hasNextRound:" << hasNextRound
<< ", currentTaskCount:" << currentTaskCount;
if (upStreamFinished && !hasNextRound && currentTaskCount == 0) {
for (auto next : successors_) {
static_cast<StreamExecutor*>(next)->markSubmitTask();
}
this->markFinishExecutor();
// flush once
for (auto next : successors_) {
static_cast<StreamExecutor*>(next)->markFinishTask(false);
}
}
return currentTaskCount;
}

void StreamExecutor::markFinishExecutor() {
DLOG(INFO) << "markFinishExecutor: " << id();
// mark executor finish
for (auto next : successors_) {
static_cast<StreamExecutor*>(next)->upStreamFinishCount++;
}
if (rootPromiseHasBeenSet_) {
rootPromise_.setValue(Status::OK());
}
}

void StreamExecutor::setRootPromise(folly::Promise<Status>&& rootPromise) {
rootPromise_ = std::move(rootPromise);
rootPromiseHasBeenSet_ = true;
}

folly::Future<Status> StreamExecutor::execute() {
return folly::makeFuture(Status::Error(
"Unsupported execute() in StreamExecutor, please use executeOneRound() instead."));
}

std::shared_ptr<RoundResult> StreamExecutor::executeOneRound(
std::shared_ptr<DataSet> input, int64_t offset) {
// TODO not yet, return output data set.
std::cout << input << ", " << offset;
return nullptr;
}


} // namespace graph
} // namespace nebula
74 changes: 74 additions & 0 deletions src/graph/executor/StreamExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#ifndef GRAPH_EXECUTOR_STREAMEXECUTOR_H_
#define GRAPH_EXECUTOR_STREAMEXECUTOR_H_

#include "graph/executor/Executor.h"

namespace nebula {
namespace graph {
class PlanNode;
class QueryContext;

class RoundResult {
public:
RoundResult() = default;
RoundResult(std::shared_ptr<DataSet> out, bool hasNextRound, int offset):
output_(out), hasNextRound_(hasNextRound), offset_(offset) {}
std::shared_ptr<DataSet> getOutputData();
bool hasNextRound();
int getOffset();

private:
std::shared_ptr<DataSet> output_{nullptr};
bool hasNextRound_;
int offset_;
};

class StreamExecutor : public Executor {
public:
// Create stream executor according to plan node
static StreamExecutor *createStream(const PlanNode *node, QueryContext *qctx);

folly::Future<Status> execute() override;

virtual std::shared_ptr<RoundResult> executeOneRound(
std::shared_ptr<DataSet> input, int64_t offset) = 0;

int32_t markSubmitTask();

int32_t markFinishTask(bool hasNextRound);

void setRootPromise(folly::Promise<Status>&& rootPromise);

protected:
static StreamExecutor *makeStreamExecutor(const PlanNode *node,
QueryContext *qctx,
std::unordered_map<int64_t, StreamExecutor *> *visited);

static StreamExecutor *makeStreamExecutor(QueryContext *qctx, const PlanNode *node);

StreamExecutor(const std::string &name, const PlanNode *node, QueryContext *qctx)
: Executor(name, node, qctx) {}

virtual void markFinishExecutor();

private:
bool upStreamFinished();

protected:
folly::Promise<Status> rootPromise_;
bool rootPromiseHasBeenSet_ = false;
const int32_t batch = 10;

private:
std::atomic_int32_t taskCount = 0;
std::atomic_int32_t upStreamFinishCount = 0;
};

} // namespace graph
} // namespace nebula

#endif // GRAPH_EXECUTOR_STREAMEXECUTOR_H_
33 changes: 33 additions & 0 deletions src/graph/executor/stream/MockStartStreamExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/executor/stream/MockStartStreamExecutor.h"
#include <iostream>
#include <memory>
#include <vector>

namespace nebula {
using Row = List;

namespace graph {

std::shared_ptr<RoundResult> MockStartStreamExecutor::executeOneRound(
std::shared_ptr<DataSet> input, int64_t offset) {
std::cout << "input: " << input << ", offset: " << offset << std::endl;
if (offset < 0) {
offset = 0;
}
auto ds = std::make_shared<nebula::DataSet>();
std::vector<std::string> colNames = {"var1", "var2", "var3"};
ds->colNames = std::move(colNames);
Row row;
row.emplace_back(Value(offset));
row.emplace_back(Value("hello"));
row.emplace_back(Value(1.23));
ds->rows.emplace_back(std::move(row));
return std::make_shared<RoundResult>(ds, offset < 3, offset+1);
}

} // namespace graph
} // namespace nebula
28 changes: 28 additions & 0 deletions src/graph/executor/stream/MockStartStreamExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#ifndef GRAPH_EXECUTOR_STREAM_MOCKSTARTSTREAMEXECUTOR_H_
#define GRAPH_EXECUTOR_STREAM_MOCKSTARTSTREAMEXECUTOR_H_

#include "graph/executor/StreamExecutor.h"
#include "graph/planner/plan/Query.h"
// used in lookup and match scenarios.
// fetch data from storage layer, according to the index selected by the optimizer.
namespace nebula {
namespace graph {

class MockStartStreamExecutor final : public StreamExecutor {
public:
MockStartStreamExecutor(const PlanNode *node, QueryContext *qctx)
: StreamExecutor("MockStartStreamExecutor", node, qctx) {
}

std::shared_ptr<RoundResult> executeOneRound(
std::shared_ptr<DataSet> input, int64_t offset) override;
};

} // namespace graph
} // namespace nebula

#endif // GRAPH_EXECUTOR_STREAM_MOCKSTARTSTREAMEXECUTOR_H_
17 changes: 17 additions & 0 deletions src/graph/executor/stream/MockTransportStreamExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/executor/stream/MockTransportStreamExecutor.h"

namespace nebula {
namespace graph {

std::shared_ptr<RoundResult> MockTransportStreamExecutor::executeOneRound(
std::shared_ptr<DataSet> input, int64_t offset) {
std::cout << "input: " << input << ", offset: " << offset << std::endl;
return std::make_shared<RoundResult>(input, false, -1);
}

} // namespace graph
} // namespace nebula
28 changes: 28 additions & 0 deletions src/graph/executor/stream/MockTransportStreamExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#ifndef GRAPH_EXECUTOR_STREAM_MOCKTRANSPORTSTREAMEXECUTOR_H_
#define GRAPH_EXECUTOR_STREAM_MOCKTRANSPORTSTREAMEXECUTOR_H_

#include "graph/executor/StreamExecutor.h"
#include "graph/planner/plan/Query.h"
// used in lookup and match scenarios.
// fetch data from storage layer, according to the index selected by the optimizer.
namespace nebula {
namespace graph {

class MockTransportStreamExecutor final : public StreamExecutor {
public:
MockTransportStreamExecutor(const PlanNode *node, QueryContext *qctx)
: StreamExecutor("MockTransportStreamExecutor", node, qctx) {
}

std::shared_ptr<RoundResult> executeOneRound(
std::shared_ptr<DataSet> input, int64_t offset) override;
};

} // namespace graph
} // namespace nebula

#endif // GRAPH_EXECUTOR_STREAM_MOCKTRANSPORTSTREAMEXECUTOR_H_
29 changes: 29 additions & 0 deletions src/graph/executor/stream/StreamCollectExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2020 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.

#include "graph/executor/stream/StreamCollectExecutor.h"

namespace nebula {
namespace graph {

std::shared_ptr<RoundResult> StreamCollectExecutor::executeOneRound(
std::shared_ptr<DataSet> input, int64_t offset) {
std::cout << "input: " << input << ", offset: " << offset << std::endl;
DLOG(INFO) << "StreamCollectExecutor executeOneRound.";
std::lock_guard<std::mutex> lock(mutex_);
finalDataSet_.append(std::move(*input));
DLOG(INFO) << "finalDataSet_ appended.";
return std::make_shared<RoundResult>(nullptr, false, -1);
}

void StreamCollectExecutor::markFinishExecutor() {
DLOG(INFO) << "StreamCollectExecutor markFinishExecutor.";
auto status = finish(ResultBuilder().value(Value(std::move(finalDataSet_))).build());
if (rootPromiseHasBeenSet_) {
rootPromise_.setValue(status);
}
}

} // namespace graph
} // namespace nebula
Loading

0 comments on commit 559b184

Please sign in to comment.