Skip to content

Commit

Permalink
Adjust meta client ctor && implement heartbeat logic inside. (vesoft-…
Browse files Browse the repository at this point in the history
…inc#383)

* Adjust meta client ctor  && implement heartbeat logic inside.

* Address laura-ding's comments

* Fix conflicts
  • Loading branch information
dangleptr authored and laura-ding committed May 20, 2019
1 parent 3c27c15 commit 3728996
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 25 deletions.
11 changes: 5 additions & 6 deletions src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ int main(int argc, char *argv[]) {
localIP = std::move(result).value();
}

if (FLAGS_num_netio_threads <= 0) {
LOG(WARNING) << "Number netio threads should be greater than zero";
return EXIT_FAILURE;
}
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->getIOThreadPool()->setNumThreads(FLAGS_num_netio_threads);
auto interface = std::make_shared<GraphService>(gServer->getIOThreadPool());

gServer->setInterface(std::move(interface));
Expand All @@ -127,12 +132,6 @@ int main(int argc, char *argv[]) {
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
gServer->setThreadStackSizeMB(5);
if (FLAGS_num_netio_threads > 0) {
gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads);
} else {
LOG(WARNING) << "Number netio threads should be greater than zero";
return EXIT_FAILURE;
}

// Setup the signal handlers
status = setupSignalHandler();
Expand Down
8 changes: 7 additions & 1 deletion src/executor/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "graph/ExecutionPlan.h"
#include "storage/client/StorageClient.h"

DECLARE_string(meta_server_addrs);

namespace nebula {
namespace graph {

Expand All @@ -22,7 +24,11 @@ ExecutionEngine::~ExecutionEngine() {


Status ExecutionEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor) {
metaClient_ = std::make_unique<meta::MetaClient>();
auto addrs = network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
if (!addrs.ok()) {
return addrs.status();
}
metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()));
metaClient_->init();

schemaManager_ = meta::SchemaManager::create();
Expand Down
2 changes: 2 additions & 0 deletions src/executor/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ DEFINE_bool(redirect_stdout, true, "Whether to redirect stdout and stderr to sep
DEFINE_string(stdout_log_file, "graphd-stdout.log", "Destination filename of stdout");
DEFINE_string(stderr_log_file, "graphd-stderr.log", "Destination filename of stderr");
DEFINE_bool(daemonize, true, "Whether run as a daemon process");
DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
"the format looks like ip1:port1, ip2:port2, ip3:port3");
1 change: 1 addition & 0 deletions src/executor/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DECLARE_bool(redirect_stdout);
DECLARE_string(stdout_log_file);
DECLARE_string(stderr_log_file);
DECLARE_bool(daemonize);
DECLARE_string(meta_server_addrs);


#endif // GRAPH_GRAPHFLAGS_H_
30 changes: 15 additions & 15 deletions src/executor/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "graph/test/TestEnv.h"
#include "graph/test/TestBase.h"

DECLARE_int32(load_data_interval_second);
DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -58,7 +58,7 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
{
cpp2::ExecutionResponse resp;
std::string query = "USE default_space";
Expand All @@ -70,10 +70,10 @@ TEST_F(SchemaTest, metaCommunication) {
std::string query = "CREATE TAG person(name string, email_addr string, "
"age int, gender string, row_timestamp timestamp)";
auto code = client->execute(query, resp);
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
{
cpp2::ExecutionResponse resp;
std::string query = "DESCRIBE TAG person";
Expand Down Expand Up @@ -101,7 +101,7 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_NE(cpp2::ErrorCode::SUCCEEDED, code);
}
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
{
cpp2::ExecutionResponse resp;
std::string query = "DESCRIBE TAG account";
Expand All @@ -128,7 +128,7 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_NE(cpp2::ErrorCode::SUCCEEDED, code);
}
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
{
cpp2::ExecutionResponse resp;
std::string query = "DESCRIBE TAG account";
Expand All @@ -153,7 +153,7 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_NE(cpp2::ErrorCode::SUCCEEDED, code);
}
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
{
cpp2::ExecutionResponse resp;
std::string query = "DESCRIBE EDGE buy";
Expand All @@ -170,7 +170,7 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
{
cpp2::ExecutionResponse resp;
std::string query = "DESCRIBE EDGE education";
Expand All @@ -186,7 +186,7 @@ TEST_F(SchemaTest, metaCommunication) {
cpp2::ExecutionResponse resp;
std::string query = "SHOW EDGES";
auto code = client->execute(query, resp);
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<uniform_tuple_t<std::string, 1>> expected{
{"buy"},
Expand All @@ -210,7 +210,7 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_NE(cpp2::ErrorCode::SUCCEEDED, code);
}
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
{
cpp2::ExecutionResponse resp;
std::string query = "DESCRIBE EDGE education";
Expand All @@ -228,7 +228,7 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
{
cpp2::ExecutionResponse resp;
std::string query = "USE my_space";
Expand All @@ -240,10 +240,10 @@ TEST_F(SchemaTest, metaCommunication) {
cpp2::ExecutionResponse resp;
std::string query = "CREATE TAG animal(name string, kind string)";
auto code = client->execute(query, resp);
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
{
cpp2::ExecutionResponse resp;
std::string query = "DESCRIBE TAG animal";
Expand All @@ -265,7 +265,7 @@ TEST_F(SchemaTest, metaCommunication) {
cpp2::ExecutionResponse resp;
std::string query = "SHOW TAGS";
auto code = client->execute(query, resp);
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<uniform_tuple_t<std::string, 1>> expected{
{"animal"},
Expand All @@ -277,7 +277,7 @@ TEST_F(SchemaTest, metaCommunication) {
cpp2::ExecutionResponse resp;
std::string query = "REMOVE TAG person ";
auto code = client->execute(query, resp);
sleep(FLAGS_load_data_interval_second + 1);
sleep(FLAGS_load_data_interval_secs + 1);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
{
Expand Down
6 changes: 3 additions & 3 deletions src/executor/test/TestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "base/Base.h"
#include "graph/test/TestEnv.h"

DECLARE_int32(load_data_interval_second);
DECLARE_int32(load_data_interval_secs);

namespace nebula {
namespace graph {
Expand All @@ -24,13 +24,13 @@ TestEnv::~TestEnv() {


void TestEnv::SetUp() {
FLAGS_load_data_interval_second = 1;
FLAGS_load_data_interval_secs = 1;
using ThriftServer = apache::thrift::ThriftServer;
server_ = std::make_unique<ThriftServer>();
server_->getIOThreadPool()->setNumThreads(1);
auto interface = std::make_shared<GraphService>(server_->getIOThreadPool());
server_->setInterface(std::move(interface));
server_->setPort(0); // Let the system choose an available port for us

auto serve = [this] {
server_->serve();
};
Expand Down

0 comments on commit 3728996

Please sign in to comment.