Skip to content

Commit

Permalink
Implement GoExecutor (vesoft-inc#245)
Browse files Browse the repository at this point in the history
* [WIP] Implement GoExecutor

* Address @laura.ding 's comments

* Addressed @dangleptr's comments

* Addressed @zlcook's comment

* Addressed @sherman-the-tank's and @dangleptr's comments

* Fixed a stupid error

* Reverted changes on ThriftClientManager

* Addressed @sherman-the-tank's comments

close vesoft-inc#176  
close vesoft-inc#177
  • Loading branch information
dutor authored Apr 10, 2019
1 parent 0f59e00 commit 48f83b0
Show file tree
Hide file tree
Showing 54 changed files with 2,553 additions and 580 deletions.
5 changes: 4 additions & 1 deletion conf/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
install(FILES nebula-graphd.conf.default DESTINATION etc)
install(
FILES nebula-graphd.conf.default nebula-storaged.conf.default nebula-metad.conf.default
DESTINATION etc
)
34 changes: 19 additions & 15 deletions conf/nebula-graphd.conf.default
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
########## basics ##########
# Whether run as a daemon process
# Whether to run as a daemon process
--daemonize=true
# The file to host the process id
--pid_file=pids/nebula-graphd.pid

########## logging ##########
# Directory to host logging files, which must already exist
# The directory to host logging files, which must already exists
--log_dir=logs
# 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively
# Log level, 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively
--minloglevel=0
# verbose loging level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging
# Verbose log level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging
--v=4
# maximum seconds to buffer the log messages
# Maximum seconds to buffer the log messages
--logbufsecs=0
# Whether to redirect stdout and stderr to separate output files
--redirect_stdout=true
# Destination filename of stdout and stderr, which will also reside in log_dir.
--stdout_log_file=stdout.log
--stderr_log_file=stderr.log
# File to host the process id, which also resides in log_dir
--pid_file=nebula-graphd.pid

########## networking ##########
# Network device to listen on
--listen_netdev=any
# Port to listen on
--port=3699
# seconds before we close the idle connections, 0 for infinite
# To turn on SO_REUSEPORT or not
--reuse_port=false
# Backlog of the listen socket, adjust this together with net.core.somaxconn
--listen_backlog=1024
# Seconds before the idle connections are closed, 0 for never closed
--client_idle_timeout_secs=0
# seconds before we expire the idle sessions, 0 for inifnite
# Seconds before the idle sessions are expired, 0 for no expiration
--session_idle_timeout_secs=60000
# number of threads to accept incoming connections
# The number of threads to accept incoming connections
--num_accept_threads=1
# number of networking IO threads, 0 for number of physical CPU cores
# The number of networking IO threads, 0 for number of physical CPU cores
--num_netio_threads=0
# turn on SO_REUSEPORT or not
--reuse_port=false
# Backlog of the listen socket, adjust this together with net.core.somaxconn
--listen_backlog=1024
# HTTP service port
--ws_http_port=0
# HTTP2 service port
--ws_h2_portt=0
1 change: 1 addition & 0 deletions src/console/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ add_executable(
$<TARGET_OBJECTS:console_obj>
$<TARGET_OBJECTS:client_cpp_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:graph_thrift_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:thread_obj>
Expand Down
71 changes: 56 additions & 15 deletions src/console/CliManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include "console/CliManager.h"
#include "client/cpp/GraphClient.h"

DECLARE_string(u);
DECLARE_string(p);

namespace nebula {
namespace graph {

Expand All @@ -18,6 +21,10 @@ const int32_t kMaxUsernameLen = 16;
const int32_t kMaxPasswordLen = 24;
const int32_t kMaxCommandLineLen = 1024;

CliManager::CliManager() {
::using_history();
}


bool CliManager::connect(const std::string& addr,
uint16_t port,
Expand All @@ -32,11 +39,15 @@ bool CliManager::connect(const std::string& addr,
pass[kMaxPasswordLen] = '\0';

// Make sure username is not empty
for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(user); i++) {
// Need to interactively get the username
std::cout << "Username: ";
std::cin.getline(user, kMaxUsernameLen);
user[kMaxUsernameLen] = '\0';
if (FLAGS_u.empty()) {
for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(user); i++) {
// Need to interactively get the username
std::cout << "Username: ";
std::cin.getline(user, kMaxUsernameLen);
user[kMaxUsernameLen] = '\0';
}
} else {
strcpy(user, FLAGS_u.c_str()); // NOLINT
}
if (!strlen(user)) {
std::cout << "Authentication failed: "
Expand All @@ -45,11 +56,15 @@ bool CliManager::connect(const std::string& addr,
}

// Make sure password is not empty
for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(pass); i++) {
// Need to interactively get the password
std::cout << "Password: ";
std::cin.getline(pass, kMaxPasswordLen);
pass[kMaxPasswordLen] = '\0';
if (FLAGS_p.empty()) {
for (int32_t i = 0; i < kMaxAuthInfoRetries && !strlen(pass); i++) {
// Need to interactively get the password
std::cout << "Password: ";
std::cin.getline(pass, kMaxPasswordLen);
pass[kMaxPasswordLen] = '\0';
}
} else {
strcpy(pass, FLAGS_p.c_str()); // NOLINT
}
if (!strlen(pass)) {
std::cout << "Authentication failed: "
Expand Down Expand Up @@ -85,26 +100,52 @@ void CliManager::loop() {
std::string cmd;
loadHistory();
while (true) {
if (!readLine(cmd)) {
std::string line;
if (!readLine(line, !cmd.empty())) {
break;
}
if (cmd.empty()) {
if (line.empty()) {
cmd.clear();
continue;
}

if (line.back() == '\\') {
line.resize(line.size() - 1);
if (cmd.empty()) {
cmd = line;
} else if (cmd.back() == ' ') {
cmd += line;
} else {
cmd = cmd + " " + line;
}
continue;
}

cmd += line;

if (!cmdProcessor_->process(cmd)) {
break;
}
cmd.clear();
}
saveHistory();
}


bool CliManager::readLine(std::string &line) {
bool CliManager::readLine(std::string &line, bool linebreak) {
auto ok = true;
char prompt[256];
static auto color = 0u;
::snprintf(prompt, sizeof(prompt), "\033[1;%umnebula> \033[0m", color++ % 6 + 31);
auto *input = ::readline(prompt);
::snprintf(prompt, sizeof(prompt),
"\001" // RL_PROMPT_START_IGNORE
"\033[1;%um" // color codes start
"\002" // RL_PROMPT_END_IGNORE
"nebula> " // prompt
"\001" // RL_PROMPT_START_IGNORE
"\033[0m" // restore color code
"\002", // RL_PROMPT_END_IGNORE
color++ % 6 + 31);
auto *input = ::readline(linebreak ? "": prompt);
do {
// EOF
Expand Down
4 changes: 2 additions & 2 deletions src/console/CliManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace graph {

class CliManager final {
public:
CliManager() = default;
CliManager();
~CliManager() = default;

bool connect(const std::string& addr,
Expand All @@ -27,7 +27,7 @@ class CliManager final {

void loop();

bool readLine(std::string &line);
bool readLine(std::string &line, bool linebreak = false);

void updateHistory(const char *line);

Expand Down
4 changes: 2 additions & 2 deletions src/console/CmdProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,12 @@ void CmdProcessor::processServerCmd(folly::StringPiece cmd) {
if (std::regex_search(*msg, result, range)) {
auto start = folly::to<size_t>(result[1].str());
auto end = folly::to<size_t>(result[2].str());
verbose = "syntax error near `" + std::string(&cmd[start-1], &cmd[end]) + "'";
verbose = "syntax error near `" + std::string(&cmd[start-1], end - start + 1) + "'";
} else if (std::regex_search(*msg, result, single)) {
auto start = folly::to<size_t>(result[1].str());
auto end = start + 8;
end = end > cmd.size() ? cmd.size() : end;
verbose = "syntax error near `" + std::string(&cmd[start-1], &cmd[end]) + "'";
verbose = "syntax error near `" + std::string(&cmd[start-1], end - start + 1) + "'";
}
std::cout << "[ERROR (" << static_cast<int32_t>(res)
<< ")]: " << verbose << "\n";
Expand Down
28 changes: 23 additions & 5 deletions src/console/NebulaConsole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,42 @@

#include "base/Base.h"
#include "console/CliManager.h"
#include "fs/FileUtils.h"

DEFINE_string(addr, "127.0.0.1", "Nebula daemon IP address");
DEFINE_int32(port, 34500, "Nebula daemon listening port");
DEFINE_string(username, "", "Username used to authenticate");
DEFINE_string(password, "", "Password used to authenticate");
DEFINE_int32(port, 0, "Nebula daemon listening port");
DEFINE_string(u, "", "Username used to authenticate");
DEFINE_string(p, "", "Password used to authenticate");


int main(int argc, char *argv[]) {
folly::init(&argc, &argv, true);

using nebula::graph::CliManager;
using nebula::fs::FileUtils;
if (FLAGS_port == 0) {
// If port not provided, we use the one in etc/nebula-graphd.conf
auto path = FileUtils::readLink("/proc/self/exe").value();
auto dir = FileUtils::dirname(path.c_str());
static const std::regex pattern("--port=([0-9]+)");
FileUtils::FileLineIterator iter(dir + "/../etc/nebula-graphd.conf", &pattern);
if (iter.valid()) {
auto &smatch = iter.matched();
FLAGS_port = folly::to<int>(smatch[1].str());
}
}
if (FLAGS_port == 0) {
fprintf(stderr, "--port must be specified\n");
return EXIT_FAILURE;
}

CliManager cli;
if (!cli.connect(FLAGS_addr, FLAGS_port, FLAGS_username, FLAGS_password)) {
exit(1);
if (!cli.connect(FLAGS_addr, FLAGS_port, FLAGS_u, FLAGS_p)) {
return EXIT_FAILURE;
}

cli.loop();
return EXIT_SUCCESS;
}


13 changes: 11 additions & 2 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ add_executable(
$<TARGET_OBJECTS:graph_thrift_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:storage_client>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:stats_obj>
Expand All @@ -18,6 +21,7 @@ add_executable(
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:dataman_obj>
)
nebula_link_libraries(
nebula-graphd
Expand All @@ -26,7 +30,6 @@ nebula_link_libraries(
${THRIFT_LIBRARIES}
wangle
)
install(TARGETS nebula-graphd DESTINATION bin)


add_executable(
Expand Down Expand Up @@ -78,6 +81,7 @@ add_executable(
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:ws_obj>
)
nebula_link_libraries(
Expand All @@ -88,4 +92,9 @@ nebula_link_libraries(
${THRIFT_LIBRARIES}
wangle
)
install(TARGETS nebula-metad DESTINATION bin)


install(
TARGETS nebula-graphd nebula-storaged nebula-metad
DESTINATION bin
)
26 changes: 16 additions & 10 deletions src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}

if (FLAGS_daemonize) {
google::SetStderrLogging(google::FATAL);
} else {
google::SetStderrLogging(google::INFO);
}

// Setup logging
auto status = setupLogging();
if (!status.ok()) {
Expand All @@ -62,7 +68,7 @@ int main(int argc, char *argv[]) {
}

// Detect if the server has already been started
auto pidPath = FLAGS_log_dir + "/" + FLAGS_pid_file;
auto pidPath = FLAGS_pid_file;
status = ProcessUtils::isPidAvailable(pidPath);
if (!status.ok()) {
LOG(ERROR) << status;
Expand All @@ -84,13 +90,6 @@ int main(int argc, char *argv[]) {
}
}

// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

LOG(INFO) << "Starting Graph HTTP Service";
nebula::WebService::registerHandler("/graph", [] {
return new nebula::graph::GraphHttpHandler();
Expand All @@ -112,10 +111,10 @@ int main(int argc, char *argv[]) {
localIP = std::move(result).value();
}

auto interface = std::make_shared<GraphService>();
gServer = std::make_unique<apache::thrift::ThriftServer>();
auto interface = std::make_shared<GraphService>(gServer->getIOThreadPool());

gServer->setInterface(interface);
gServer->setInterface(std::move(interface));
gServer->setAddress(localIP, FLAGS_port);
gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(FLAGS_client_idle_timeout_secs));
Expand All @@ -131,6 +130,13 @@ int main(int argc, char *argv[]) {
gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads);
}

// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

FLOG_INFO("Starting nebula-graphd on %s:%d\n", localIP.c_str(), FLAGS_port);
try {
gServer->serve(); // Blocking wait until shut down via gServer->stop()
Expand Down
Loading

0 comments on commit 48f83b0

Please sign in to comment.