Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust meta client ctor && implement heartbeat logic inside. #383

Merged
merged 3 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions src/common/network/NetworkUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,36 +204,42 @@ std::string NetworkUtils::intToIPv4(uint32_t ip) {
return buf;
}


HostAddr NetworkUtils::toHostAddr(const folly::StringPiece ip, int32_t port) {
StatusOr<HostAddr> NetworkUtils::toHostAddr(folly::StringPiece ip, int32_t port) {
uint32_t ipV4;
CHECK(ipv4ToInt(ip.toString(), ipV4));
if (!ipv4ToInt(ip.toString(), ipV4)) {
return Status::Error("Bad ip format:%s", ip.start());
}
return std::make_pair(ipV4, port);
}


HostAddr NetworkUtils::toHostAddr(const folly::StringPiece ipPort) {
StatusOr<HostAddr> NetworkUtils::toHostAddr(folly::StringPiece ipPort) {
auto pos = ipPort.find(':');
CHECK_NE(pos, folly::StringPiece::npos);
if (pos == folly::StringPiece::npos) {
return Status::Error("Bad peer format: %s", ipPort.start());
}

int32_t port;
try {
port = folly::to<int32_t>(ipPort.subpiece(pos + 1));
} catch (const std::exception& ex) {
LOG(FATAL) << "Bad ipPort: " << ex.what();
return Status::Error("Bad port number, error: %s", ex.what());
}

return toHostAddr(ipPort.subpiece(0, pos), port);
}

std::vector<HostAddr> NetworkUtils::toHosts(const std::string& peersStr) {
StatusOr<std::vector<HostAddr>> NetworkUtils::toHosts(const std::string& peersStr) {
std::vector<HostAddr> hosts;
std::vector<std::string> peers;
folly::split(",", peersStr, peers, true);
hosts.resize(peers.size());
std::transform(peers.begin(), peers.end(), hosts.begin(), [](auto& p) {
return network::NetworkUtils::toHostAddr(folly::trimWhitespace(p));
});
hosts.reserve(peers.size());
for (auto& peerStr : peers) {
auto hostAddr = network::NetworkUtils::toHostAddr(folly::trimWhitespace(peerStr));
if (!hostAddr.ok()) {
return hostAddr.status();
}
hosts.emplace_back(hostAddr.value());
}
return hosts;
}

Expand Down
7 changes: 4 additions & 3 deletions src/common/network/NetworkUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ class NetworkUtils final {
static uint16_t getAvailablePort();

// Convert the given IP (must be in the form of "xx.xx.xx.xx") and Port to a HostAddr
static HostAddr toHostAddr(const folly::StringPiece ip, int32_t port);
static StatusOr<HostAddr> toHostAddr(folly::StringPiece ip, int32_t port);
// Convert the given IP/Port (must be in the form of "xx.xx.xx.xx:pp") to a HostAddr
static HostAddr toHostAddr(const folly::StringPiece ipPort);
static StatusOr<HostAddr> toHostAddr(folly::StringPiece ipPort);
// Retrieve the string-form IP from the given HostAddr
static std::string ipFromHostAddr(const HostAddr& host);
// Retrieve the port number from the given HostAddr
Expand All @@ -57,7 +57,8 @@ class NetworkUtils final {

// Convert peers str which is a list of ipPort joined with comma into HostAddr list.
// (Peers str format example: 192.168.1.1:10001, 192.168.1.2:10001)
static std::vector<HostAddr> toHosts(const std::string& peersStr);
// Return Status::Error if peersStr is invalid.
static StatusOr<std::vector<HostAddr>> toHosts(const std::string& peersStr);

private:
};
Expand Down
11 changes: 5 additions & 6 deletions src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ int main(int argc, char *argv[]) {
localIP = std::move(result).value();
}

if (FLAGS_num_netio_threads <= 0) {
LOG(WARNING) << "Number netio threads should be greater than zero";
return EXIT_FAILURE;
}
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->getIOThreadPool()->setNumThreads(FLAGS_num_netio_threads);
auto interface = std::make_shared<GraphService>(gServer->getIOThreadPool());

gServer->setInterface(std::move(interface));
Expand All @@ -127,12 +132,6 @@ int main(int argc, char *argv[]) {
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
gServer->setThreadStackSizeMB(5);
if (FLAGS_num_netio_threads > 0) {
gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads);
} else {
LOG(WARNING) << "Number netio threads should be greater than zero";
return EXIT_FAILURE;
}

// Setup the signal handlers
status = setupSignalHandler();
Expand Down
67 changes: 30 additions & 37 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,6 @@ DECLARE_string(part_man_type);
DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id");
DEFINE_bool(daemonize, true, "Whether run as a daemon process");

namespace nebula {

std::vector<HostAddr> toHosts(const std::string& peersStr) {
std::vector<HostAddr> hosts;
std::vector<std::string> peers;
folly::split(",", peersStr, peers, true);
std::transform(peers.begin(), peers.end(), hosts.begin(), [](auto& p) {
return network::NetworkUtils::toHostAddr(folly::trimWhitespace(p));
});
return hosts;
}

} // namespace nebula


static std::unique_ptr<apache::thrift::ThriftServer> gServer;

static void signalHandler(int sig);
Expand Down Expand Up @@ -91,50 +76,58 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "Failed to start web service: " << status;
return EXIT_FAILURE;
}
LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port
<< ", dataPath " << FLAGS_data_path;

auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip);
CHECK(result.ok()) << result.status();
uint32_t localIP;
CHECK(nebula::network::NetworkUtils::ipv4ToInt(result.value(), localIP));
if (!result.ok()) {
LOG(ERROR) << "Get local ip failed! status:" << result.status();
return EXIT_FAILURE;
}
auto hostAddrRet = nebula::network::NetworkUtils::toHostAddr(result.value(), FLAGS_port);
if (!hostAddrRet.ok()) {
LOG(ERROR) << "Bad local host addr, status:" << hostAddrRet.status();
return EXIT_FAILURE;
}
auto& localHost = hostAddrRet.value();

auto peersRet = nebula::network::NetworkUtils::toHosts(FLAGS_peers);
if (!peersRet.ok()) {
LOG(ERROR) << "Can't get peers address, status:" << peersRet.status();
return EXIT_FAILURE;
}
// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

auto partMan
= std::make_unique<nebula::kvstore::MemPartManager>();
// The meta server has only one space, one part.
partMan->addPart(0, 0, nebula::toHosts(FLAGS_peers));
partMan->addPart(0, 0, std::move(peersRet.value()));

nebula::kvstore::KVOptions options;
options.local_ = nebula::HostAddr(localIP, FLAGS_port);
options.local_ = localHost;
options.dataPaths_ = {FLAGS_data_path};
options.partMan_ = std::move(partMan);
std::unique_ptr<nebula::kvstore::KVStore> kvstore(
nebula::kvstore::KVStore::instance(std::move(options)));

auto handler = std::make_shared<nebula::meta::MetaServiceHandler>(kvstore.get());

gServer = std::make_unique<apache::thrift::ThriftServer>();
CHECK(!!gServer) << "Failed to create the thrift server";

gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection

// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

nebula::operator<<(operator<<(LOG(INFO), "The meta deamon start on "), localHost);
try {
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
gServer->serve(); // Will wait until the server shuts down
} catch (const std::exception &e) {
LOG(ERROR) << "Exception thrown: " << e.what();
return EXIT_FAILURE;
}

LOG(INFO) << "The storage Daemon on port " << FLAGS_port << " stopped";
LOG(INFO) << "The meta Daemon stopped";
}


Expand Down
68 changes: 47 additions & 21 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_bool(mock_server, true, "start mock server");
DEFINE_bool(daemonize, true, "Whether to run the process as a daemon");
DEFINE_string(pid_file, "pids/nebula-storaged.pid", "File to hold the process id");
DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
"the format looks like ip1:port1, ip2:port2, ip3:port3");
DEFINE_int32(io_handlers, 10, "io handlers");

