diff --git a/conf/CMakeLists.txt b/conf/CMakeLists.txt index 4e7e2cbe85b..409b3cc9140 100644 --- a/conf/CMakeLists.txt +++ b/conf/CMakeLists.txt @@ -1 +1,4 @@ -install(FILES nebula-graphd.conf.default DESTINATION etc) +install( + FILES nebula-graphd.conf.default nebula-storaged.conf.default nebula-metad.conf.default + DESTINATION etc +) diff --git a/conf/nebula-graphd.conf.default b/conf/nebula-graphd.conf.default index b30209bb644..9a4d4d6eb10 100644 --- a/conf/nebula-graphd.conf.default +++ b/conf/nebula-graphd.conf.default @@ -1,38 +1,42 @@ ########## basics ########## -# Whether run as a daemon process +# Whether to run as a daemon process --daemonize=true +# The file to host the process id +--pid_file=pids/nebula-graphd.pid ########## logging ########## -# Directory to host logging files, which must already exist +# The directory to host logging files, which must already exists --log_dir=logs -# 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively +# Log level, 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively --minloglevel=0 -# verbose loging level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging +# Verbose log level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging --v=4 -# maximum seconds to buffer the log messages +# Maximum seconds to buffer the log messages --logbufsecs=0 # Whether to redirect stdout and stderr to separate output files --redirect_stdout=true # Destination filename of stdout and stderr, which will also reside in log_dir. --stdout_log_file=stdout.log --stderr_log_file=stderr.log -# File to host the process id, which also resides in log_dir ---pid_file=nebula-graphd.pid ########## networking ########## # Network device to listen on --listen_netdev=any # Port to listen on --port=3699 -# seconds before we close the idle connections, 0 for infinite +# To turn on SO_REUSEPORT or not +--reuse_port=false +# Backlog of the listen socket, adjust this together with net.core.somaxconn +--listen_backlog=1024 +# Seconds before the idle connections are closed, 0 for never closed --client_idle_timeout_secs=0 -# seconds before we expire the idle sessions, 0 for inifnite +# Seconds before the idle sessions are expired, 0 for no expiration --session_idle_timeout_secs=60000 -# number of threads to accept incoming connections +# The number of threads to accept incoming connections --num_accept_threads=1 -# number of networking IO threads, 0 for number of physical CPU cores +# The number of networking IO threads, 0 for number of physical CPU cores --num_netio_threads=0 -# turn on SO_REUSEPORT or not ---reuse_port=false -# Backlog of the listen socket, adjust this together with net.core.somaxconn ---listen_backlog=1024 +# HTTP service port +--ws_http_port=0 +# HTTP2 service port +--ws_h2_portt=0 diff --git a/src/console/CMakeLists.txt b/src/console/CMakeLists.txt index 650c1a81d3f..c2955f5df96 100644 --- a/src/console/CMakeLists.txt +++ b/src/console/CMakeLists.txt @@ -7,6 +7,7 @@ add_executable( $ $ $ + $ $ $ $ diff --git a/src/console/CliManager.cpp b/src/console/CliManager.cpp index 85b7d984751..c3ad370f9b8 100644 --- a/src/console/CliManager.cpp +++ b/src/console/CliManager.cpp @@ -10,6 +10,9 @@ #include "console/CliManager.h" #include "client/cpp/GraphClient.h" +DECLARE_string(u); +DECLARE_string(p); + namespace nebula { namespace graph { @@ -18,6 +21,10 @@ const int32_t kMaxUsernameLen = 16; const int32_t kMaxPasswordLen = 24; const int32_t kMaxCommandLineLen = 1024; +CliManager::CliManager() { + ::using_history(); +} + bool CliManager::connect(const std::string& addr, uint16_t port, @@ -32,11 +39,15 @@ bool CliManager::connect(const std::string& addr, pass[kMaxPasswordLen] = '\0'; // Make sure username is not empty - for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(user); i++) { - // Need to interactively get the username - std::cout << "Username: "; - std::cin.getline(user, kMaxUsernameLen); - user[kMaxUsernameLen] = '\0'; + if (FLAGS_u.empty()) { + for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(user); i++) { + // Need to interactively get the username + std::cout << "Username: "; + std::cin.getline(user, kMaxUsernameLen); + user[kMaxUsernameLen] = '\0'; + } + } else { + strcpy(user, FLAGS_u.c_str()); // NOLINT } if (!strlen(user)) { std::cout << "Authentication failed: " @@ -45,11 +56,15 @@ bool CliManager::connect(const std::string& addr, } // Make sure password is not empty - for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(pass); i++) { - // Need to interactively get the password - std::cout << "Password: "; - std::cin.getline(pass, kMaxPasswordLen); - pass[kMaxPasswordLen] = '\0'; + if (FLAGS_p.empty()) { + for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(pass); i++) { + // Need to interactively get the password + std::cout << "Password: "; + std::cin.getline(pass, kMaxPasswordLen); + pass[kMaxPasswordLen] = '\0'; + } + } else { + strcpy(pass, FLAGS_p.c_str()); // NOLINT } if (!strlen(pass)) { std::cout << "Authentication failed: " @@ -85,26 +100,52 @@ void CliManager::loop() { std::string cmd; loadHistory(); while (true) { - if (!readLine(cmd)) { + std::string line; + if (!readLine(line, !cmd.empty())) { break; } - if (cmd.empty()) { + if (line.empty()) { + cmd.clear(); + continue; + } + + if (line.back() == '\\') { + line.resize(line.size() - 1); + if (cmd.empty()) { + cmd = line; + } else if (cmd.back() == ' ') { + cmd += line; + } else { + cmd = cmd + " " + line; + } continue; } + + cmd += line; + if (!cmdProcessor_->process(cmd)) { break; } + cmd.clear(); } saveHistory(); } -bool CliManager::readLine(std::string &line) { +bool CliManager::readLine(std::string &line, bool linebreak) { auto ok = true; char prompt[256]; static auto color = 0u; - ::snprintf(prompt, sizeof(prompt), "\033[1;%umnebula> \033[0m", color++ % 6 + 31); - auto *input = ::readline(prompt); + ::snprintf(prompt, sizeof(prompt), + "\001" // RL_PROMPT_START_IGNORE + "\033[1;%um" // color codes start + "\002" // RL_PROMPT_END_IGNORE + "nebula> " // prompt + "\001" // RL_PROMPT_START_IGNORE + "\033[0m" // restore color code + "\002", // RL_PROMPT_END_IGNORE + color++ % 6 + 31); + auto *input = ::readline(linebreak ? "": prompt); do { // EOF diff --git a/src/console/CliManager.h b/src/console/CliManager.h index fac2b8bcee0..80df2714f91 100644 --- a/src/console/CliManager.h +++ b/src/console/CliManager.h @@ -15,7 +15,7 @@ namespace graph { class CliManager final { public: - CliManager() = default; + CliManager(); ~CliManager() = default; bool connect(const std::string& addr, @@ -27,7 +27,7 @@ class CliManager final { void loop(); - bool readLine(std::string &line); + bool readLine(std::string &line, bool linebreak = false); void updateHistory(const char *line); diff --git a/src/console/CmdProcessor.cpp b/src/console/CmdProcessor.cpp index ecd25fa4aa5..24e512d1fd6 100644 --- a/src/console/CmdProcessor.cpp +++ b/src/console/CmdProcessor.cpp @@ -347,12 +347,12 @@ void CmdProcessor::processServerCmd(folly::StringPiece cmd) { if (std::regex_search(*msg, result, range)) { auto start = folly::to(result[1].str()); auto end = folly::to(result[2].str()); - verbose = "syntax error near `" + std::string(&cmd[start-1], &cmd[end]) + "'"; + verbose = "syntax error near `" + std::string(&cmd[start-1], end - start + 1) + "'"; } else if (std::regex_search(*msg, result, single)) { auto start = folly::to(result[1].str()); auto end = start + 8; end = end > cmd.size() ? cmd.size() : end; - verbose = "syntax error near `" + std::string(&cmd[start-1], &cmd[end]) + "'"; + verbose = "syntax error near `" + std::string(&cmd[start-1], end - start + 1) + "'"; } std::cout << "[ERROR (" << static_cast(res) << ")]: " << verbose << "\n"; diff --git a/src/console/NebulaConsole.cpp b/src/console/NebulaConsole.cpp index 81a0aa13105..696cb7c61cf 100644 --- a/src/console/NebulaConsole.cpp +++ b/src/console/NebulaConsole.cpp @@ -6,24 +6,42 @@ #include "base/Base.h" #include "console/CliManager.h" +#include "fs/FileUtils.h" DEFINE_string(addr, "127.0.0.1", "Nebula daemon IP address"); -DEFINE_int32(port, 34500, "Nebula daemon listening port"); -DEFINE_string(username, "", "Username used to authenticate"); -DEFINE_string(password, "", "Password used to authenticate"); +DEFINE_int32(port, 0, "Nebula daemon listening port"); +DEFINE_string(u, "", "Username used to authenticate"); +DEFINE_string(p, "", "Password used to authenticate"); int main(int argc, char *argv[]) { folly::init(&argc, &argv, true); using nebula::graph::CliManager; + using nebula::fs::FileUtils; + if (FLAGS_port == 0) { + // If port not provided, we use the one in etc/nebula-graphd.conf + auto path = FileUtils::readLink("/proc/self/exe").value(); + auto dir = FileUtils::dirname(path.c_str()); + static const std::regex pattern("--port=([0-9]+)"); + FileUtils::FileLineIterator iter(dir + "/../etc/nebula-graphd.conf", &pattern); + if (iter.valid()) { + auto &smatch = iter.matched(); + FLAGS_port = folly::to(smatch[1].str()); + } + } + if (FLAGS_port == 0) { + fprintf(stderr, "--port must be specified\n"); + return EXIT_FAILURE; + } CliManager cli; - if (!cli.connect(FLAGS_addr, FLAGS_port, FLAGS_username, FLAGS_password)) { - exit(1); + if (!cli.connect(FLAGS_addr, FLAGS_port, FLAGS_u, FLAGS_p)) { + return EXIT_FAILURE; } cli.loop(); + return EXIT_SUCCESS; } diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index 4a5e864ce66..ee3b187a5a9 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -8,6 +8,9 @@ add_executable( $ $ $ + $ + $ + $ $ $ $ @@ -18,6 +21,7 @@ add_executable( $ $ $ + $ ) nebula_link_libraries( nebula-graphd @@ -26,7 +30,6 @@ nebula_link_libraries( ${THRIFT_LIBRARIES} wangle ) -install(TARGETS nebula-graphd DESTINATION bin) add_executable( @@ -78,6 +81,7 @@ add_executable( $ $ $ + $ $ ) nebula_link_libraries( @@ -88,4 +92,9 @@ nebula_link_libraries( ${THRIFT_LIBRARIES} wangle ) -install(TARGETS nebula-metad DESTINATION bin) + + +install( + TARGETS nebula-graphd nebula-storaged nebula-metad + DESTINATION bin +) diff --git a/src/daemons/GraphDaemon.cpp b/src/daemons/GraphDaemon.cpp index d9747bbb61c..92d2ad9e80e 100644 --- a/src/daemons/GraphDaemon.cpp +++ b/src/daemons/GraphDaemon.cpp @@ -54,6 +54,12 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } + if (FLAGS_daemonize) { + google::SetStderrLogging(google::FATAL); + } else { + google::SetStderrLogging(google::INFO); + } + // Setup logging auto status = setupLogging(); if (!status.ok()) { @@ -62,7 +68,7 @@ int main(int argc, char *argv[]) { } // Detect if the server has already been started - auto pidPath = FLAGS_log_dir + "/" + FLAGS_pid_file; + auto pidPath = FLAGS_pid_file; status = ProcessUtils::isPidAvailable(pidPath); if (!status.ok()) { LOG(ERROR) << status; @@ -84,13 +90,6 @@ int main(int argc, char *argv[]) { } } - // Setup the signal handlers - status = setupSignalHandler(); - if (!status.ok()) { - LOG(ERROR) << status; - return EXIT_FAILURE; - } - LOG(INFO) << "Starting Graph HTTP Service"; nebula::WebService::registerHandler("/graph", [] { return new nebula::graph::GraphHttpHandler(); @@ -112,10 +111,10 @@ int main(int argc, char *argv[]) { localIP = std::move(result).value(); } - auto interface = std::make_shared(); gServer = std::make_unique(); + auto interface = std::make_shared(gServer->getIOThreadPool()); - gServer->setInterface(interface); + gServer->setInterface(std::move(interface)); gServer->setAddress(localIP, FLAGS_port); gServer->setReusePort(FLAGS_reuse_port); gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs)); @@ -131,6 +130,13 @@ int main(int argc, char *argv[]) { gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads); } + // Setup the signal handlers + status = setupSignalHandler(); + if (!status.ok()) { + LOG(ERROR) << status; + return EXIT_FAILURE; + } + FLOG_INFO("Starting nebula-graphd on %s:%d\n", localIP.c_str(), FLAGS_port); try { gServer->serve(); // Blocking wait until shut down via gServer->stop() diff --git a/src/executor/AssignmentExecutor.cpp b/src/executor/AssignmentExecutor.cpp new file mode 100644 index 00000000000..c456cb979d5 --- /dev/null +++ b/src/executor/AssignmentExecutor.cpp @@ -0,0 +1,56 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include "graph/AssignmentExecutor.h" +#include "graph/TraverseExecutor.h" + + +namespace nebula { +namespace graph { + + +AssignmentExecutor::AssignmentExecutor(Sentence *sentence, + ExecutionContext *ectx) : Executor(ectx) { + sentence_ = static_cast(sentence); +} + + +Status AssignmentExecutor::prepare() { + var_ = sentence_->var(); + executor_ = TraverseExecutor::makeTraverseExecutor(sentence_->sentence(), ectx()); + auto status = executor_->prepare(); + if (!status.ok()) { + FLOG_ERROR("Prepare executor `%s' failed: %s", + executor_->name(), status.toString().c_str()); + return status; + } + auto onError = [this] (Status s) { + DCHECK(onError_); + onError_(std::move(s)); + }; + auto onFinish = [this] () { + DCHECK(onFinish_); + onFinish_(); + }; + auto onResult = [this] (std::unique_ptr result) { + ectx()->variableHolder()->add(*var_, std::move(result)); + }; + executor_->setOnError(onError); + executor_->setOnFinish(onFinish); + executor_->setOnResult(onResult); + + return Status::OK(); +} + + +void AssignmentExecutor::execute() { + executor_->execute(); +} + + +} // namespace graph +} // namespace nebula diff --git a/src/executor/AssignmentExecutor.h b/src/executor/AssignmentExecutor.h new file mode 100644 index 00000000000..5c4ae03cbda --- /dev/null +++ b/src/executor/AssignmentExecutor.h @@ -0,0 +1,39 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef GRAPH_ASSIGNMENTEXECUTOR_H_ +#define GRAPH_ASSIGNMENTEXECUTOR_H_ + +#include "base/Base.h" +#include "graph/Executor.h" + +namespace nebula { +namespace graph { + +class TraverseExecutor; +class AssignmentExecutor final : public Executor { +public: + AssignmentExecutor(Sentence *sentence, ExecutionContext *ectx); + + const char* name() const override { + return "AssignmentExecutor"; + } + + Status MUST_USE_RESULT prepare() override; + + void execute() override; + +private: + AssignmentSentence *sentence_{nullptr}; + std::unique_ptr executor_; + std::string *var_; +}; + + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_ASSIGNMENTEXECUTOR_H_ diff --git a/src/executor/CMakeLists.txt b/src/executor/CMakeLists.txt index 5735dc21712..faeddcad944 100644 --- a/src/executor/CMakeLists.txt +++ b/src/executor/CMakeLists.txt @@ -23,7 +23,10 @@ add_library( DescribeEdgeExecutor.cpp InsertVertexExecutor.cpp InsertEdgeExecutor.cpp + AssignmentExecutor.cpp ShowExecutor.cpp + InterimResult.cpp + VariableHolder.cpp mock/PropertiesSchema.cpp mock/EdgeSchema.cpp mock/TagSchema.cpp diff --git a/src/executor/ClientSession.h b/src/executor/ClientSession.h index 246109916c3..fe369dd07fd 100644 --- a/src/executor/ClientSession.h +++ b/src/executor/ClientSession.h @@ -27,12 +27,17 @@ class ClientSession final { id_ = id; } - const std::string& space() const { + GraphSpaceID space() const { return space_; } - void setSpace(std::string space) { - space_ = std::move(space); + void setSpace(const std::string &name, GraphSpaceID space) { + spaceName_ = name; + space_ = space; + } + + const std::string& spaceName() const { + return spaceName_; } uint64_t idleSeconds() const; @@ -58,8 +63,9 @@ class ClientSession final { private: int64_t id_{0}; + GraphSpaceID space_{0}; time::Duration idleDuration_; - std::string space_; + std::string spaceName_; std::string user_; }; diff --git a/src/executor/DescribeEdgeExecutor.cpp b/src/executor/DescribeEdgeExecutor.cpp index 4f0327fb6e6..d53319f493b 100644 --- a/src/executor/DescribeEdgeExecutor.cpp +++ b/src/executor/DescribeEdgeExecutor.cpp @@ -25,8 +25,8 @@ Status DescribeEdgeExecutor::prepare() { void DescribeEdgeExecutor::execute() { auto *name = sentence_->name(); auto space = ectx()->rctx()->session()->space(); - - auto schema = meta::SchemaManager::getEdgeSchema(space, *name); + auto edgeType = meta::SchemaManager::toEdgeType(*name); + auto schema = meta::SchemaManager::getEdgeSchema(space, edgeType); resp_ = std::make_unique(); do { diff --git a/src/executor/DescribeTagExecutor.cpp b/src/executor/DescribeTagExecutor.cpp index 131d5e44297..82e56dc80f7 100644 --- a/src/executor/DescribeTagExecutor.cpp +++ b/src/executor/DescribeTagExecutor.cpp @@ -25,7 +25,8 @@ Status DescribeTagExecutor::prepare() { void DescribeTagExecutor::execute() { auto *name = sentence_->name(); auto space = ectx()->rctx()->session()->space(); - auto schema = meta::SchemaManager::getTagSchema(space, *name); + auto tagId = meta::SchemaManager::toTagID(*name); + auto schema = meta::SchemaManager::getTagSchema(space, tagId); resp_ = std::make_unique(); diff --git a/src/executor/ExecutionContext.h b/src/executor/ExecutionContext.h index 8d734a195ea..04c0b11da09 100644 --- a/src/executor/ExecutionContext.h +++ b/src/executor/ExecutionContext.h @@ -12,22 +12,26 @@ #include "graph/RequestContext.h" #include "parser/SequentialSentences.h" #include "graph/mock/SchemaManager.h" -#include "graph/mock/StorageService.h" +#include "graph/VariableHolder.h" /** * ExecutionContext holds context infos in the execution process, e.g. clients of storage or meta services. */ namespace nebula { +namespace storage { +class StorageClient; +} // namespace storage namespace graph { class ExecutionContext final : public cpp::NonCopyable, public cpp::NonMovable { public: using RequestContextPtr = std::unique_ptr>; - ExecutionContext(RequestContextPtr rctx, SchemaManager *sm, StorageService *storage) { + ExecutionContext(RequestContextPtr rctx, SchemaManager *sm, storage::StorageClient *storage) { rctx_ = std::move(rctx); sm_ = sm; storage_ = storage; + variableHolder_ = std::make_unique(); } ~ExecutionContext() = default; @@ -40,14 +44,19 @@ class ExecutionContext final : public cpp::NonCopyable, public cpp::NonMovable { return sm_; } - StorageService* storage() const { + storage::StorageClient* storage() const { return storage_; } + VariableHolder* variableHolder() const { + return variableHolder_.get(); + } + private: RequestContextPtr rctx_; SchemaManager *sm_{nullptr}; - StorageService *storage_{nullptr}; + storage::StorageClient *storage_{nullptr}; + std::unique_ptr variableHolder_; }; } // namespace graph diff --git a/src/executor/ExecutionEngine.cpp b/src/executor/ExecutionEngine.cpp index 7acd86473b6..676c9f7926d 100644 --- a/src/executor/ExecutionEngine.cpp +++ b/src/executor/ExecutionEngine.cpp @@ -8,13 +8,14 @@ #include "graph/ExecutionEngine.h" #include "graph/ExecutionContext.h" #include "graph/ExecutionPlan.h" +#include "storage/client/StorageClient.h" namespace nebula { namespace graph { -ExecutionEngine::ExecutionEngine() { +ExecutionEngine::ExecutionEngine(std::unique_ptr storage) { schemaManager_ = std::make_unique(); - storage_ = std::make_unique(schemaManager_.get()); + storage_ = std::move(storage); } @@ -26,7 +27,7 @@ void ExecutionEngine::execute(RequestContextPtr rctx) { auto ectx = std::make_unique(std::move(rctx), schemaManager_.get(), storage_.get()); - // TODO(dutor) add support to execution plan + // TODO(dutor) add support to plan cache auto plan = new ExecutionPlan(std::move(ectx)); plan->execute(); diff --git a/src/executor/ExecutionEngine.h b/src/executor/ExecutionEngine.h index a515d7f02f4..fe25d5b6282 100644 --- a/src/executor/ExecutionEngine.h +++ b/src/executor/ExecutionEngine.h @@ -12,7 +12,6 @@ #include "graph/RequestContext.h" #include "gen-cpp2/GraphService.h" #include "graph/mock/SchemaManager.h" -#include "graph/mock/StorageService.h" /** * ExecutinoEngine is responsible to create and manage ExecutionPlan. @@ -21,11 +20,15 @@ */ namespace nebula { +namespace storage { +class StorageClient; +} // namespace storage + namespace graph { class ExecutionEngine final : public cpp::NonCopyable, public cpp::NonMovable { public: - ExecutionEngine(); + explicit ExecutionEngine(std::unique_ptr storage); ~ExecutionEngine(); using RequestContextPtr = std::unique_ptr>; @@ -33,7 +36,7 @@ class ExecutionEngine final : public cpp::NonCopyable, public cpp::NonMovable { private: std::unique_ptr schemaManager_; - std::unique_ptr storage_; + std::unique_ptr storage_; }; } // namespace graph diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp index 6dfe05f24c1..1a13d57b282 100644 --- a/src/executor/Executor.cpp +++ b/src/executor/Executor.cpp @@ -22,6 +22,7 @@ #include "graph/InsertVertexExecutor.h" #include "graph/InsertEdgeExecutor.h" #include "graph/ShowExecutor.h" +#include "graph/AssignmentExecutor.h" namespace nebula { namespace graph { @@ -66,6 +67,9 @@ std::unique_ptr Executor::makeExecutor(Sentence *sentence) { case Sentence::Kind::kShow: executor = std::make_unique(sentence, ectx()); break; + case Sentence::Kind::kAssignment: + executor = std::make_unique(sentence, ectx()); + break; case Sentence::Kind::kUnknown: LOG(FATAL) << "Sentence kind unknown"; break; diff --git a/src/executor/Executor.h b/src/executor/Executor.h index dae7a007ebb..2d19f427783 100644 --- a/src/executor/Executor.h +++ b/src/executor/Executor.h @@ -11,6 +11,7 @@ #include "base/Status.h" #include "cpp/helpers.h" #include "graph/ExecutionContext.h" +#include "gen-cpp2/common_types.h" /** diff --git a/src/executor/GoExecutor.cpp b/src/executor/GoExecutor.cpp index 98c1b9bdf8d..5f900489c30 100644 --- a/src/executor/GoExecutor.cpp +++ b/src/executor/GoExecutor.cpp @@ -6,7 +6,10 @@ #include "base/Base.h" #include "graph/GoExecutor.h" - +#include "meta/SchemaManager.h" +#include "dataman/RowReader.h" +#include "dataman/RowSetReader.h" +#include "dataman/ResultSchemaProvider.h" namespace nebula { namespace graph { @@ -21,7 +24,7 @@ GoExecutor::GoExecutor(Sentence *sentence, ExecutionContext *ectx) : TraverseExe Status GoExecutor::prepare() { DCHECK(sentence_ != nullptr); Status status; - expctx_ = std::make_unique(); + expCtx_ = std::make_unique(); do { status = prepareStep(); if (!status.ok()) { @@ -43,10 +46,6 @@ Status GoExecutor::prepare() { if (!status.ok()) { break; } - status = prepareResultSchema(); - if (!status.ok()) { - break; - } status = prepareNeededProps(); if (!status.ok()) { break; @@ -57,53 +56,27 @@ Status GoExecutor::prepare() { return status; } - expctx_->print(); - - if (!onResult_) { - onResult_ = [this] (TraverseRecords records) { - this->cacheResult(std::move(records)); - }; - } - return status; } void GoExecutor::execute() { FLOG_INFO("Executing Go: %s", sentence_->toString().c_str()); - using Result = StatusOr; - auto eprops = expctx_->edgePropNames(); - auto vprops = expctx_->srcNodePropNames(); - auto future = ectx()->storage()->getOutBound(starts_, edge_, eprops, vprops); - auto *runner = ectx()->rctx()->runner(); - std::move(future).via(runner).then([this] (Result result) { - if (!result.ok()) { - auto &resp = ectx()->rctx()->resp(); - auto status = result.status(); - resp.set_error_code(cpp2::ErrorCode::E_EXECUTION_ERROR); - resp.set_error_msg(status.toString()); - } - if (onFinish_) { - onFinish_(); - } - }); -} - - -void GoExecutor::feedResult(TraverseRecords records) { - inputs_.reserve(records.size()); - for (auto &record : records) { - inputs_.push_back(std::move(record)); + auto status = setupStarts(); + if (!status.ok()) { + onError_(std::move(status)); + return; } + if (starts_.empty()) { + onEmptyInputs(); + return; + } + stepOut(); } -Status GoExecutor::prepareResultSchema() { - resultSchema_ = std::make_unique(); - for (auto *column : yields_) { - resultSchema_->addColumn(*column->alias()); - } - return Status::OK(); +void GoExecutor::feedResult(std::unique_ptr result) { + inputs_ = std::move(result); } @@ -113,6 +86,11 @@ Status GoExecutor::prepareStep() { steps_ = clause->steps(); upto_ = clause->isUpto(); } + + if (isUpto()) { + return Status::Error("`UPTO' not supported yet"); + } + return Status::OK(); } @@ -122,14 +100,24 @@ Status GoExecutor::prepareFrom() { auto *clause = sentence_->fromClause(); do { if (clause == nullptr) { - break; + LOG(FATAL) << "From clause shall never be null"; } - auto *alias = clause->alias(); - if (alias == nullptr) { - break; + if (!clause->isRef()) { + starts_ = clause->srcNodeList()->nodeIds(); + } else { + auto *expr = clause->ref(); + if (expr->isInputExpression()) { + auto *iexpr = static_cast(expr); + colname_ = iexpr->prop(); + } else if (expr->isVariableExpression()) { + auto *vexpr = static_cast(expr); + varname_ = vexpr->var(); + colname_ = vexpr->prop(); + } else { + // No way to happen except memory corruption + LOG(FATAL) << "Unknown kind of expression"; + } } - expctx_->addAlias(*alias, AliasKind::SourceNode); - starts_ = clause->srcNodeList()->nodeIds(); } while (false); return status; } @@ -140,14 +128,21 @@ Status GoExecutor::prepareOver() { auto *clause = sentence_->overClause(); do { if (clause == nullptr) { - break; + LOG(FATAL) << "Over clause shall never be null"; } - edge_ = clause->edge(); + edge_ = meta::SchemaManager::toEdgeType(*clause->edge()); reversely_ = clause->isReversely(); if (clause->alias() != nullptr) { - expctx_->addAlias(*clause->alias(), AliasKind::Edge, *clause->edge()); + expCtx_->addAlias(*clause->alias(), AliasKind::Edge, *clause->edge()); + } else { + expCtx_->addAlias(*clause->edge(), AliasKind::Edge, *clause->edge()); } } while (false); + + if (isReversely()) { + return Status::Error("`REVERSELY' not supported yet"); + } + return status; } @@ -174,7 +169,7 @@ Status GoExecutor::prepareNeededProps() { auto status = Status::OK(); do { if (filter_ != nullptr) { - filter_->setContext(expctx_.get()); + filter_->setContext(expCtx_.get()); status = filter_->prepare(); if (!status.ok()) { break; @@ -184,25 +179,492 @@ Status GoExecutor::prepareNeededProps() { break; } for (auto *col : yields_) { - col->expr()->setContext(expctx_.get()); + col->expr()->setContext(expCtx_.get()); status = col->expr()->prepare(); if (!status.ok()) { break; } } + if (!status.ok()) { + break; + } } while (false); return status; } -void GoExecutor::cacheResult(TraverseRecords records) { - UNUSED(records); +Status GoExecutor::setupStarts() { + // Literal vertex ids + if (!starts_.empty()) { + return Status::OK(); + } + // Take one column from a variable + if (varname_ != nullptr) { + auto *varinput = ectx()->variableHolder()->get(*varname_); + if (varinput == nullptr) { + return Status::Error("Variable `%s' not defined", varname_->c_str()); + } + starts_ = varinput->getVIDs(*colname_); + return Status::OK(); + } + // No error happened, but we are having empty inputs + if (inputs_ == nullptr) { + return Status::OK(); + } + // Take one column from the input of the pipe + DCHECK(colname_ != nullptr); + starts_ = inputs_->getVIDs(*colname_); + return Status::OK(); } void GoExecutor::setupResponse(cpp2::ExecutionResponse &resp) { - UNUSED(resp); + if (resp_ == nullptr) { + resp_ = std::make_unique(); + } + resp = std::move(*resp_); +} + + +void GoExecutor::stepOut() { + auto space = ectx()->rctx()->session()->space(); + auto returns = getStepOutProps(); + auto future = ectx()->storage()->getNeighbors(space, + starts_, + edge_, + !reversely_, + "", + std::move(returns)); + auto *runner = ectx()->rctx()->runner(); + auto cb = [this] (auto &&result) { + auto completeness = result.completeness(); + if (completeness == 0) { + DCHECK(onError_); + onError_(Status::Error("Get neighbors failed")); + return; + } else if (completeness != 100) { + // TODO(dutor) We ought to let the user know that the execution was partially + // performed, even in the case that this happened in the intermediate process. + // Or, make this case configurable at runtime. + // For now, we just do some logging and keep going. + LOG(INFO) << "Get neighbors partially failed: " << completeness << "%"; + for (auto &error : result.failedParts()) { + LOG(ERROR) << "part: " << error.first + << "error code: " << static_cast(error.second); + } + } + onStepOutResponse(std::move(result)); + }; + auto error = [this] (auto &&e) { + LOG(ERROR) << "Exception caught: " << e.what(); + onError_(Status::Error("Internal error")); + }; + std::move(future).via(runner).thenValue(cb).thenError(error); +} + + +void GoExecutor::onStepOutResponse(RpcResponse &&rpcResp) { + if (isFinalStep()) { + if (expCtx_->hasDstTagProp()) { + auto dstids = getDstIdsFromResp(rpcResp); + if (dstids.empty()) { + onEmptyInputs(); + return; + } + fetchVertexProps(std::move(dstids), std::move(rpcResp)); + return; + } + finishExecution(std::move(rpcResp)); + return; + } else { + curStep_++; + starts_ = getDstIdsFromResp(rpcResp); + if (starts_.empty()) { + onEmptyInputs(); + return; + } + stepOut(); + } +} + + +void GoExecutor::onVertexProps(RpcResponse &&rpcResp) { + UNUSED(rpcResp); +} + + +std::vector GoExecutor::getDstIdsFromResp(RpcResponse &rpcResp) const { + std::unordered_set set; + for (auto &resp : rpcResp.responses()) { + auto *vertices = resp.get_vertices(); + if (vertices == nullptr) { + continue; + } + auto schema = std::make_shared(resp.edge_schema); + for (auto &vdata : *vertices) { + RowSetReader rsReader(schema, vdata.edge_data); + auto iter = rsReader.begin(); + while (iter) { + VertexID dst; + auto rc = iter->getVid("_dst", dst); + CHECK(rc == ResultType::SUCCEEDED); + set.emplace(dst); + ++iter; + } + } + } + return std::vector(set.begin(), set.end()); +} + + +void GoExecutor::finishExecution(RpcResponse &&rpcResp) { + if (onResult_) { + onResult_(setupInterimResult(std::move(rpcResp))); + } else { + resp_ = std::make_unique(); + setupResponseHeader(*resp_); + setupResponseBody(rpcResp, *resp_); + } + DCHECK(onFinish_); + onFinish_(); +} + + +std::vector GoExecutor::getStepOutProps() const { + std::vector props; + { + storage::cpp2::PropDef pd; + pd.owner = storage::cpp2::PropOwner::EDGE; + pd.name = "_dst"; + props.emplace_back(std::move(pd)); + } + + if (!isFinalStep()) { + return props; + } + + for (auto &tagProp : expCtx_->srcTagProps()) { + storage::cpp2::PropDef pd; + pd.owner = storage::cpp2::PropOwner::SOURCE; + pd.name = tagProp.second; + auto tagId = meta::SchemaManager::toTagID(tagProp.first); + pd.set_tag_id(tagId); + props.emplace_back(std::move(pd)); + } + for (auto &prop : expCtx_->edgeProps()) { + storage::cpp2::PropDef pd; + pd.owner = storage::cpp2::PropOwner::EDGE; + pd.name = prop; + props.emplace_back(std::move(pd)); + } + + return props; +} + + +std::vector GoExecutor::getDstProps() const { + std::vector props; + for (auto &tagProp : expCtx_->dstTagProps()) { + storage::cpp2::PropDef pd; + pd.owner = storage::cpp2::PropOwner::DEST; + pd.name = tagProp.second; + auto tagId = meta::SchemaManager::toTagID(tagProp.first); + pd.set_tag_id(tagId); + props.emplace_back(std::move(pd)); + } + return props; +} + + +void GoExecutor::fetchVertexProps(std::vector ids, RpcResponse &&rpcResp) { + auto space = ectx()->rctx()->session()->space(); + auto returns = getDstProps(); + auto future = ectx()->storage()->getVertexProps(space, ids, returns); + auto *runner = ectx()->rctx()->runner(); + auto cb = [this, stepOutResp = std::move(rpcResp)] (auto &&result) mutable { + auto completeness = result.completeness(); + if (completeness == 0) { + DCHECK(onError_); + onError_(Status::Error("Get dest props failed")); + return; + } else if (completeness != 100) { + LOG(INFO) << "Get neighbors partially failed: " << completeness << "%"; + for (auto &error : result.failedParts()) { + LOG(ERROR) << "part: " << error.first + << "error code: " << static_cast(error.second); + } + } + if (vertexHolder_ == nullptr) { + vertexHolder_ = std::make_unique(); + } + for (auto &resp : result.responses()) { + vertexHolder_->add(resp); + } + finishExecution(std::move(stepOutResp)); + return; + }; + auto error = [this] (auto &&e) { + LOG(ERROR) << "Exception caught: " << e.what(); + onError_(Status::Error("Internal error")); + }; + std::move(future).via(runner).thenValue(cb).thenError(error); +} + + +std::vector GoExecutor::getResultColumnNames() const { + std::vector result; + result.reserve(yields_.size()); + for (auto *col : yields_) { + if (col->alias() == nullptr) { + result.emplace_back(col->expr()->toString()); + } else { + result.emplace_back(*col->alias()); + } + } + return result; +} + + +std::unique_ptr GoExecutor::setupInterimResult(RpcResponse &&rpcResp) { + // Generic results + std::shared_ptr schema; + std::unique_ptr rsWriter; + using nebula::cpp2::SupportedType; + auto cb = [&] (std::vector record) { + if (schema == nullptr) { + schema = std::make_shared(); + auto colnames = getResultColumnNames(); + for (auto i = 0u; i < record.size(); i++) { + SupportedType type; + switch (record[i].which()) { + case 0: + // all integers in InterimResult are regarded as type of VID + type = SupportedType::VID; + break; + case 1: + type = SupportedType::DOUBLE; + break; + case 2: + type = SupportedType::BOOL; + break; + case 3: + type = SupportedType::STRING; + break; + default: + LOG(FATAL) << "Unknown VariantType: " << record[i].which(); + } + schema->appendCol(colnames[i], type); + } // for + rsWriter = std::make_unique(schema); + } // if + + RowWriter writer(schema); + for (auto &column : record) { + switch (column.which()) { + case 0: + writer << boost::get(column); + break; + case 1: + writer << boost::get(column); + break; + case 2: + writer << boost::get(column); + break; + case 3: + writer << boost::get(column); + break; + default: + LOG(FATAL) << "Unknown VariantType: " << column.which(); + } + } + rsWriter->addRow(writer); + }; // cb + processFinalResult(rpcResp, cb); + return std::make_unique(std::move(rsWriter)); +} + + +void GoExecutor::setupResponseHeader(cpp2::ExecutionResponse &resp) const { + resp.set_column_names(getResultColumnNames()); +} + + +VariantType getProp(const std::string &prop, + const RowReader *reader, + ResultSchemaProvider *schema) { + using nebula::cpp2::SupportedType; + auto type = schema->getFieldType(prop).type; + switch (type) { + case SupportedType::BOOL: { + bool v; + reader->getBool(prop, v); + return v; + } + case SupportedType::INT: { + int64_t v; + reader->getInt(prop, v); + return v; + } + case SupportedType::VID: { + VertexID v; + reader->getVid(prop, v); + return v; + } + case SupportedType::FLOAT: { + float v; + reader->getFloat(prop, v); + return static_cast(v); + } + case SupportedType::DOUBLE: { + double v; + reader->getDouble(prop, v); + return v; + } + case SupportedType::STRING: { + folly::StringPiece v; + reader->getString(prop, v); + return v.toString(); + } + default: + LOG(FATAL) << "Unknown type: " << static_cast(type); + return ""; + } +} + + +void GoExecutor::setupResponseBody(RpcResponse &rpcResp, cpp2::ExecutionResponse &resp) const { + std::vector rows; + auto cb = [&] (std::vector record) { + std::vector row; + row.reserve(record.size()); + for (auto &column : record) { + row.emplace_back(); + switch (column.which()) { + case 0: + row.back().set_integer(boost::get(column)); + break; + case 1: + row.back().set_double_precision(boost::get(column)); + break; + case 2: + row.back().set_bool_val(boost::get(column)); + break; + case 3: + row.back().set_str(boost::get(column)); + break; + default: + LOG(FATAL) << "Unknown VariantType: " << column.which(); + } + } + rows.emplace_back(); + rows.back().set_columns(std::move(row)); + }; + processFinalResult(rpcResp, cb); + resp.set_rows(std::move(rows)); +} + + +void GoExecutor::onEmptyInputs() { + if (onResult_) { + onResult_(nullptr); + } else if (resp_ == nullptr) { + resp_ = std::make_unique(); + } + onFinish_(); +} + + +void GoExecutor::processFinalResult(RpcResponse &rpcResp, Callback cb) const { + auto all = rpcResp.responses(); + for (auto &resp : all) { + if (resp.get_vertices() == nullptr) { + continue; + } + std::shared_ptr vschema; + std::shared_ptr eschema; + if (resp.get_vertex_schema() != nullptr) { + vschema = std::make_shared(resp.vertex_schema); + } + if (resp.get_edge_schema() != nullptr) { + eschema = std::make_shared(resp.edge_schema); + } + + for (auto &vdata : resp.vertices) { + std::unique_ptr vreader; + if (vschema != nullptr) { + DCHECK(vdata.__isset.vertex_data); + vreader = RowReader::getRowReader(vdata.vertex_data, vschema); + } + DCHECK(vdata.__isset.edge_data); + DCHECK(eschema != nullptr); + RowSetReader rsReader(eschema, vdata.edge_data); + auto iter = rsReader.begin(); + while (iter) { + auto &getters = expCtx_->getters(); + getters.getEdgeProp = [&] (const std::string &prop) -> VariantType { + return getProp(prop, &*iter, eschema.get()); + }; + getters.getSrcTagProp = [&] (const std::string&, const std::string &prop) { + return getProp(prop, vreader.get(), vschema.get()); + }; + getters.getDstTagProp = [&] (const std::string&, const std::string &prop) { + auto dst = getProp("_dst", &*iter, eschema.get()); + return vertexHolder_->get(boost::get(dst), prop); + }; + // Evaluate filter + if (filter_ != nullptr) { + auto value = filter_->eval(); + if (!Expression::asBool(value)) { + ++iter; + continue; + } + } + std::vector record; + record.reserve(yields_.size()); + for (auto *column : yields_) { + auto *expr = column->expr(); + // TODO(dutor) `eval' may fail + auto value = expr->eval(); + record.emplace_back(std::move(value)); + } + cb(std::move(record)); + ++iter; + } // while `iter' + } // for `vdata' + } // for `resp' +} + + +VariantType GoExecutor::VertexHolder::get(VertexID id, const std::string &prop) const { + DCHECK(schema_ != nullptr); + auto iter = data_.find(id); + + // TODO(dutor) We need a type to represent NULL or non-existing prop + CHECK(iter != data_.end()); + + auto reader = RowReader::getRowReader(iter->second, schema_); + + return getProp(prop, reader.get(), schema_.get()); +} + + +void GoExecutor::VertexHolder::add(const storage::cpp2::QueryResponse &resp) { + auto *vertices = resp.get_vertices(); + if (vertices == nullptr) { + return; + } + if (resp.get_vertex_schema() == nullptr) { + return; + } + if (schema_ == nullptr) { + schema_ = std::make_shared(resp.vertex_schema); + } + + for (auto &vdata : *vertices) { + DCHECK(vdata.__isset.vertex_data); + data_[vdata.vertex_id] = vdata.vertex_data; + } } } // namespace graph diff --git a/src/executor/GoExecutor.h b/src/executor/GoExecutor.h index acdc78b1a3c..494965d287b 100644 --- a/src/executor/GoExecutor.h +++ b/src/executor/GoExecutor.h @@ -9,8 +9,16 @@ #include "base/Base.h" #include "graph/TraverseExecutor.h" +#include "storage/client/StorageClient.h" namespace nebula { + +namespace storage { +namespace cpp2 { +class QueryResponse; +} // namespace cpp2 +} // namespace storage + namespace graph { class GoExecutor final : public TraverseExecutor { @@ -25,13 +33,14 @@ class GoExecutor final : public TraverseExecutor { void execute() override; - void feedResult(TraverseRecords records) override; + void feedResult(std::unique_ptr result) override; void setupResponse(cpp2::ExecutionResponse &resp) override; private: - Status prepareResultSchema(); - + /** + * To do some preparing works on the clauses + */ Status prepareStep(); Status prepareFrom(); @@ -44,19 +53,129 @@ class GoExecutor final : public TraverseExecutor { Status prepareNeededProps(); - void cacheResult(TraverseRecords records); + /** + * To check if this is the final step. + */ + bool isFinalStep() const { + return curStep_ == steps_; + } + + /** + * To check if `UPTO' is specified. + * If so, we are supposed to apply the filter in each step. + */ + bool isUpto() const { + return upto_; + } + + /** + * To check if `REVERSELY' is specified. + * If so, we step out in a reverse manner. + */ + bool isReversely() const { + return reversely_; + } + + /** + * To obtain the source ids from various places, + * such as the literal id list, inputs from the pipeline or results of variable. + */ + Status setupStarts(); + + /** + * To step out for one step. + */ + void stepOut(); + + using RpcResponse = storage::StorageRpcResponse; + /** + * Callback invoked upon the response of stepping out arrives. + */ + void onStepOutResponse(RpcResponse &&rpcResp); + + /** + * Callback invoked when the stepping out action reaches the dead end. + */ + void onEmptyInputs(); + + /** + * Callback invoked upon the response of retrieving terminal vertex arrives. + */ + void onVertexProps(RpcResponse &&rpcResp); + + std::vector getStepOutProps() const; + std::vector getDstProps() const; + + void fetchVertexProps(std::vector ids, RpcResponse &&rpcResp); + + /** + * To retrieve or generate the column names for the execution result. + */ + std::vector getResultColumnNames() const; + + /** + * To retrieve the dst ids from a stepping out response. + */ + std::vector getDstIdsFromResp(RpcResponse &rpcResp) const; + + /** + * All required data have arrived, finish the execution. + */ + void finishExecution(RpcResponse &&rpcResp); + + /** + * To setup an intermediate representation of the execution result, + * which is about to be piped to the next executor. + */ + std::unique_ptr setupInterimResult(RpcResponse &&rpcResp); + + /** + * To setup the header of the execution result, i.e. the column names. + */ + void setupResponseHeader(cpp2::ExecutionResponse &resp) const; + + /** + * To setup the body of the execution result. + */ + void setupResponseBody(RpcResponse &rpcResp, cpp2::ExecutionResponse &resp) const; + + /** + * To iterate on the final data collection, and evaluate the filter and yield columns. + * For each row that matches the filter, `cb' would be invoked. + */ + using Callback = std::function)>; + void processFinalResult(RpcResponse &rpcResp, Callback cb) const; + + /** + * A container to hold the mapping from vertex id to its properties, used for lookups + * during the final evaluation process. + */ + class VertexHolder final { + public: + VariantType get(VertexID id, const std::string &prop) const; + void add(const storage::cpp2::QueryResponse &resp); + + private: + std::shared_ptr schema_; + std::unordered_map data_; + }; private: GoSentence *sentence_{nullptr}; uint32_t steps_{1}; + uint32_t curStep_{1}; bool upto_{false}; bool reversely_{false}; - std::string *edge_; + EdgeType edge_; + std::string *varname_{nullptr}; + std::string *colname_{nullptr}; Expression *filter_{nullptr}; std::vector yields_; - TraverseRecords inputs_; - std::unique_ptr expctx_; - std::vector starts_; + std::unique_ptr inputs_; + std::unique_ptr expCtx_; + std::vector starts_; + std::unique_ptr vertexHolder_; + std::unique_ptr resp_; }; } // namespace graph diff --git a/src/executor/GraphService.cpp b/src/executor/GraphService.cpp index b06e8a26114..94dd9093a63 100644 --- a/src/executor/GraphService.cpp +++ b/src/executor/GraphService.cpp @@ -9,19 +9,23 @@ #include "time/Duration.h" #include "graph/RequestContext.h" #include "graph/SimpleAuthenticator.h" +#include "storage/client/StorageClient.h" namespace nebula { namespace graph { -GraphService::GraphService() { +GraphService::GraphService(std::shared_ptr ioExecutor) { sessionManager_ = std::make_unique(); - executionEngine_ = std::make_unique(); + auto storage = std::make_unique(ioExecutor); + executionEngine_ = std::make_unique(std::move(storage)); authenticator_ = std::make_unique(); } + GraphService::~GraphService() { } + folly::Future GraphService::future_authenticate( const std::string& username, const std::string& password) { diff --git a/src/executor/GraphService.h b/src/executor/GraphService.h index 13c12611ce4..8e549a93912 100644 --- a/src/executor/GraphService.h +++ b/src/executor/GraphService.h @@ -13,12 +13,16 @@ #include "graph/ExecutionEngine.h" #include "graph/Authenticator.h" +namespace folly { +class IOThreadPoolExecutor; +} // namespace folly + namespace nebula { namespace graph { class GraphService final : public cpp2::GraphServiceSvIf { public: - GraphService(); + explicit GraphService(std::shared_ptr ioExecutor); ~GraphService(); folly::Future future_authenticate( const std::string& username, @@ -26,9 +30,8 @@ class GraphService final : public cpp2::GraphServiceSvIf { void signout(int64_t /*sessionId*/) override; - folly::Future future_execute( - int64_t sessionId, - const std::string& stmt) override; + folly::Future + future_execute(int64_t sessionId, const std::string& stmt) override; const char* getErrorStr(cpp2::ErrorCode result); diff --git a/src/executor/InsertEdgeExecutor.cpp b/src/executor/InsertEdgeExecutor.cpp index 7330385951b..1c3ca92337f 100644 --- a/src/executor/InsertEdgeExecutor.cpp +++ b/src/executor/InsertEdgeExecutor.cpp @@ -6,6 +6,9 @@ #include "base/Base.h" #include "graph/InsertEdgeExecutor.h" +#include "meta/SchemaManager.h" +#include "storage/client/StorageClient.h" +#include "dataman/RowWriter.h" namespace nebula { namespace graph { @@ -18,37 +21,97 @@ InsertEdgeExecutor::InsertEdgeExecutor(Sentence *sentence, Status InsertEdgeExecutor::prepare() { overwritable_ = sentence_->overwritable(); - srcid_ = sentence_->srcid(); - dstid_ = sentence_->dstid(); - edge_ = sentence_->edge(); - rank_ = sentence_->rank(); + edge_ = meta::SchemaManager::toEdgeType(*sentence_->edge()); properties_ = sentence_->properties(); - values_ = sentence_->values(); - // TODO(dutor) check on property names and types - if (properties_.size() != values_.size()) { - return Status::Error("Number of property names and values not match"); + rows_ = sentence_->rows(); + auto space = ectx()->rctx()->session()->space(); + schema_ = meta::SchemaManager::getEdgeSchema(space, edge_); + if (schema_ == nullptr) { + return Status::Error("No schema found for `%s'", sentence_->edge()->c_str()); } return Status::OK(); } void InsertEdgeExecutor::execute() { - std::vector values; - values.resize(values_.size()); - auto eval = [] (auto *expr) { return expr->eval(); }; - std::transform(values_.begin(), values_.end(), values.begin(), eval); + std::vector edges(rows_.size() * 2); // inbound and outbound + auto index = 0; + for (auto i = 0u; i < rows_.size(); i++) { + auto *row = rows_[i]; + auto src = row->srcid(); + auto dst = row->dstid(); + auto rank = row->rank(); + auto exprs = row->values(); + std::vector values; + values.resize(exprs.size()); + auto eval = [] (auto *expr) { return expr->eval(); }; + std::transform(exprs.begin(), exprs.end(), values.begin(), eval); - auto future = ectx()->storage()->addEdge(edge_, srcid_, dstid_, properties_, values); + RowWriter writer(schema_); + for (auto &value : values) { + switch (value.which()) { + case 0: + writer << boost::get(value); + break; + case 1: + writer << boost::get(value); + break; + case 2: + writer << boost::get(value); + break; + case 3: + writer << boost::get(value); + break; + default: + LOG(FATAL) << "Unknown value type: " << static_cast(value.which()); + } + } + { + auto &out = edges[index++]; + out.key.set_src(src); + out.key.set_dst(dst); + out.key.set_ranking(rank); + out.key.set_edge_type(edge_); + out.props = writer.encode(); + out.__isset.key = true; + out.__isset.props = true; + } + { + auto &in = edges[index++]; + in.key.set_src(dst); + in.key.set_dst(src); + in.key.set_ranking(rank); + in.key.set_edge_type(-edge_); + in.props = ""; + in.__isset.key = true; + in.__isset.props = true; + } + } + + auto space = ectx()->rctx()->session()->space(); + auto future = ectx()->storage()->addEdges(space, std::move(edges), overwritable_); auto *runner = ectx()->rctx()->runner(); - std::move(future).via(runner).then([this] (Status status) { - if (!status.ok()) { - auto &resp = ectx()->rctx()->resp(); - resp.set_error_code(cpp2::ErrorCode::E_EXECUTION_ERROR); - resp.set_error_msg(status.toString()); + + auto cb = [this] (auto &&resp) { + // For insertion, we regard partial success as failure. + auto completeness = resp.completeness(); + if (completeness != 100) { + DCHECK(onError_); + onError_(Status::Error("Internal Error")); + return; } DCHECK(onFinish_); onFinish_(); - }); + }; + + auto error = [this] (auto &&e) { + LOG(ERROR) << "Exception caught: " << e.what(); + DCHECK(onError_); + onError_(Status::Error("Internal error")); + return; + }; + + std::move(future).via(runner).thenValue(cb).thenError(error); } } // namespace graph diff --git a/src/executor/InsertEdgeExecutor.h b/src/executor/InsertEdgeExecutor.h index 95498b928bc..954203856f8 100644 --- a/src/executor/InsertEdgeExecutor.h +++ b/src/executor/InsertEdgeExecutor.h @@ -9,6 +9,7 @@ #include "base/Base.h" #include "graph/Executor.h" +#include "meta/SchemaManager.h" namespace nebula { namespace graph { @@ -26,14 +27,13 @@ class InsertEdgeExecutor final : public Executor { void execute() override; private: + using EdgeSchema = std::shared_ptr; InsertEdgeSentence *sentence_{nullptr}; bool overwritable_{true}; - int64_t srcid_{0}; - int64_t dstid_{0}; - int64_t rank_{0}; - std::string *edge_{nullptr}; + EdgeType edge_{0}; + EdgeSchema schema_; std::vector properties_; - std::vector values_; + std::vector rows_; }; } // namespace graph diff --git a/src/executor/InsertVertexExecutor.cpp b/src/executor/InsertVertexExecutor.cpp index 3542b036310..595f6ac9db6 100644 --- a/src/executor/InsertVertexExecutor.cpp +++ b/src/executor/InsertVertexExecutor.cpp @@ -6,6 +6,8 @@ #include "base/Base.h" #include "graph/InsertVertexExecutor.h" +#include "storage/client/StorageClient.h" +#include "dataman/RowWriter.h" namespace nebula { namespace graph { @@ -18,32 +20,91 @@ InsertVertexExecutor::InsertVertexExecutor(Sentence *sentence, Status InsertVertexExecutor::prepare() { overwritable_ = sentence_->overwritable(); - id_ = sentence_->id(); vertex_ = sentence_->vertex(); + tagId_ = meta::SchemaManager::toTagID(*vertex_); properties_ = sentence_->properties(); - values_ = sentence_->values(); + rows_ = sentence_->rows(); // TODO(dutor) check on property names and types - if (properties_.size() != values_.size()) { - return Status::Error("Number of property names and values not match"); + if (rows_.empty()) { + return Status::Error("VALUES cannot be empty"); + } + auto space = ectx()->rctx()->session()->space(); + schema_ = meta::SchemaManager::getTagSchema(space, tagId_); + if (schema_ == nullptr) { + return Status::Error("No schema found for `%s'", vertex_->c_str()); } return Status::OK(); } void InsertVertexExecutor::execute() { - std::vector values; - values.resize(values_.size()); - auto eval = [] (auto *expr) { return expr->eval(); }; - std::transform(values_.begin(), values_.end(), values.begin(), eval); + using storage::cpp2::Vertex; + using storage::cpp2::Tag; + + std::vector vertices(rows_.size()); + for (auto i = 0u; i < rows_.size(); i++) { + auto *row = rows_[i]; + auto id = row->id(); + auto exprs = row->values(); + std::vector values; + values.resize(exprs.size()); + auto eval = [] (auto *expr) { return expr->eval(); }; + std::transform(exprs.begin(), exprs.end(), values.begin(), eval); + + auto &vertex = vertices[i]; + std::vector tags(1); + auto &tag = tags[0]; + RowWriter writer(schema_); - auto future = ectx()->storage()->addTag(vertex_, id_, properties_, values); + for (auto &value : values) { + switch (value.which()) { + case 0: + writer << boost::get(value); + break; + case 1: + writer << boost::get(value); + break; + case 2: + writer << boost::get(value); + break; + case 3: + writer << boost::get(value); + break; + default: + LOG(FATAL) << "Unknown value type: " << static_cast(value.which()); + } + } + + tag.set_tag_id(tagId_); + tag.set_props(writer.encode()); + vertex.set_id(id); + vertex.set_tags(std::move(tags)); + } + + auto space = ectx()->rctx()->session()->space(); + auto future = ectx()->storage()->addVertices(space, std::move(vertices), overwritable_); auto *runner = ectx()->rctx()->runner(); - std::move(future).via(runner).then([this] (Status status) { - // TODO(dutor) error handling - UNUSED(status); + + auto cb = [this] (auto &&resp) { + // For insertion, we regard partial success as failure. + auto completeness = resp.completeness(); + if (completeness != 100) { + DCHECK(onError_); + onError_(Status::Error("Internal Error")); + return; + } DCHECK(onFinish_); onFinish_(); - }); + }; + + auto error = [this] (auto &&e) { + LOG(ERROR) << "Exception caught: " << e.what(); + DCHECK(onError_); + onError_(Status::Error("Internal error")); + return; + }; + + std::move(future).via(runner).thenValue(cb).thenError(error); } } // namespace graph diff --git a/src/executor/InsertVertexExecutor.h b/src/executor/InsertVertexExecutor.h index 850d8915d3f..a0d3b4f8cee 100644 --- a/src/executor/InsertVertexExecutor.h +++ b/src/executor/InsertVertexExecutor.h @@ -9,6 +9,7 @@ #include "base/Base.h" #include "graph/Executor.h" +#include "meta/SchemaManager.h" namespace nebula { namespace graph { @@ -26,12 +27,14 @@ class InsertVertexExecutor final : public Executor { void execute() override; private: + using TagSchema = std::shared_ptr; InsertVertexSentence *sentence_{nullptr}; bool overwritable_{true}; - int64_t id_{0}; + TagID tagId_{0}; std::string *vertex_{nullptr}; + TagSchema schema_; std::vector properties_; - std::vector values_; + std::vector rows_; }; } // namespace graph diff --git a/src/executor/InterimResult.cpp b/src/executor/InterimResult.cpp new file mode 100644 index 00000000000..9adb5f3a258 --- /dev/null +++ b/src/executor/InterimResult.cpp @@ -0,0 +1,45 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include "graph/InterimResult.h" +#include "dataman/RowReader.h" + +namespace nebula { +namespace graph { + +InterimResult::InterimResult(std::unique_ptr rsWriter) { + rsWriter_ = std::move(rsWriter); + rsReader_ = std::make_unique(rsWriter_->schema(), rsWriter_->data()); +} + + +InterimResult::InterimResult(std::vector vids) { + vids_ = std::move(vids); +} + + +std::vector InterimResult::getVIDs(const std::string &col) const { + if (!vids_.empty()) { + DCHECK(rsReader_ == nullptr); + return vids_; + } + DCHECK(rsReader_ != nullptr); + std::vector result; + auto iter = rsReader_->begin(); + while (iter) { + VertexID vid; + auto rc = iter->getVid(col, vid); + CHECK(rc == ResultType::SUCCEEDED); + result.push_back(vid); + ++iter; + } + return result; +} + + +} // namespace graph +} // namespace nebula diff --git a/src/executor/InterimResult.h b/src/executor/InterimResult.h new file mode 100644 index 00000000000..b015d496f48 --- /dev/null +++ b/src/executor/InterimResult.h @@ -0,0 +1,48 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef GRAPH_INTERIMRESULT_H_ +#define GRAPH_INTERIMRESULT_H_ + +#include "base/Base.h" +#include "dataman/RowSetReader.h" +#include "dataman/RowSetWriter.h" +#include "dataman/SchemaWriter.h" + +/** + * The intermediate form of execution result, used in pipeline and variable. + */ + + +namespace nebula { +namespace graph { + +class InterimResult final { +public: + InterimResult() = default; + ~InterimResult() = default; + InterimResult(const InterimResult &) = default; + InterimResult& operator=(const InterimResult &) = default; + InterimResult(InterimResult &&) = default; + InterimResult& operator=(InterimResult &&) = default; + + explicit InterimResult(std::unique_ptr rsWriter); + explicit InterimResult(std::vector vids); + + std::vector getVIDs(const std::string &col) const; + + // TODO(dutor) iterating interfaces on rows and columns + +private: + std::unique_ptr rsReader_; + std::unique_ptr rsWriter_; + std::vector vids_; +}; + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_INTERIMRESULT_H_ diff --git a/src/executor/PipeExecutor.cpp b/src/executor/PipeExecutor.cpp index 25cfa16c2d4..da8ad8256e7 100644 --- a/src/executor/PipeExecutor.cpp +++ b/src/executor/PipeExecutor.cpp @@ -56,9 +56,9 @@ Status PipeExecutor::prepare() { }; left_->setOnFinish(onFinish); - auto onResult = [this] (TraverseRecords records) { + auto onResult = [this] (std::unique_ptr result) { // Feed results from `left_' to `right_' - right_->feedResult(std::move(records)); + right_->feedResult(std::move(result)); }; left_->setOnResult(onResult); @@ -73,9 +73,9 @@ Status PipeExecutor::prepare() { right_->setOnFinish(onFinish); if (onResult_) { - auto onResult = [this] (TraverseRecords records) { + auto onResult = [this] (std::unique_ptr result) { // This executor takes results of `right_' as results. - onResult_(std::move(records)); + onResult_(std::move(result)); }; right_->setOnResult(onResult); } else { @@ -94,8 +94,8 @@ void PipeExecutor::execute() { } -void PipeExecutor::feedResult(TraverseRecords records) { - left_->feedResult(std::move(records)); +void PipeExecutor::feedResult(std::unique_ptr result) { + left_->feedResult(std::move(result)); } diff --git a/src/executor/PipeExecutor.h b/src/executor/PipeExecutor.h index b42fe71ab07..ab45355d95a 100644 --- a/src/executor/PipeExecutor.h +++ b/src/executor/PipeExecutor.h @@ -25,7 +25,7 @@ class PipeExecutor final : public TraverseExecutor { void execute() override; - void feedResult(TraverseRecords records) override; + void feedResult(std::unique_ptr result) override; void setupResponse(cpp2::ExecutionResponse &resp) override; diff --git a/src/executor/TraverseExecutor.cpp b/src/executor/TraverseExecutor.cpp index cf4e84c7719..c64fed739a7 100644 --- a/src/executor/TraverseExecutor.cpp +++ b/src/executor/TraverseExecutor.cpp @@ -14,14 +14,21 @@ namespace nebula { namespace graph { std::unique_ptr TraverseExecutor::makeTraverseExecutor(Sentence *sentence) { + return makeTraverseExecutor(sentence, ectx()); +} + + +// static +std::unique_ptr +TraverseExecutor::makeTraverseExecutor(Sentence *sentence, ExecutionContext *ectx) { auto kind = sentence->kind(); std::unique_ptr executor; switch (kind) { case Sentence::Kind::kGo: - executor = std::make_unique(sentence, ectx()); + executor = std::make_unique(sentence, ectx); break; case Sentence::Kind::kPipe: - executor = std::make_unique(sentence, ectx()); + executor = std::make_unique(sentence, ectx); break; case Sentence::Kind::kUnknown: LOG(FATAL) << "Sentence kind unknown"; diff --git a/src/executor/TraverseExecutor.h b/src/executor/TraverseExecutor.h index 0d13647f5f3..3f3dfa9135d 100644 --- a/src/executor/TraverseExecutor.h +++ b/src/executor/TraverseExecutor.h @@ -9,6 +9,7 @@ #include "base/Base.h" #include "graph/Executor.h" +#include "graph/InterimResult.h" namespace nebula { namespace graph { @@ -51,11 +52,9 @@ class TraverseExecutor : public Executor { public: explicit TraverseExecutor(ExecutionContext *ectx) : Executor(ectx) {} - using TraverseRecord = std::vector; - using TraverseRecords = std::vector; - using OnResult = std::function; + using OnResult = std::function)>; - virtual void feedResult(TraverseRecords records) = 0; + virtual void feedResult(std::unique_ptr result) = 0; /** * `onResult_' must be set except for the right most executor @@ -79,6 +78,9 @@ class TraverseExecutor : public Executor { inputResultSchema_ = schema; } + static std::unique_ptr + makeTraverseExecutor(Sentence *sentence, ExecutionContext *ectx); + protected: std::unique_ptr makeTraverseExecutor(Sentence *sentence); diff --git a/src/executor/UseExecutor.cpp b/src/executor/UseExecutor.cpp index 4dc41a24833..e2bcee2a2bb 100644 --- a/src/executor/UseExecutor.cpp +++ b/src/executor/UseExecutor.cpp @@ -6,6 +6,7 @@ #include "base/Base.h" #include "graph/UseExecutor.h" +#include "meta/SchemaManager.h" namespace nebula { namespace graph { @@ -24,8 +25,9 @@ void UseExecutor::execute() { auto *session = ectx()->rctx()->session(); // TODO(dutor) Check space's validness and map to type of integer - session->setSpace(sentence_->space()); - FLOG_INFO("Graph space switched to `%s'", sentence_->space().c_str()); + auto space = meta::SchemaManager::toGraphSpaceID(*sentence_->space()); + session->setSpace(*sentence_->space(), space); + FLOG_INFO("Graph space switched to `%s', space id: %d", sentence_->space()->c_str(), space); onFinish_(); } diff --git a/src/executor/VariableHolder.cpp b/src/executor/VariableHolder.cpp new file mode 100644 index 00000000000..3ea87518169 --- /dev/null +++ b/src/executor/VariableHolder.cpp @@ -0,0 +1,50 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include "graph/VariableHolder.h" +#include "graph/InterimResult.h" + +namespace nebula { +namespace graph { + +VariableHolder::VariableHolder() { +} + + +VariableHolder::~VariableHolder() { +} + + +VariableHolder::VariableHolder(VariableHolder &&rhs) noexcept { + holder_ = std::move(rhs.holder_); +} + + +VariableHolder& VariableHolder::operator=(VariableHolder &&rhs) noexcept { + if (this == &rhs) { + return *this; + } + holder_ = std::move(rhs.holder_); + return *this; +} + + +void VariableHolder::add(const std::string &var, std::unique_ptr result) { + holder_[var] = std::move(result); +} + + +const InterimResult* VariableHolder::get(const std::string &var) const { + auto iter = holder_.find(var); + if (iter == holder_.end()) { + return nullptr; + } + return iter->second.get(); +} + +} // namespace graph +} // namespace nebula diff --git a/src/executor/VariableHolder.h b/src/executor/VariableHolder.h new file mode 100644 index 00000000000..c47de4b4c1b --- /dev/null +++ b/src/executor/VariableHolder.h @@ -0,0 +1,37 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef GRAPH_VARIABLEHOLDER_H_ +#define GRAPH_VARIABLEHOLDER_H_ + +#include "base/Base.h" + +namespace nebula { +namespace graph { + +class InterimResult; +class VariableHolder final { +public: + VariableHolder(); + ~VariableHolder(); + VariableHolder(const VariableHolder&) = delete; + VariableHolder& operator=(const VariableHolder&) = delete; + VariableHolder(VariableHolder &&) noexcept; + VariableHolder& operator=(VariableHolder &&) noexcept; + + void add(const std::string &var, std::unique_ptr result); + + const InterimResult* get(const std::string &var) const; + +private: + std::unordered_map> holder_; +}; + +} // namespace graph +} // namespace nebula + + +#endif // GRAPH_VARIABLEHOLDER_H_ diff --git a/src/executor/test/CMakeLists.txt b/src/executor/test/CMakeLists.txt index 4b3d7e10342..3386e7c855b 100644 --- a/src/executor/test/CMakeLists.txt +++ b/src/executor/test/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable( $ $ $ + $ $ $ $ @@ -16,6 +17,9 @@ add_executable( $ $ $ + $ + $ + $ ) nebula_link_libraries( session_manager_test @@ -43,6 +47,7 @@ add_executable( $ $ $ + $ $ $ $ @@ -50,12 +55,47 @@ add_executable( $ $ $ + $ + $ + $ ) nebula_link_libraries( query_engine_test ${THRIFT_LIBRARIES} wangle gtest - gtest_main ) nebula_add_test(query_engine_test) + + +add_executable( + go_test + TestMain.cpp + TestEnv.cpp + TestBase.cpp + GoTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +nebula_link_libraries( + go_test + ${THRIFT_LIBRARIES} + wangle + gtest +) +nebula_add_test(go_test) diff --git a/src/executor/test/GoTest.cpp b/src/executor/test/GoTest.cpp new file mode 100644 index 00000000000..0f981816b98 --- /dev/null +++ b/src/executor/test/GoTest.cpp @@ -0,0 +1,828 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include "graph/test/TestEnv.h" +#include "graph/test/TestBase.h" +#include "fs/TempFile.h" + +DECLARE_string(schema_file); + +namespace nebula { +namespace graph { + +class GoTest : public TestBase { +protected: + void SetUp() override { + TestBase::SetUp(); + // ... + } + + void TearDown() override { + // ... + TestBase::TearDown(); + } + + static void SetUpTestCase() { + client_ = gEnv->getClient(); + ASSERT_NE(nullptr, client_); + + ASSERT_TRUE(prepareSchema()); + + ASSERT_TRUE(prepareData()); + } + + static void TearDownTestCase() { + client_.reset(); + } + + static AssertionResult prepareSchema(); + + static AssertionResult prepareData(); + + class Player final { + public: + Player(std::string name, int64_t age) { + name_ = std::move(name); + age_ = age; + vid_ = std::hash()(name_); + } + + Player(const Player&) = default; + Player(Player&&) = default; + Player& operator=(const Player&) = default; + Player& operator=(Player&&) = default; + + const std::string& name() const { + return name_; + } + + int64_t vid() const { + return vid_; + } + + int64_t age() const { + return age_; + } + + Player& serve(std::string team, int64_t startYear, int64_t endYear) { + serves_.emplace_back(std::move(team), startYear, endYear); + return *this; + } + + Player& like(std::string player, int64_t likeness) { + likes_.emplace_back(std::move(player), likeness); + return *this; + } + + const auto& serves() const { + return serves_; + } + + const auto& likes() const { + return likes_; + } + + private: + using Serve = std::tuple; + using Like = std::tuple; + std::string name_; + int64_t vid_{0}; + int64_t age_{0}; + int64_t draftYear{0}; + std::vector serves_; + std::vector likes_; + }; + + template + class VertexHolder final { + public: + using KeyGetter = std::function; + using Container = std::unordered_map; + using InternalIterator = typename Container::iterator; + VertexHolder(KeyGetter getter, std::initializer_list vertices) { + getter_ = std::move(getter); + for (auto &vertex : vertices) { + vertices_.emplace(getter_(vertex), std::move(vertex)); + } + } + + const Vertex& operator[](const Key &key) const { + auto iter = vertices_.find(key); + CHECK(iter != vertices_.end()) << "Vertex not exist, key: " << key;; + return iter->second; + } + + Vertex& operator[](const Key &key) { + auto iter = vertices_.find(key); + CHECK(iter != vertices_.end()) << "Vertex not exist, key: " << key; + return iter->second; + } + + class Iterator final { + public: + explicit Iterator(InternalIterator iter) { + iter_ = iter; + } + + Iterator& operator++() { + ++iter_; + return *this; + } + + Vertex& operator*() { + return iter_->second; + } + + Vertex* operator->() { + return &iter_->second; + } + + bool operator==(const Iterator &rhs) const { + return iter_ == rhs.iter_; + } + + bool operator!=(const Iterator &rhs) const { + return !(*this == rhs); + } + + private: + InternalIterator iter_; + }; + + Iterator begin() { + return Iterator(vertices_.begin()); + } + + Iterator end() { + return Iterator(vertices_.end()); + } + + private: + KeyGetter getter_; + Container vertices_; + }; + + + class Team final { + public: + explicit Team(std::string name) { + name_ = std::move(name); + vid_ = std::hash()(name_); + } + + const std::string& name() const { + return name_; + } + + int64_t vid() const { + return vid_; + } + + private: + std::string name_; + int64_t vid_{0}; + }; + +protected: + static std::unique_ptr schemaFile_; + static std::unique_ptr client_; + static VertexHolder players_; + static VertexHolder teams_; +}; + +std::unique_ptr GoTest::schemaFile_; + +std::unique_ptr GoTest::client_; + +GoTest::VertexHolder GoTest::players_ = { + [] (const auto &player) { return player.name(); }, { + Player{"Tim Duncan", 42/*, 1997*/}, + Player{"Tony Parker", 36}, + Player{"LaMarcus Aldridge", 33}, + Player{"Rudy Gay", 32}, + Player{"Marco Belinelli", 32}, + Player{"Danny Green", 31}, + Player{"Kyle Anderson", 25}, + Player{"Aron Baynes", 32}, + Player{"Boris Diaw", 36}, + Player{"Tiago Splitter", 34}, + Player{"Cory Joseph", 27}, + Player{"David West", 38}, + Player{"Jonathon Simmons", 29}, + Player{"Dejounte Murray", 29}, + Player{"Tracy McGrady", 39}, + Player{"Kobe Bryant", 40}, + Player{"LeBron James", 34}, + Player{"Stephen Curry", 31}, + Player{"Russell Westbrook", 30}, + Player{"Kevin Durant", 30}, + Player{"James Harden", 29}, + Player{"Chris Paul", 33}, + Player{"DeAndre Jordan", 30}, + Player{"Ricky Rubio", 28}, + + Player{"Rajon Rondo", 33}, + Player{"Manu Ginobili", 41}, + Player{"Kyrie Irving", 26}, + Player{"Vince Carter", 42}, + Player{"Carmelo Anthony", 34}, + Player{"Dwyane Wade", 37}, + Player{"Joel Embiid", 25}, + Player{"Paul George", 28}, + Player{"Giannis Antetokounmpo", 24}, + Player{"Yao Ming", 38}, + Player{"Blake Griffin", 30}, + Player{"Damian Lillard", 28}, + Player{"Steve Nash", 45}, + Player{"Dirk Nowitzki", 40}, + Player{"Paul Gasol", 38}, + Player{"Marc Gasol", 34}, + Player{"Grant Hill", 46}, + Player{"Ray Allen", 43}, + Player{"Klay Thompson", 29}, + Player{"Kristaps Porzingis", 23}, + Player{"Shaquile O'Neal", 47}, + Player{"JaVale McGee", 31}, + Player{"Dwight Howard", 33}, + Player{"Amar'e Stoudemire", 36}, + Player{"Jason Kidd", 45}, + Player{"Ben Simmons", 22}, + Player{"Luka Doncic", 20}, + } +}; + +GoTest::VertexHolder GoTest::teams_ = { + [] (const auto &team) { return team.name(); }, { + Team{"Warriors"}, + Team{"Nuggets"}, + Team{"Rockets"}, + Team{"Trail Blazers"}, + Team{"Spurs"}, + Team{"Thunders"}, + Team{"Jazz"}, + Team{"Clippers"}, + Team{"Kings"}, + Team{"Timberwolves"}, + Team{"Lakers"}, + Team{"Pelicans"}, + Team{"Grizzlies"}, + Team{"Mavericks"}, + Team{"Suns"}, + + Team{"Hornets"}, + Team{"Cavaliers"}, + Team{"Celtics"}, + Team{"Raptors"}, + Team{"76ers"}, + Team{"Pacers"}, + Team{"Bulls"}, + Team{"Hawks"}, + Team{"Knicks"}, + Team{"Pistons"}, + Team{"Bucks"}, + Team{"Magic"}, + Team{"Nets"}, + Team{"Wizards"}, + Team{"Heat"}, + } +}; + + +// static +AssertionResult GoTest::prepareSchema() { + schemaFile_ = std::make_unique("/tmp/go_test.XXXXXX"); + std::ofstream file(schemaFile_->path()); + if (!file.good()) { + return TestError() << "Failed to open: " << schemaFile_->path(); + } + file << "{\n" + " \"nba\": {\n" + " \"tags\": {\n" + " \"player\": [\n" + " [\n" + " \"name: string\",\n" + " \"age: integer\"\n" + " ]\n" + " ],\n" + " \"team\": [\n" + " [\n" + " \"name: string\"\n" + " ]\n" + " ]\n" + " },\n" + " \"edges\": {\n" + " \"serve\": [\n" + " [\n" + " \"start_year: integer\",\n" + " \"end_year: integer\"\n" + " ]\n" + " ],\n" + " \"like\": [\n" + " [\n" + " \"likeness: integer\"\n" + " ]\n" + " ]\n" + " }\n" + " }\n" + "}\n"; + file.close(); + FLAGS_schema_file = schemaFile_->path(); + return TestOK(); +} + + +AssertionResult GoTest::prepareData() { + // TODO(dutor) Maybe we should move these data into some kind of testing resources, later. + players_["Tim Duncan"].serve("Spurs", 1997, 2016) + .like("Tony Parker", 95) + .like("Manu Ginobili", 95); + + players_["Tony Parker"].serve("Spurs", 1999, 2018) + .serve("Hornets", 2018, 2019) + .like("Tim Duncan", 95) + .like("Manu Ginobili", 95) + .like("LaMarcus Aldridge", 90); + + players_["Manu Ginobili"].serve("Spurs", 2002, 2018) + .like("Tim Duncan", 90); + + players_["LaMarcus Aldridge"].serve("Trail Blazers", 2006, 2015) + .serve("Spurs", 2015, 2019) + .like("Tony Parker", 75) + .like("Tim Duncan", 75); + + players_["Rudy Gay"].serve("Grizzlies", 2006, 2013) + .serve("Raptors", 2013, 2013) + .serve("Kings", 2013, 2017) + .serve("Spurs", 2017, 2019) + .like("LaMarcus Aldridge", 70); + + players_["Marco Belinelli"].serve("Warriors", 2007, 2009) + .serve("Raptors", 2009, 2010) + .serve("Hornets", 2010, 2012) + .serve("Bulls", 2012, 2013) + .serve("Spurs", 2013, 2015) + .serve("Kings", 2015, 2016) + .serve("Hornets", 2016, 2017) + .serve("Hawks", 2017, 2018) + .serve("76ers", 2018, 2018) + .serve("Spurs", 2018, 2019) + .like("Tony Parker", 50) + .like("Tim Duncan", 55) + .like("Danny Green", 60); + + players_["Danny Green"].serve("Cavaliers", 2009, 2010) + .serve("Spurs", 2010, 2018) + .serve("Raptors", 2018, 2019) + .like("Marco Belinelli", 83) + .like("Tim Duncan", 70) + .like("LeBron James", 80); + + players_["Kyle Anderson"].serve("Spurs", 2014, 2018) + .serve("Grizzlies", 2018, 2019); + + players_["Aron Baynes"].serve("Spurs", 2013, 2015) + .serve("Pistons", 2015, 2017) + .serve("Celtics", 2017, 2019) + .like("Tim Duncan", 80); + + players_["Boris Diaw"].serve("Hawks", 2003, 2005) + .serve("Suns", 2005, 2008) + .serve("Hornets", 2008, 2012) + .serve("Spurs", 2012, 2016) + .serve("Jazz", 2016, 2017) + .like("Tony Parker", 80) + .like("Tim Duncan", 80); + + players_["Tiago Splitter"].serve("Spurs", 2010, 2015) + .serve("Hawks", 2015, 2017) + .serve("76ers", 2017, 2017) + .like("Tim Duncan", 80) + .like("Manu Ginobili", 90); + + players_["Cory Joseph"].serve("Spurs", 2011, 2015) + .serve("Raptors", 2015, 2017) + .serve("Pacers", 2017, 2019); + + players_["David West"].serve("Hornets", 2003, 2011) + .serve("Pacers", 2011, 2015) + .serve("Spurs", 2015, 2016) + .serve("Warriors", 2016, 2018); + + players_["Jonathon Simmons"].serve("Spurs", 2015, 2017) + .serve("Magic", 2017, 2019) + .serve("76ers", 2019, 2019); + + players_["Dejounte Murray"].serve("Spurs", 2016, 2019) + .like("Tim Duncan", 99) + .like("Tony Parker", 99) + .like("Manu Ginobili", 99) + .like("Marco Belinelli", 99) + .like("Danny Green", 99) + .like("LeBron James", 99) + .like("Russell Westbrook", 99) + .like("Chris Paul", 99) + .like("Kyle Anderson", 99) + .like("Kevin Durant", 99) + .like("James Harden", 99) + .like("Tony Parker", 99); + + players_["Tracy McGrady"].serve("Raptors", 1997, 2000) + .serve("Magic", 2000, 2004) + .serve("Rockets", 2004, 2010) + .serve("Spurs", 2013, 2013) + .like("Kobe Bryant", 90) + .like("Grant Hill", 90) + .like("Rudy Gay", 90); + + players_["Kobe Bryant"].serve("Lakers", 1996, 2016); + + players_["LeBron James"].serve("Cavaliers", 2003, 2010) + .serve("Heat", 2010, 2014) + .serve("Cavaliers", 2014, 2018) + .serve("Lakers", 2018, 2019) + .like("Ray Allen", 100); + + players_["Stephen Curry"].serve("Warriors", 2009, 2019); + + players_["Russell Westbrook"].serve("Thunders", 2008, 2019) + .like("Paul George", 90) + .like("James Harden", 90); + + players_["Kevin Durant"].serve("Thunders", 2007, 2016) + .serve("Warriors", 2016, 2019); + + players_["James Harden"].serve("Thunders", 2009, 2012) + .serve("Rockets", 2012, 2019) + .like("Russell Westbrook", 80); + + players_["Chris Paul"].serve("Hornets", 2005, 2011) + .serve("Clippers", 2011, 2017) + .serve("Rockets", 2017, 2021) + .like("LeBron James", 90) + .like("Carmelo Anthony", 90) + .like("Dwyane Wade", 90); + + players_["DeAndre Jordan"].serve("Clippers", 2008, 2018) + .serve("Mavericks", 2018, 2019) + .serve("Knicks", 2019, 2019); + + players_["Ricky Rubio"].serve("Timberwolves", 2011, 2017) + .serve("Jazz", 2017, 2019); + + players_["Rajon Rondo"].serve("Celtics", 2006, 2014) + .serve("Mavericks", 2014, 2015) + .serve("Kings", 2015, 2016) + .serve("Bulls", 2016, 2017) + .serve("Pelicans", 2017, 2018) + .serve("Lakers", 2018, 2019) + .like("Ray Allen", -1); + + players_["Kyrie Irving"].serve("Cavaliers", 2011, 2017) + .serve("Celtics", 2017, 2019) + .like("LeBron James", 13); + + players_["Vince Carter"].serve("Raptors", 1998, 2004) + .serve("Nets", 2004, 2009) + .serve("Magic", 2009, 2010) + .serve("Suns", 2010, 2011) + .serve("Mavericks", 2011, 2014) + .serve("Grizzlies", 2014, 2017) + .serve("Kings", 2017, 2018) + .serve("Hawks", 2018, 2019) + .like("Tracy McGrady", 90) + .like("Jason Kidd", 70); + + players_["Carmelo Anthony"].serve("Nuggets", 2003, 2011) + .serve("Knicks", 2011, 2017) + .serve("Thunders", 2017, 2018) + .serve("Rockets", 2018, 2019) + .like("LeBron James", 90) + .like("Chris Paul", 90) + .like("Dwyane Wade", 90); + + players_["Dwyane Wade"].serve("Heat", 2003, 2016) + .serve("Bulls", 2016, 2017) + .serve("Cavaliers", 2017, 2018) + .serve("Heat", 2018, 2019) + .like("LeBron James", 90) + .like("Chris Paul", 90) + .like("Carmelo Anthony", 90); + + players_["Joel Embiid"].serve("76ers", 2014, 2019) + .like("Ben Simmons", 80); + + players_["Paul George"].serve("Pacers", 2010, 2017) + .serve("Thunders", 2017, 2019) + .like("Russell Westbrook", 95); + + players_["Giannis Antetokounmpo"].serve("Bucks", 2013, 2019); + + players_["Yao Ming"].serve("Rockets", 2002, 2011) + .like("Tracy McGrady", 90) + .like("Shaquile O'Neal", 90); + + players_["Blake Griffin"].serve("Clippers", 2009, 2018) + .serve("Pistons", 2018, 2019) + .like("Chris Paul", -1); + + players_["Damian Lillard"].serve("Trail Blazers", 2012, 2019) + .like("LaMarcus Aldridge", 80); + + players_["Steve Nash"].serve("Suns", 1996, 1998) + .serve("Mavericks", 1998, 2004) + .serve("Suns", 2004, 2012) + .serve("Lakers", 2012, 2015) + .like("Amar'e Stoudemire", 90) + .like("Dirk Nowitzki", 88) + .like("Stephen Curry", 90) + .like("Jason Kidd", 85); + + players_["Dirk Nowitzki"].serve("Mavericks", 1998, 2019) + .like("Steve Nash", 80) + .like("Jason Kidd", 80) + .like("Dwyane Wade", 10); + + players_["Paul Gasol"].serve("Grizzlies", 2001, 2008) + .serve("Lakers", 2008, 2014) + .serve("Bulls", 2014, 2016) + .serve("Spurs", 2016, 2019) + .serve("Bucks", 2019, 2020) + .like("Kobe Bryant", 90) + .like("Marc Gasol", 99); + + players_["Marc Gasol"].serve("Grizzlies", 2008, 2019) + .serve("Raptors", 2019, 2019) + .like("Paul Gasol", 99); + + players_["Grant Hill"].serve("Pistons", 1994, 2000) + .serve("Magic", 2000, 2007) + .serve("Suns", 2007, 2012) + .serve("Clippers", 2012, 2013) + .like("Tracy McGrady", 90); + + players_["Ray Allen"].serve("Bucks", 1996, 2003) + .serve("Thunders", 2003, 2007) + .serve("Celtics", 2007, 2012) + .serve("Heat", 2012, 2014) + .like("Rajon Rondo", 9); + + players_["Klay Thompson"].serve("Warriors", 2011, 2019) + .like("Stephen Curry", 90); + + players_["Kristaps Porzingis"].serve("Knicks", 2015, 2019) + .serve("Mavericks", 2019, 2020) + .like("Luka Doncic", 90); + + players_["Shaquile O'Neal"].serve("Magic", 1992, 1996) + .serve("Lakers", 1996, 2004) + .serve("Heat", 2004, 2008) + .serve("Suns", 2008, 2009) + .serve("Cavaliers", 2009, 2010) + .serve("Celtics", 2010, 2011) + .like("JaVale McGee", 100) + .like("Tim Duncan", 80); + + players_["JaVale McGee"].serve("Wizards", 2008, 2012) + .serve("Nuggets", 2012, 2015) + .serve("Mavericks", 2015, 2016) + .serve("Warriors", 2016, 2018) + .serve("Lakers", 2018, 2019); + + players_["Dwight Howard"].serve("Magic", 2004, 2012) + .serve("Lakers", 2012, 2013) + .serve("Rockets", 2013, 2016) + .serve("Hawks", 2016, 2017) + .serve("Hornets", 2017, 2018) + .serve("Wizards", 2018, 2019); + + players_["Amar'e Stoudemire"].serve("Suns", 2002, 2010) + .serve("Knicks", 2010, 2015) + .serve("Heat", 2015, 2016) + .like("Steve Nash", 90); + + players_["Jason Kidd"].serve("Mavericks", 1994, 1996) + .serve("Suns", 1996, 2001) + .serve("Nets", 2001, 2008) + .serve("Mavericks", 2008, 2012) + .serve("Knicks", 2012, 2013) + .like("Vince Carter", 80) + .like("Steve Nash", 90) + .like("Dirk Nowitzki", 85); + + players_["Ben Simmons"].serve("76ers", 2016, 2019) + .like("Joel Embiid", 80); + + players_["Luka Doncic"].serve("Mavericks", 2018, 2019) + .like("Dirk Nowitzki", 90) + .like("Kristaps Porzingis", 90) + .like("James Harden", 80); + + { + cpp2::ExecutionResponse resp; + std::string query = "USE SPACE nba"; + auto code = client_->execute(query, resp); + if (code != cpp2::ErrorCode::SUCCEEDED) { + return TestError() << "USE SPACE nba failed" + << static_cast(code); + } + } + { + // Insert vertices `player' + cpp2::ExecutionResponse resp; + std::string query; + query.reserve(1024); + query += "INSERT VERTEX player(name, age) VALUES"; + for (auto &player : players_) { + query += "("; + query += std::to_string(player.vid()); + query += ": "; + query += "\""; + query += player.name(); + query += "\""; + query += ","; + query += std::to_string(player.age()); + query += "),\n\t"; + } + query.resize(query.size() - 3); + auto code = client_->execute(query, resp); + if (code != cpp2::ErrorCode::SUCCEEDED) { + return TestError() << "Insert `players' failed: " + << static_cast(code); + } + } + { + // Insert vertices `team' + cpp2::ExecutionResponse resp; + std::string query; + query.reserve(1024); + query += "INSERT VERTEX team(name) VALUES"; + for (auto &team : teams_) { + query += "("; + query += std::to_string(team.vid()); + query += ": "; + query += "\""; + query += team.name(); + query += "\""; + query += "),\n\t"; + } + query.resize(query.size() - 3); + auto code = client_->execute(query, resp); + if (code != cpp2::ErrorCode::SUCCEEDED) { + return TestError() << "Insert `teams' failed: " + << static_cast(code); + } + } + { + // Insert edges `serve' + cpp2::ExecutionResponse resp; + std::string query; + query.reserve(1024); + query += "INSERT EDGE serve(start_year, end_year) VALUES"; + for (auto &player : players_) { + for (auto &serve : player.serves()) { + auto &team = std::get<0>(serve); + auto startYear = std::get<1>(serve); + auto endYear = std::get<2>(serve); + query += "("; + query += std::to_string(player.vid()); + query += " -> "; + query += std::to_string(teams_[team].vid()); + query += ": "; + query += std::to_string(startYear); + query += ", "; + query += std::to_string(endYear); + query += "),\n\t"; + } + } + query.resize(query.size() - 3); + auto code = client_->execute(query, resp); + if (code != cpp2::ErrorCode::SUCCEEDED) { + return TestError() << "Insert `serve' failed: " + << static_cast(code); + } + } + { + // Insert edges `like' + cpp2::ExecutionResponse resp; + std::string query; + query.reserve(1024); + query += "INSERT EDGE like(likeness) VALUES"; + for (auto &player : players_) { + for (auto &like : player.likes()) { + auto &other = std::get<0>(like); + auto likeness = std::get<1>(like); + query += "("; + query += std::to_string(player.vid()); + query += " -> "; + query += std::to_string(players_[other].vid()); + query += ": "; + query += std::to_string(likeness); + query += "),\n\t"; + } + } + query.resize(query.size() - 3); + auto code = client_->execute(query, resp); + if (code != cpp2::ErrorCode::SUCCEEDED) { + return TestError() << "Insert `like' failed: " + << static_cast(code); + } + } + return TestOK(); +} + + +TEST_F(GoTest, DISABLED_OneStepOutBound) { + { + cpp2::ExecutionResponse resp; + auto *fmt = "GO FROM %ld OVER serve"; + auto query = folly::stringPrintf(fmt, players_["Tim Duncan"].vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = { + {teams_["Spurs"].vid()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + { + cpp2::ExecutionResponse resp; + auto &player = players_["Boris Diaw"]; + auto *fmt = "GO FROM %ld OVER serve YIELD " + "$^[player].name, serve.start_year, serve.end_year, $$[team].name"; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = { + {player.name(), 2003, 2005, "Hawks"}, + {player.name(), 2005, 2008, "Suns"}, + {player.name(), 2008, 2012, "Hornets"}, + {player.name(), 2012, 2016, "Spurs"}, + {player.name(), 2016, 2017, "Jazz"}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + { + cpp2::ExecutionResponse resp; + auto &player = players_["Rajon Rondo"]; + auto *fmt = "GO FROM %ld OVER serve WHERE " + "serve.start_year >= 2013 && serve.end_year <= 2018 YIELD " + "$^[player].name, serve.start_year, serve.end_year, $$[team].name"; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = { + {player.name(), 2014, 2015, "Mavericks"}, + {player.name(), 2015, 2016, "Kings"}, + {player.name(), 2016, 2017, "Bulls"}, + {player.name(), 2017, 2018, "Pelicans"}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } +} + + +TEST_F(GoTest, DISABLED_OneStepInBound) { + { + cpp2::ExecutionResponse resp; + auto *fmt = "GO FROM %ld OVER serve REVERSELY"; + auto &team = teams_["Thunders"]; + auto query = folly::stringPrintf(fmt, team.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = { + {players_["Russell Westbrook"].vid()}, + {players_["Kevin Durant"].vid()}, + {players_["James Harden"].vid()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } +} + + +TEST_F(GoTest, DISABLED_OneStepInOutBound) { + // Ever served in the same team + { + cpp2::ExecutionResponse resp; + auto *fmt = "GO FROM %ld OVER serve | GO FROM $-.id OVER serve REVERSELY"; + auto &player = players_["Kobe Bryant"]; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = { + {players_["LeBron James"].vid()}, + {players_["Rajon Rondo"].vid()}, + {players_["Kobe Bryant"].vid()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + // Ever been teammates + { + } +} + +} // namespace graph +} // namespace nebula diff --git a/src/executor/test/TestBase.h b/src/executor/test/TestBase.h index 3f5ad53a67f..2af18ba14f6 100644 --- a/src/executor/test/TestBase.h +++ b/src/executor/test/TestBase.h @@ -49,17 +49,25 @@ std::ostream& operator<<(std::ostream &os, const std::tuple &tuple) { namespace nebula { namespace graph { +using AssertionResult = ::testing::AssertionResult; class TestBase : public ::testing::Test { protected: void SetUp() override; void TearDown() override; - using AssertionResult = ::testing::AssertionResult; using ColumnType = cpp2::ColumnValue::Type; using Row = std::vector; using Rows = std::vector; + static AssertionResult TestOK() { + return ::testing::AssertionSuccess(); + } + + static AssertionResult TestError() { + return ::testing::AssertionFailure(); + } + template struct uniform_tuple_impl; template @@ -194,8 +202,6 @@ class TestBase : public ::testing::Test { } protected: - std::function TestOK = [] { return ::testing::AssertionSuccess(); }; - std::function TestError = [] { return ::testing::AssertionFailure(); }; }; } // namespace graph diff --git a/src/executor/test/TestEnv.cpp b/src/executor/test/TestEnv.cpp index 087d2488d5a..1620d6394b0 100644 --- a/src/executor/test/TestEnv.cpp +++ b/src/executor/test/TestEnv.cpp @@ -22,9 +22,9 @@ TestEnv::~TestEnv() { void TestEnv::SetUp() { - auto interface = std::make_shared(); using ThriftServer = apache::thrift::ThriftServer; server_ = std::make_unique(); + auto interface = std::make_shared(server_->getIOThreadPool()); server_->setInterface(std::move(interface)); server_->setPort(0); // Let the system choose an available port for us diff --git a/src/executor/test/TestMain.cpp b/src/executor/test/TestMain.cpp index a5cf3b97d7b..0ce74fced14 100644 --- a/src/executor/test/TestMain.cpp +++ b/src/executor/test/TestMain.cpp @@ -10,10 +10,12 @@ using nebula::graph::TestEnv; using nebula::graph::gEnv; +DECLARE_string(meta_server_addrs); int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); folly::init(&argc, &argv, true); + FLAGS_meta_server_addrs = folly::stringPrintf("127.0.0.1:44503"); gEnv = new TestEnv(); // gtest will delete this env object for us ::testing::AddGlobalTestEnvironment(gEnv); diff --git a/src/parser/Clauses.cpp b/src/parser/Clauses.cpp index ad3e38518a7..522cf466c66 100644 --- a/src/parser/Clauses.cpp +++ b/src/parser/Clauses.cpp @@ -41,10 +41,6 @@ std::string FromClause::toString() const { } else { buf += ref_->toString(); } - if (alias_ != nullptr) { - buf += " AS "; - buf += *alias_; - } return buf; } diff --git a/src/parser/Clauses.h b/src/parser/Clauses.h index 353848c1d06..7813034f007 100644 --- a/src/parser/Clauses.h +++ b/src/parser/Clauses.h @@ -53,15 +53,13 @@ class SourceNodeList final { class FromClause final { public: - explicit FromClause(SourceNodeList *srcNodeList, std::string *alias = nullptr) { + explicit FromClause(SourceNodeList *srcNodeList) { srcNodeList_.reset(srcNodeList); - alias_.reset(alias); isRef_ = false; } - explicit FromClause(Expression *expr, std::string *alias = nullptr) { + explicit FromClause(Expression *expr) { ref_.reset(expr); - alias_.reset(alias); isRef_ = true; } @@ -77,10 +75,6 @@ class FromClause final { return ref_.get(); } - std::string* alias() const { - return alias_.get(); - } - bool isRef() const { return isRef_; } @@ -90,7 +84,6 @@ class FromClause final { private: std::unique_ptr srcNodeList_; std::unique_ptr ref_; - std::unique_ptr alias_; bool isRef_{false}; }; diff --git a/src/parser/Expressions.cpp b/src/parser/Expressions.cpp index 8292b32791c..e229afd9343 100644 --- a/src/parser/Expressions.cpp +++ b/src/parser/Expressions.cpp @@ -20,44 +20,12 @@ namespace nebula { -void ExpressionContext::print() const { - for (auto &entry : aliasInfo_) { - FLOG_INFO("Alias `%s': kind of `%s'", - entry.first.c_str(), aliasKindToString(entry.second.kind_).c_str()); - } - if (!srcNodePropNames_.empty()) { - auto srclist = std::accumulate(std::next(srcNodePropNames_.begin()), - srcNodePropNames_.end(), - *srcNodePropNames_.begin(), - [] (auto &a, auto &b) { return a + ", " + b; }); - FLOG_INFO("Referred source node's properties: %s", srclist.c_str()); - } - if (!dstNodePropNames_.empty()) { - auto dstlist = std::accumulate(std::next(dstNodePropNames_.begin()), - dstNodePropNames_.end(), - *dstNodePropNames_.begin(), - [] (auto &a, auto &b) { return a + ", " + b; }); - FLOG_INFO("Referred destination node's properties: %s", dstlist.c_str()); - } - if (!edgePropNames_.empty()) { - auto edgelist = std::accumulate(std::next(edgePropNames_.begin()), - edgePropNames_.end(), - *edgePropNames_.begin(), - [] (auto &a, auto &b) { return a + ", " + b; }); - FLOG_INFO("Referred edge's properties: %s", edgelist.c_str()); - } -} - Status ExpressionContext::addAliasProp(const std::string &alias, const std::string &prop) { auto kind = aliasKind(alias); if (kind == AliasKind::Unknown) { return Status::Error("Alias `%s' not defined", alias.c_str()); } - if (kind == AliasKind::SourceNode) { - addSrcNodeProp(prop); - return Status::OK(); - } if (kind == AliasKind::Edge) { addEdgeProp(prop); return Status::OK(); @@ -113,8 +81,6 @@ std::unique_ptr Expression::makeExpr(uint8_t kind) { return std::make_unique(); case kLogical: return std::make_unique(); - case kSourceId: - return std::make_unique(); case kSourceProp: return std::make_unique(); case kEdgeRank: @@ -129,8 +95,6 @@ std::unique_ptr Expression::makeExpr(uint8_t kind) { return std::make_unique(); case kVariableProp: return std::make_unique(); - case kDestId: - return std::make_unique(); case kDestProp: return std::make_unique(); case kInputProp: @@ -171,7 +135,7 @@ Expression::decode(folly::StringPiece buffer) noexcept { std::string InputPropertyExpression::toString() const { std::string buf; buf.reserve(64); - buf += "$_."; + buf += "$-."; buf += *prop_; return buf; } @@ -219,6 +183,7 @@ VariantType DestPropertyExpression::eval() const { Status DestPropertyExpression::prepare() { + context_->addDstTagProp(*tag_, *prop_); return Status::OK(); } @@ -256,46 +221,6 @@ const char* DestPropertyExpression::decode(const char *pos, const char *end) { } -std::string DestIdExpression::toString() const { - std::string buf; - buf.reserve(64); - buf += "$$["; - buf += *tag_; - buf += "]._id"; - return buf; -} - - -VariantType DestIdExpression::eval() const { - return context_->getters().getDstTagId(); -} - - -Status DestIdExpression::prepare() { - return Status::OK(); -} - - -void DestIdExpression::encode(Cord &cord) const { - cord << kindToInt(kind()); - cord << static_cast(tag_->size()); - cord << *tag_; -} - - -const char* DestIdExpression::decode(const char *pos, const char *end) { - THROW_IF_NO_SPACE(pos, end, 2UL); - auto size = *reinterpret_cast(pos); - pos += 2; - - THROW_IF_NO_SPACE(pos, end, size); - tag_ = std::make_unique(pos, size); - pos += size; - - return pos; -} - - std::string VariablePropertyExpression::toString() const { std::string buf; buf.reserve(64); @@ -455,7 +380,7 @@ std::string EdgeSrcIdExpression::toString() const { VariantType EdgeSrcIdExpression::eval() const { - return context_->getters().getSrcTagId(); + return context_->getters().getEdgeProp("_src"); } @@ -494,7 +419,7 @@ std::string EdgeDstIdExpression::toString() const { VariantType EdgeDstIdExpression::eval() const { - return context_->getters().getDstTagId(); + return context_->getters().getEdgeProp("_dst"); } @@ -533,7 +458,7 @@ std::string EdgeRankExpression::toString() const { VariantType EdgeRankExpression::eval() const { - return context_->getters().getEdgeRank(); + return context_->getters().getEdgeProp("_rank"); } @@ -565,8 +490,7 @@ const char* EdgeRankExpression::decode(const char *pos, const char *end) { std::string SourcePropertyExpression::toString() const { std::string buf; buf.reserve(64); - buf += *alias_; - buf += "["; + buf += "$^["; buf += *tag_; buf += "]."; buf += *prop_; @@ -580,6 +504,7 @@ VariantType SourcePropertyExpression::eval() const { Status SourcePropertyExpression::prepare() { + context_->addSrcTagProp(*tag_, *prop_); return Status::OK(); } @@ -617,47 +542,6 @@ const char* SourcePropertyExpression::decode(const char *pos, const char *end) { } -std::string SourceIdExpression::toString() const { - std::string buf; - buf.reserve(64); - buf += *alias_; - buf += "["; - buf += *tag_; - buf += "]._id"; - return buf; -} - - -VariantType SourceIdExpression::eval() const { - return context_->getters().getSrcTagId(); -} - - -Status SourceIdExpression::prepare() { - return Status::OK(); -} - - -void SourceIdExpression::encode(Cord &cord) const { - cord << kindToInt(kind()); - cord << static_cast(tag_->size()); - cord << *tag_; -} - - -const char* SourceIdExpression::decode(const char *pos, const char *end) { - THROW_IF_NO_SPACE(pos, end, 2UL); - auto size = *reinterpret_cast(pos); - pos += 2; - - THROW_IF_NO_SPACE(pos, end, size); - tag_ = std::make_unique(pos, size); - pos += size; - - return pos; -} - - std::string PrimaryExpression::toString() const { char buf[1024]; switch (operand_.which()) { diff --git a/src/parser/Expressions.h b/src/parser/Expressions.h index 5a47c344ce0..eafcc117f7e 100644 --- a/src/parser/Expressions.h +++ b/src/parser/Expressions.h @@ -41,16 +41,16 @@ using VariantType = boost::variant; class ExpressionContext final { public: - void addSrcNodeProp(const std::string &prop) { - srcNodePropNames_.emplace(prop); + void addSrcTagProp(const std::string &tag, const std::string &prop) { + srcTagProps_.emplace(tag, prop); } - void addDstNodeProp(const std::string &prop) { - dstNodePropNames_.emplace(prop); + void addDstTagProp(const std::string &tag, const std::string &prop) { + dstTagProps_.emplace(tag, prop); } void addEdgeProp(const std::string &prop) { - edgePropNames_.emplace(prop); + edgeProps_.emplace(prop); } Status addAliasProp(const std::string &alias, const std::string &prop); @@ -74,22 +74,26 @@ class ExpressionContext final { return iter->second.kind_; } - std::vector srcNodePropNames() const { - return std::vector(srcNodePropNames_.begin(), srcNodePropNames_.end()); + using TagProp = std::pair; + + std::vector srcTagProps() const { + return std::vector(srcTagProps_.begin(), srcTagProps_.end()); + } + + std::vector dstTagProps() const { + return std::vector(dstTagProps_.begin(), dstTagProps_.end()); } - std::vector dstNodePropNames() const { - return std::vector(dstNodePropNames_.begin(), dstNodePropNames_.end()); + std::vector edgeProps() const { + return std::vector(edgeProps_.begin(), edgeProps_.end()); } - std::vector edgePropNames() const { - return std::vector(edgePropNames_.begin(), edgePropNames_.end()); + bool hasDstTagProp() const { + return !dstTagProps_.empty(); } struct Getters { - std::function getSrcTagId; - std::function getDstTagId; - std::function getEdgeRank; + std::function getEdgeRank; std::function getEdgeProp; std::function getInputProp; std::function getSrcTagProp; @@ -105,9 +109,9 @@ class ExpressionContext final { private: Getters getters_; std::unordered_map aliasInfo_; - std::unordered_set srcNodePropNames_; - std::unordered_set dstNodePropNames_; - std::unordered_set edgePropNames_; + std::unordered_set srcTagProps_; + std::unordered_set dstTagProps_; + std::unordered_set edgeProps_; }; @@ -125,6 +129,14 @@ class Expression { virtual VariantType eval() const = 0; + virtual bool isInputExpression() const { + return kind_ == kInputProp; + } + + virtual bool isVariableExpression() const { + return kind_ == kVariableProp; + } + /** * To encode an expression into a byte buffer. * @@ -205,7 +217,6 @@ class Expression { kArithmetic, kRelational, kLogical, - kSourceId, kSourceProp, kEdgeRank, kEdgeDstId, @@ -213,7 +224,6 @@ class Expression { kEdgeType, kEdgeProp, kVariableProp, - kDestId, kDestProp, kInputProp, @@ -245,7 +255,6 @@ class Expression { friend class ArithmeticExpression; friend class RelationalExpression; friend class LogicalExpression; - friend class SourceIdExpression; friend class SourcePropertyExpression; friend class EdgeRankExpression; friend class EdgeDstIdExpression; @@ -253,7 +262,6 @@ class Expression { friend class EdgeTypeExpression; friend class EdgePropertyExpression; friend class VariablePropertyExpression; - friend class DestIdExpression; friend class InputPropertyExpression; virtual void encode(Cord &cord) const = 0; @@ -291,6 +299,10 @@ class InputPropertyExpression final : public Expression { return Status::OK(); } + std::string* prop() const { + return prop_.get(); + } + private: void encode(Cord &cord) const override; @@ -331,34 +343,6 @@ class DestPropertyExpression final : public Expression { }; -// $$[TagName]._id -class DestIdExpression final : public Expression { -public: - DestIdExpression() { - kind_ = kDestId; - } - - explicit DestIdExpression(std::string *tag) { - kind_ = kDestId; - tag_.reset(tag); - } - - std::string toString() const override; - - VariantType eval() const override; - - Status MUST_USE_RESULT prepare() override; - -private: - void encode(Cord &cord) const override; - - const char* decode(const char *pos, const char *end) override; - -private: - std::unique_ptr tag_; -}; - - // $VarName.any_prop_name class VariablePropertyExpression final : public Expression { public: @@ -378,6 +362,14 @@ class VariablePropertyExpression final : public Expression { Status MUST_USE_RESULT prepare() override; + std::string* var() const { + return var_.get(); + } + + std::string* prop() const { + return prop_.get(); + } + private: void encode(Cord &cord) const override; @@ -531,16 +523,15 @@ class EdgeRankExpression final : public Expression { }; -// Alias[TagName].any_prop_name +// $^[TagName].any_prop_name class SourcePropertyExpression final : public Expression { public: SourcePropertyExpression() { kind_ = kSourceProp; } - SourcePropertyExpression(std::string *alias, std::string *tag, std::string *prop) { + SourcePropertyExpression(std::string *tag, std::string *prop) { kind_ = kSourceProp; - alias_.reset(alias); tag_.reset(tag); prop_.reset(prop); } @@ -557,42 +548,11 @@ class SourcePropertyExpression final : public Expression { const char* decode(const char *pos, const char *end) override; private: - std::unique_ptr alias_; std::unique_ptr tag_; std::unique_ptr prop_; }; -// Alias[TagName]._id -class SourceIdExpression final : public Expression { -public: - SourceIdExpression() { - kind_ = kSourceId; - } - - SourceIdExpression(std::string *alias, std::string *tag) { - kind_ = kSourceId; - alias_.reset(alias); - tag_.reset(tag); - } - - std::string toString() const override; - - VariantType eval() const override; - - Status MUST_USE_RESULT prepare() override; - -private: - void encode(Cord &cord) const override; - - const char* decode(const char *pos, const char *end) override; - -private: - std::unique_ptr alias_; - std::unique_ptr tag_; -}; - - // literal constants: bool, integer, double, string class PrimaryExpression final : public Expression { public: diff --git a/src/parser/MutateSentences.cpp b/src/parser/MutateSentences.cpp index 66a062367dd..af2f22ad6f3 100644 --- a/src/parser/MutateSentences.cpp +++ b/src/parser/MutateSentences.cpp @@ -19,6 +19,7 @@ std::string PropertyList::toString() const { return buf; } + std::string ValueList::toString() const { std::string buf; buf.reserve(256); @@ -30,14 +31,11 @@ std::string ValueList::toString() const { return buf; } -std::string InsertVertexSentence::toString() const { + +std::string VertexRowItem::toString() const { std::string buf; buf.reserve(256); - buf += "INSERT VERTEX "; - buf += *vertex_; buf += "("; - buf += properties_->toString(); - buf += ") VALUES("; buf += std::to_string(id_); buf += ": "; buf += values_->toString(); @@ -45,18 +43,36 @@ std::string InsertVertexSentence::toString() const { return buf; } -std::string InsertEdgeSentence::toString() const { + +std::string VertexRowList::toString() const { std::string buf; buf.reserve(256); - buf += "INSERT EDGE "; - if (!overwritable_) { - buf += "NO OVERWRITE "; + for (auto &item : rows_) { + buf += item->toString(); + buf += ","; } - buf += *edge_; + buf.resize(buf.size() - 1); + return buf; +} + + +std::string InsertVertexSentence::toString() const { + std::string buf; + buf.reserve(256); + buf += "INSERT VERTEX "; + buf += *vertex_; buf += "("; buf += properties_->toString(); - buf += ") "; - buf += "VALUES("; + buf += ") VALUES"; + buf += rows_->toString(); + return buf; +} + + +std::string EdgeRowItem::toString() const { + std::string buf; + buf.reserve(256); + buf += "("; buf += std::to_string(srcid_); buf += " -> "; buf += std::to_string(dstid_); @@ -70,6 +86,35 @@ std::string InsertEdgeSentence::toString() const { return buf; } + +std::string EdgeRowList::toString() const { + std::string buf; + buf.reserve(256); + for (auto &item : rows_) { + buf += item->toString(); + buf += ","; + } + buf.resize(buf.size() - 1); + return buf; +} + + +std::string InsertEdgeSentence::toString() const { + std::string buf; + buf.reserve(256); + buf += "INSERT EDGE "; + if (!overwritable_) { + buf += "NO OVERWRITE "; + } + buf += *edge_; + buf += "("; + buf += properties_->toString(); + buf += ") VALUES"; + buf += rows_->toString(); + return buf; +} + + std::string UpdateItem::toString() const { std::string buf; buf.reserve(256); @@ -79,6 +124,7 @@ std::string UpdateItem::toString() const { return buf; } + std::string UpdateList::toString() const { std::string buf; buf.reserve(256); @@ -90,6 +136,7 @@ std::string UpdateList::toString() const { return buf; } + std::string UpdateVertexSentence::toString() const { std::string buf; buf.reserve(256); @@ -113,6 +160,7 @@ std::string UpdateVertexSentence::toString() const { return buf; } + std::string UpdateEdgeSentence::toString() const { std::string buf; buf.reserve(256); diff --git a/src/parser/MutateSentences.h b/src/parser/MutateSentences.h index 5a3d9bdf558..308cb1b7cf2 100644 --- a/src/parser/MutateSentences.h +++ b/src/parser/MutateSentences.h @@ -32,6 +32,7 @@ class PropertyList final { std::vector> properties_; }; + class ValueList final { public: void addValue(Expression *value) { @@ -52,14 +53,66 @@ class ValueList final { std::vector> values_; }; -class InsertVertexSentence final : public Sentence { + +class VertexRowItem final { public: - InsertVertexSentence(int64_t id, std::string *vertex, PropertyList *props, - ValueList *values, bool overwritable = true) { + VertexRowItem(int64_t id, ValueList *values) { id_ = id; + values_.reset(values); + } + + int64_t id() const { + return id_; + } + + std::vector values() const { + return values_->values(); + } + + std::string toString() const; + +private: + int64_t id_; + std::unique_ptr values_; +}; + + +class VertexRowList final { +public: + void addRow(VertexRowItem *row) { + rows_.emplace_back(row); + } + + /** + * For now, we haven't execution plan cache supported. + * So to avoid too many deep copying, we return the fields or nodes + * of kinds of parsing tree in such a shallow copy way, + * just like in all other places. + * In the future, we might do deep copy to the plan, + * of course excluding the volatile arguments in queries. + */ + std::vector rows() const { + std::vector result; + result.resize(rows_.size()); + auto get = [] (const auto &ptr) { return ptr.get(); }; + std::transform(rows_.begin(), rows_.end(), result.begin(), get); + return result; + } + + std::string toString() const; + +private: + std::vector> rows_; +}; + + +class InsertVertexSentence final : public Sentence { +public: + InsertVertexSentence(std::string *vertex, PropertyList *props, + VertexRowList *rows, bool overwritable = true) { vertex_.reset(vertex); properties_.reset(props); - values_.reset(values); + rows_.reset(rows); overwritable_ = overwritable; kind_ = Kind::kInsertVertex; } @@ -68,10 +121,6 @@ class InsertVertexSentence final : public Sentence { return overwritable_; } - int64_t id() const { - return id_; - } - std::string* vertex() const { return vertex_.get(); } @@ -80,8 +129,8 @@ class InsertVertexSentence final : public Sentence { return properties_->properties(); } - std::vector values() const { - return values_->values(); + std::vector rows() const { + return rows_->rows(); } std::string toString() const override; @@ -91,46 +140,85 @@ class InsertVertexSentence final : public Sentence { int64_t id_; std::unique_ptr vertex_; std::unique_ptr properties_; - std::unique_ptr values_; + std::unique_ptr rows_; }; -class InsertEdgeSentence final : public Sentence { -public: - InsertEdgeSentence() { - kind_ = Kind::kInsertEdge; - } - void setOverwrite(bool overwritable) { - overwritable_ = overwritable; - } - bool overwritable() const { - return overwritable_; +class EdgeRowItem final { +public: + EdgeRowItem(int64_t srcid, int64_t dstid, ValueList *values) { + srcid_ = srcid; + dstid_ = dstid; + values_.reset(values); } - void setSrcId(int64_t srcid) { + EdgeRowItem(int64_t srcid, int64_t dstid, int64_t rank, ValueList *values) { srcid_ = srcid; + dstid_ = dstid; + rank_ = rank; + values_.reset(values); } int64_t srcid() const { return srcid_; } - void setDstId(int64_t dstid) { - dstid_ = dstid; - } - int64_t dstid() const { return dstid_; } - void setRank(int64_t rank) { - rank_ = rank; - } - int64_t rank() const { return rank_; } + std::vector values() const { + return values_->values(); + } + + std::string toString() const; + +private: + int64_t srcid_{0}; + int64_t dstid_{0}; + int64_t rank_{0}; + std::unique_ptr values_; +}; + + +class EdgeRowList final { +public: + void addRow(EdgeRowItem *row) { + rows_.emplace_back(row); + } + + std::vector rows() const { + std::vector result; + result.resize(rows_.size()); + auto get = [] (const auto &ptr) { return ptr.get(); }; + std::transform(rows_.begin(), rows_.end(), result.begin(), get); + return result; + } + + std::string toString() const; + +private: + std::vector> rows_; +}; + + +class InsertEdgeSentence final : public Sentence { +public: + InsertEdgeSentence() { + kind_ = Kind::kInsertEdge; + } + void setOverwrite(bool overwritable) { + overwritable_ = overwritable; + } + + bool overwritable() const { + return overwritable_; + } + void setEdge(std::string *edge) { edge_.reset(edge); } @@ -147,26 +235,24 @@ class InsertEdgeSentence final : public Sentence { return properties_->properties(); } - void setValues(ValueList *values) { - values_.reset(values); + void setRows(EdgeRowList *rows) { + rows_.reset(rows); } - std::vector values() const { - return values_->values(); + std::vector rows() const { + return rows_->rows(); } std::string toString() const override; private: bool overwritable_{true}; - int64_t srcid_{0}; - int64_t dstid_{0}; - int64_t rank_{0}; std::unique_ptr edge_; std::unique_ptr properties_; - std::unique_ptr values_; + std::unique_ptr rows_; }; + class UpdateItem final { public: UpdateItem(std::string *field, Expression *value) { @@ -181,6 +267,7 @@ class UpdateItem final { std::unique_ptr value_; }; + class UpdateList final { public: void addItem(UpdateItem *item) { @@ -193,6 +280,7 @@ class UpdateList final { std::vector> items_; }; + class UpdateVertexSentence final : public Sentence { public: void setInsertable(bool insertable) { @@ -225,6 +313,7 @@ class UpdateVertexSentence final : public Sentence { std::unique_ptr yieldClause_; }; + class UpdateEdgeSentence final : public Sentence { public: void setInsertable(bool insertable) { diff --git a/src/parser/TraverseSentences.h b/src/parser/TraverseSentences.h index ca9442e5cb9..7389dab96d4 100644 --- a/src/parser/TraverseSentences.h +++ b/src/parser/TraverseSentences.h @@ -120,8 +120,8 @@ class UseSentence final : public Sentence { space_.reset(space); } - const std::string& space() const { - return *space_; + const std::string* space() const { + return space_.get(); } std::string toString() const override; @@ -185,6 +185,14 @@ class AssignmentSentence final : public Sentence { sentence_.reset(sentence); } + std::string* var() const { + return variable_.get(); + } + + Sentence* sentence() const { + return sentence_.get(); + } + std::string toString() const override; private: diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 9e0eb00b354..981bf2904f4 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -50,6 +50,10 @@ class GraphScanner; nebula::YieldColumn *yield_column; nebula::PropertyList *prop_list; nebula::ValueList *value_list; + nebula::VertexRowList *vertex_row_list; + nebula::VertexRowItem *vertex_row_item; + nebula::EdgeRowList *edge_row_list; + nebula::EdgeRowItem *edge_row_item; nebula::UpdateList *update_list; nebula::UpdateItem *update_item; nebula::EdgeList *edge_list; @@ -64,7 +68,7 @@ class GraphScanner; %token L_PAREN R_PAREN L_BRACKET R_BRACKET L_BRACE R_BRACE COMMA %token PIPE OR AND LT LE GT GE EQ NE ADD SUB MUL DIV MOD NOT NEG ASSIGN %token DOT COLON SEMICOLON L_ARROW R_ARROW AT -%token ID_PROP TYPE_PROP SRC_ID_PROP DST_ID_PROP RANK_PROP INPUT_REF DST_REF +%token ID_PROP TYPE_PROP SRC_ID_PROP DST_ID_PROP RANK_PROP INPUT_REF DST_REF SRC_REF /* token type specification */ %token BOOL @@ -75,7 +79,11 @@ class GraphScanner; %type expression logic_or_expression logic_and_expression %type relational_expression multiplicative_expression additive_expression %type unary_expression primary_expression equality_expression -%type ref_expression input_ref_expression dst_ref_expression var_ref_expression +%type ref_expression +%type src_ref_expression +%type dst_ref_expression +%type input_ref_expression +%type var_ref_expression %type alias_ref_expression %type type_spec %type step_clause @@ -88,6 +96,10 @@ class GraphScanner; %type yield_column %type prop_list %type value_list +%type vertex_row_list +%type vertex_row_item +%type edge_row_list +%type edge_row_item %type update_list %type update_item %type edge_list @@ -133,6 +145,9 @@ primary_expression | input_ref_expression { $$ = $1; } + | src_ref_expression { + $$ = $1; + } | dst_ref_expression { $$ = $1; } @@ -153,11 +168,14 @@ input_ref_expression } ; -dst_ref_expression - : DST_REF L_BRACKET LABEL R_BRACKET DOT ID_PROP { - $$ = new DestIdExpression($3); +src_ref_expression + : SRC_REF L_BRACKET LABEL R_BRACKET DOT LABEL { + $$ = new SourcePropertyExpression($3, $6); } - | DST_REF L_BRACKET LABEL R_BRACKET DOT LABEL { + ; + +dst_ref_expression + : DST_REF L_BRACKET LABEL R_BRACKET DOT LABEL { $$ = new DestPropertyExpression($3, $6); } ; @@ -184,12 +202,6 @@ alias_ref_expression | LABEL DOT RANK_PROP { $$ = new EdgeRankExpression($1); } - | LABEL L_BRACKET LABEL R_BRACKET DOT LABEL { - $$ = new SourcePropertyExpression($1, $3, $6); - } - | LABEL L_BRACKET LABEL R_BRACKET DOT ID_PROP { - $$ = new SourceIdExpression($1, $3); - } ; unary_expression @@ -291,6 +303,15 @@ go_sentence go->setFromClause($3); go->setOverClause($4); go->setWhereClause($5); + if ($6 == nullptr) { + auto *edge = new std::string(*$4->edge()); + auto *expr = new EdgeDstIdExpression(edge); + auto *alias = new std::string("id"); + auto *col = new YieldColumn(expr, alias); + auto *cols = new YieldColumns(); + cols->addColumn(col); + $6 = new YieldClause(cols); + } go->setYieldClause($6); $$ = go; } @@ -307,18 +328,10 @@ from_clause auto from = new FromClause($2); $$ = from; } - | KW_FROM id_list KW_AS LABEL { - auto from = new FromClause($2, $4); - $$ = from; - } | KW_FROM ref_expression { auto from = new FromClause($2); $$ = from; } - | KW_FROM ref_expression KW_AS LABEL { - auto from = new FromClause($2, $4); - $$ = from; - } ; ref_expression @@ -509,11 +522,8 @@ assignment_sentence ; insert_vertex_sentence - : KW_INSERT KW_VERTEX LABEL L_PAREN prop_list R_PAREN KW_VALUES L_PAREN INTEGER COLON value_list R_PAREN { - $$ = new InsertVertexSentence($9, $3, $5, $11); - } - | KW_INSERT KW_TAG LABEL L_PAREN prop_list R_PAREN KW_VALUES L_PAREN INTEGER COLON value_list R_PAREN { - $$ = new InsertVertexSentence($9, $3, $5, $11); + : KW_INSERT KW_VERTEX LABEL L_PAREN prop_list R_PAREN KW_VALUES vertex_row_list { + $$ = new InsertVertexSentence($3, $5, $8); } ; @@ -531,6 +541,23 @@ prop_list } ; +vertex_row_list + : vertex_row_item { + $$ = new VertexRowList(); + $$->addRow($1); + } + | vertex_row_list COMMA vertex_row_item { + $1->addRow($3); + $$ = $1; + } + ; + +vertex_row_item + : L_PAREN INTEGER COLON value_list R_PAREN { + $$ = new VertexRowItem($2, $4); + } + ; + value_list : expression { $$ = new ValueList(); @@ -546,49 +573,40 @@ value_list ; insert_edge_sentence - : KW_INSERT KW_EDGE LABEL L_PAREN prop_list R_PAREN KW_VALUES L_PAREN - INTEGER R_ARROW INTEGER COLON value_list R_PAREN { + : KW_INSERT KW_EDGE LABEL L_PAREN prop_list R_PAREN KW_VALUES edge_row_list { auto sentence = new InsertEdgeSentence(); sentence->setEdge($3); sentence->setProps($5); - sentence->setSrcId($9); - sentence->setDstId($11); - sentence->setValues($13); + sentence->setRows($8); $$ = sentence; } - | KW_INSERT KW_EDGE KW_NO KW_OVERWRITE LABEL L_PAREN prop_list R_PAREN - KW_VALUES L_PAREN INTEGER R_ARROW INTEGER COLON value_list R_PAREN { + | KW_INSERT KW_EDGE KW_NO KW_OVERWRITE LABEL L_PAREN prop_list R_PAREN KW_VALUES edge_row_list { auto sentence = new InsertEdgeSentence(); sentence->setOverwrite(false); sentence->setEdge($5); sentence->setProps($7); - sentence->setSrcId($11); - sentence->setDstId($13); - sentence->setValues($15); + sentence->setRows($10); $$ = sentence; } - | KW_INSERT KW_EDGE LABEL L_PAREN prop_list R_PAREN KW_VALUES L_PAREN - INTEGER R_ARROW INTEGER AT INTEGER COLON value_list R_PAREN { - auto sentence = new InsertEdgeSentence(); - sentence->setEdge($3); - sentence->setProps($5); - sentence->setSrcId($9); - sentence->setDstId($11); - sentence->setRank($13); - sentence->setValues($15); - $$ = sentence; + ; + +edge_row_list + : edge_row_item { + $$ = new EdgeRowList(); + $$->addRow($1); } - | KW_INSERT KW_EDGE KW_NO KW_OVERWRITE LABEL L_PAREN prop_list R_PAREN KW_VALUES L_PAREN - INTEGER R_ARROW INTEGER AT INTEGER COLON value_list R_PAREN { - auto sentence = new InsertEdgeSentence(); - sentence->setOverwrite(false); - sentence->setEdge($5); - sentence->setProps($7); - sentence->setSrcId($11); - sentence->setDstId($13); - sentence->setRank($15); - sentence->setValues($17); - $$ = sentence; + | edge_row_list COMMA edge_row_item { + $1->addRow($3); + $$ = $1; + } + ; + +edge_row_item + : L_PAREN INTEGER R_ARROW INTEGER COLON value_list R_PAREN { + $$ = new EdgeRowItem($2, $4, $6); + } + | L_PAREN INTEGER R_ARROW INTEGER AT INTEGER COLON value_list R_PAREN { + $$ = new EdgeRowItem($2, $4, $6, $8); } ; diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index fc9dce01aa0..b6da72a7856 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -160,8 +160,9 @@ OCT ([0-7]) "_src" { return TokenType::SRC_ID_PROP; } "_dst" { return TokenType::DST_ID_PROP; } "_rank" { return TokenType::RANK_PROP; } -"$_" { return TokenType::INPUT_REF; } "$$" { return TokenType::DST_REF; } +"$^" { return TokenType::SRC_REF; } +"$-" { return TokenType::INPUT_REF; } {LABEL} { yylval->strval = new std::string(yytext, yyleng); @@ -182,7 +183,7 @@ OCT ([0-7]) yylval->intval = val; return TokenType::INTEGER; } -{DEC}+ { yylval->intval = ::atoll(yytext); return TokenType::INTEGER; } +[+-]?{DEC}+ { yylval->intval = ::atoll(yytext); return TokenType::INTEGER; } {DEC}+\.{DEC}* { yylval->doubleval = ::atof(yytext); return TokenType::DOUBLE; } {DEC}*\.{DEC}+ { yylval->doubleval = ::atof(yytext); return TokenType::DOUBLE; } diff --git a/src/parser/test/ExpressionTest.cpp b/src/parser/test/ExpressionTest.cpp index c2058bc2c14..44e3978fc0b 100644 --- a/src/parser/test/ExpressionTest.cpp +++ b/src/parser/test/ExpressionTest.cpp @@ -400,7 +400,7 @@ TEST_F(ExpressionTest, LiteralConstantsLogical) { TEST_F(ExpressionTest, InputReference) { GQLParser parser; { - std::string query = "GO FROM 1 OVER follow WHERE $_.name"; + std::string query = "GO FROM 1 OVER follow WHERE $-.name"; auto parsed = parser.parse(query); ASSERT_TRUE(parsed.ok()) << parsed.status(); auto *expr = getFilterExpr(parsed.value().get()); @@ -419,7 +419,7 @@ TEST_F(ExpressionTest, InputReference) { ASSERT_EQ("Freddie", Expression::asString(value)); } { - std::string query = "GO FROM 1 OVER follow WHERE $_.age >= 18"; + std::string query = "GO FROM 1 OVER follow WHERE $-.age >= 18"; auto parsed = parser.parse(query); ASSERT_TRUE(parsed.ok()) << parsed.status(); auto *expr = getFilterExpr(parsed.value().get()); @@ -443,8 +443,7 @@ TEST_F(ExpressionTest, InputReference) { TEST_F(ExpressionTest, SourceTagReference) { GQLParser parser; { - std::string query = "GO FROM 1 AS src OVER follow WHERE src[person].name == \"dutor\" " - "&& src[person]._id == 1"; + std::string query = "GO FROM 1 OVER follow WHERE $^[person].name == \"dutor\""; auto parsed = parser.parse(query); ASSERT_TRUE(parsed.ok()) << parsed.status(); auto *expr = getFilterExpr(parsed.value().get()); @@ -456,9 +455,6 @@ TEST_F(ExpressionTest, SourceTagReference) { } return std::string("nobody"); }; - ctx->getters().getSrcTagId = [] () -> int64_t { - return 1L; - }; expr->setContext(ctx.get()); auto value = expr->eval(); ASSERT_TRUE(Expression::isBool(value)); @@ -478,16 +474,16 @@ TEST_F(ExpressionTest, EdgeReference) { auto *expr = getFilterExpr(parsed.value().get()); ASSERT_NE(nullptr, expr); auto ctx = std::make_unique(); - ctx->getters().getSrcTagId = [] () -> int64_t { - return 0L; - }; - ctx->getters().getDstTagId = [] () -> int64_t { - return 2L; - }; ctx->getters().getEdgeProp = [] (auto &prop) -> VariantType { if (prop == "cur_time") { return static_cast(::time(NULL)); } + if (prop == "_src") { + return 0L; + } + if (prop == "_dst") { + return 2L; + } return 1545798790L; }; expr->setContext(ctx.get()); diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index f803b911223..a297efa2841 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -15,80 +15,80 @@ namespace nebula { TEST(Parser, Go) { { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend"; + std::string query = "GO FROM 1 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend;"; + std::string query = "GO FROM 1 OVER friend;"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO 2 STEPS FROM 1 AS person OVER friend"; + std::string query = "GO 2 STEPS FROM 1 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO UPTO 2 STEPS FROM 1 AS person OVER friend"; + std::string query = "GO UPTO 2 STEPS FROM 1 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend"; + std::string query = "GO FROM 1 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend REVERSELY"; + std::string query = "GO FROM 1 OVER friend REVERSELY"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend YIELD person.name"; + std::string query = "GO FROM 1 OVER friend YIELD person.name"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend " - "YIELD person[manager].name,person[manager].age"; + std::string query = "GO FROM 1 OVER friend " + "YIELD $^[manager].name,$^[manager].age"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1,2,3 AS person OVER friend"; + std::string query = "GO FROM 1,2,3 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM $_.id AS person OVER friend"; + std::string query = "GO FROM $-.id OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM $_.col1 AS person OVER friend"; + std::string query = "GO FROM $-.col1 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM $_.id AS person OVER friend"; + std::string query = "GO FROM $-.id OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1,2,3 AS person OVER friend WHERE person.name == \"dutor\""; + std::string query = "GO FROM 1,2,3 OVER friend WHERE person.name == \"dutor\""; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } @@ -125,30 +125,30 @@ TEST(Parser, AlterTag) { TEST(Parser, Set) { { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend INTERSECT " - "GO FROM 2 AS person OVER friend"; + std::string query = "GO FROM 1 OVER friend INTERSECT " + "GO FROM 2 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend UNION " - "GO FROM 2 AS person OVER friend"; + std::string query = "GO FROM 1 OVER friend UNION " + "GO FROM 2 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend MINUS " - "GO FROM 2 AS person OVER friend"; + std::string query = "GO FROM 1 OVER friend MINUS " + "GO FROM 2 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend MINUS " - "GO FROM 2 AS person OVER friend UNION " - "GO FROM 3 AS person OVER friend"; + std::string query = "GO FROM 1 OVER friend MINUS " + "GO FROM 2 OVER friend UNION " + "GO FROM 3 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } @@ -157,17 +157,17 @@ TEST(Parser, Set) { TEST(Parser, Pipe) { { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend | " - "GO FROM 2 AS person OVER friend | " - "GO FROM 3 AS person OVER friend"; + std::string query = "GO FROM 1 OVER friend | " + "GO FROM 2 OVER friend | " + "GO FROM 3 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } { GQLParser parser; - std::string query = "GO FROM 1 AS person OVER friend MINUS " - "GO FROM 2 AS person OVER friend | " - "GO FROM 3 AS person OVER friend"; + std::string query = "GO FROM 1 OVER friend MINUS " + "GO FROM 2 OVER friend | " + "GO FROM 3 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } @@ -181,13 +181,6 @@ TEST(Parser, InsertVertex) { auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } - { - GQLParser parser; - std::string query = "INSERT TAG person(name,age,married,salary) " - "VALUES(12345: \"dutor\", 30, true, 3.14, 1551331900)"; - auto result = parser.parse(query); - ASSERT_TRUE(result.ok()) << result.status(); - } } TEST(Parser, UpdateVertex) { diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp index c86689ebc03..a8c31785750 100644 --- a/src/parser/test/ScannerTest.cpp +++ b/src/parser/test/ScannerTest.cpp @@ -138,7 +138,8 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_TYPE("<-", TokenType::L_ARROW), CHECK_SEMANTIC_TYPE("->", TokenType::R_ARROW), - CHECK_SEMANTIC_TYPE("$_", TokenType::INPUT_REF), + CHECK_SEMANTIC_TYPE("$-", TokenType::INPUT_REF), + CHECK_SEMANTIC_TYPE("$^", TokenType::SRC_REF), CHECK_SEMANTIC_TYPE("$$", TokenType::DST_REF), CHECK_SEMANTIC_TYPE("GO", TokenType::KW_GO), @@ -224,8 +225,6 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_TYPE("_src", TokenType::SRC_ID_PROP), CHECK_SEMANTIC_TYPE("_dst", TokenType::DST_ID_PROP), CHECK_SEMANTIC_TYPE("_rank", TokenType::RANK_PROP), - CHECK_SEMANTIC_TYPE("$_", TokenType::INPUT_REF), - CHECK_SEMANTIC_TYPE("$$", TokenType::DST_REF), CHECK_SEMANTIC_VALUE("TRUE", TokenType::BOOL, true), CHECK_SEMANTIC_VALUE("true", TokenType::BOOL, true), @@ -239,6 +238,7 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_VALUE("label123", TokenType::LABEL, "label123"), CHECK_SEMANTIC_VALUE("123", TokenType::INTEGER, 123), + CHECK_SEMANTIC_VALUE("-123", TokenType::INTEGER, -123), CHECK_SEMANTIC_VALUE("0x123", TokenType::INTEGER, 0x123), CHECK_SEMANTIC_VALUE("0xdeadbeef", TokenType::INTEGER, 0xdeadbeef), CHECK_SEMANTIC_VALUE("0123", TokenType::INTEGER, 0123),