using nebula::Status;
using nebula::HostAddr;
Expand Down Expand Up @@ -72,27 +75,47 @@ int main(int argc, char *argv[]) {
}

if (FLAGS_data_path.empty()) {
LOG(FATAL) << "Storage Data Path should not empty";
return -1;
LOG(ERROR) << "Storage Data Path should not empty";
return EXIT_FAILURE;
}

auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip);
if (!result.ok()) {
LOG(ERROR) << "Get localIp failed, ip " << FLAGS_local_ip
<< ", status:" << result.status();
return EXIT_FAILURE;
}
auto hostRet = nebula::network::NetworkUtils::toHostAddr(result.value(), FLAGS_port);
if (!hostRet.ok()) {
LOG(ERROR) << "Bad local host addr, status:" << hostRet.status();
return EXIT_FAILURE;
}
auto& localHost = hostRet.value();
auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) {
LOG(ERROR) << "Can't get metaServer address, status:" << metaAddrsRet.status()
<< ", FLAGS_meta_server_addrs:" << FLAGS_meta_server_addrs;
return EXIT_FAILURE;
}
LOG(INFO) << "Starting the storage Daemon on port " << FLAGS_port
<< ", dataPath " << FLAGS_data_path;

std::vector<std::string> paths;
folly::split(",", FLAGS_data_path, paths, true);
std::transform(paths.begin(), paths.end(), paths.begin(), [](auto& p) {
return folly::trimWhitespace(p).str();
});
auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip);
CHECK(result.ok()) << result.status();
uint32_t localIP;
CHECK(NetworkUtils::ipv4ToInt(result.value(), localIP));
if (paths.empty()) {
LOG(ERROR) << "Bad data_path format:" << FLAGS_data_path;
return EXIT_FAILURE;
}

auto metaClient = std::make_unique<nebula::meta::MetaClient>();
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_io_handlers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could share the same thread pool with the one in ThriftServer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have done it. Please note line 151

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent.

auto metaClient = std::make_unique<nebula::meta::MetaClient>(ioThreadPool,
std::move(metaAddrsRet.value()),
true);
metaClient->init();

nebula::kvstore::KVOptions options;
options.local_ = HostAddr(localIP, FLAGS_port);
options.local_ = localHost;
options.dataPaths_ = std::move(paths);
options.partMan_ = std::make_unique<nebula::kvstore::MetaServerBasedPartManager>(
options.local_, metaClient.get());
Expand All @@ -111,25 +134,28 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "Failed to start web service: " << status;
return EXIT_FAILURE;
}

auto handler = std::make_shared<StorageServiceHandler>(kvstore.get(), std::move(schemaMan));
gServer = std::make_unique<apache::thrift::ThriftServer>();
CHECK(!!gServer) << "Failed to create the thrift server";

// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection

gServer->serve(); // Will wait until the server shuts down
auto handler = std::make_shared<StorageServiceHandler>(kvstore.get(), std::move(schemaMan));
try {
nebula::operator<<(operator<<(LOG(INFO), "The storage deamon start on "), localHost);
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
gServer->setIOThreadPool(ioThreadPool);
gServer->serve(); // Will wait until the server shuts down
} catch (const std::exception& e) {
LOG(ERROR) << "Start thrift server failed, error:" << e.what();
return EXIT_FAILURE;
}

LOG(INFO) << "The storage Daemon on port " << FLAGS_port << " stopped";
LOG(INFO) << "The storage Daemon stopped";
}


Expand Down
8 changes: 7 additions & 1 deletion src/graph/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "graph/ExecutionPlan.h"
#include "storage/client/StorageClient.h"

DECLARE_string(meta_server_addrs);

namespace nebula {
namespace graph {

Expand All @@ -22,7 +24,11 @@ ExecutionEngine::~ExecutionEngine() {


Status ExecutionEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor) {
metaClient_ = std::make_unique<meta::MetaClient>();
auto addrs = network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
if (!addrs.ok()) {
return addrs.status();
}
metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()));
metaClient_->init();

schemaManager_ = meta::SchemaManager::create();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ DEFINE_bool(redirect_stdout, true, "Whether to redirect stdout and stderr to sep
DEFINE_string(stdout_log_file, "graphd-stdout.log", "Destination filename of stdout");
DEFINE_string(stderr_log_file, "graphd-stderr.log", "Destination filename of stderr");
DEFINE_bool(daemonize, true, "Whether run as a daemon process");
DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
"the format looks like ip1:port1, ip2:port2, ip3:port3");
1 change: 1 addition & 0 deletions src/graph/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DECLARE_bool(redirect_stdout);
DECLARE_string(stdout_log_file);
DECLARE_string(stderr_log_file);
DECLARE_bool(daemonize);
DECLARE_string(meta_server_addrs);


#endif // GRAPH_GRAPHFLAGS_H_
Loading