Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust meta client ctor && implement heartbeat logic inside. #379

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ int main(int argc, char *argv[]) {
localIP = std::move(result).value();
}

if (FLAGS_num_netio_threads <= 0) {
LOG(WARNING) << "Number netio threads should be greater than zero";
return EXIT_FAILURE;
}
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->getIOThreadPool()->setNumThreads(FLAGS_num_netio_threads);
auto interface = std::make_shared<GraphService>(gServer->getIOThreadPool());

gServer->setInterface(std::move(interface));
Expand All @@ -126,12 +131,6 @@ int main(int argc, char *argv[]) {
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
gServer->setThreadStackSizeMB(5);
if (FLAGS_num_netio_threads > 0) {
gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads);
} else {
LOG(WARNING) << "Number netio threads should be greater than zero";
return EXIT_FAILURE;
}

// Setup the signal handlers
status = setupSignalHandler();
Expand Down
22 changes: 18 additions & 4 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_bool(mock_server, true, "start mock server");
DEFINE_bool(daemonize, true, "Whether to run the process as a daemon");
DEFINE_string(pid_file, "pids/nebula-storaged.pid", "");
DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
"the format looks like ip1:port1, ip2:port2, ip3:port3");
DEFINE_int32(io_handlers, 10, "io handlers");

using nebula::Status;

Expand Down Expand Up @@ -72,7 +75,7 @@ int main(int argc, char *argv[]) {
}

if (FLAGS_data_path.empty()) {
LOG(FATAL) << "Storage Data Path should not empty";
LOG(ERROR) << "Storage Data Path should not empty";
return -1;
}
LOG(INFO) << "Starting the storage Daemon on port " << FLAGS_port
Expand All @@ -88,7 +91,15 @@ int main(int argc, char *argv[]) {
uint32_t localIP;
CHECK(NetworkUtils::ipv4ToInt(result.value(), localIP));

auto metaClient = std::make_unique<nebula::meta::MetaClient>();
if (FLAGS_meta_server_addrs.empty()) {
LOG(ERROR) << "meta_server_addrs flag should be set!";
return -1;
}
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_io_handlers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we share the same pool with the hosting service?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hosting service? If it means the thrift service, yes of course. Please note line 143.

auto addrs = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
auto metaClient = std::make_unique<nebula::meta::MetaClient>(ioThreadPool,
std::move(addrs),
true);
metaClient->init();

nebula::kvstore::KVOptions options;
Expand All @@ -114,7 +125,10 @@ int main(int argc, char *argv[]) {

auto handler = std::make_shared<StorageServiceHandler>(kvstore.get(), std::move(schemaMan));
gServer = std::make_unique<apache::thrift::ThriftServer>();
CHECK(!!gServer) << "Failed to create the thrift server";
if (!!gServer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, make_unique could never return a nullptr, instead it will throw bad_alloc on memory allocation failure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your reminding. Good point. 👍

LOG(ERROR) << "Failed to create the thrift server";
return -1;
}

// Setup the signal handlers
status = setupSignalHandler();
Expand All @@ -126,7 +140,7 @@ int main(int argc, char *argv[]) {
gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection

gServer->setIOThreadPool(ioThreadPool);
gServer->serve(); // Will wait until the server shuts down

LOG(INFO) << "The storage Daemon on port " << FLAGS_port << " stopped";
Expand Down
8 changes: 7 additions & 1 deletion src/graph/ExecutionEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "graph/ExecutionPlan.h"
#include "storage/client/StorageClient.h"

DECLARE_string(meta_server_addrs);

namespace nebula {
namespace graph {

Expand All @@ -22,7 +24,11 @@ ExecutionEngine::~ExecutionEngine() {


Status ExecutionEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor) {
metaClient_ = std::make_unique<meta::MetaClient>();
if (FLAGS_meta_server_addrs.empty()) {
return Status::Error("The meta_server_addrs flag should not be empty!");
}
auto addrs = network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs));
metaClient_->init();

schemaManager_ = meta::SchemaManager::create();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ DEFINE_bool(redirect_stdout, true, "Whether to redirect stdout and stderr to sep
DEFINE_string(stdout_log_file, "graphd-stdout.log", "Destination filename of stdout");
DEFINE_string(stderr_log_file, "graphd-stderr.log", "Destination filename of stderr");
DEFINE_bool(daemonize, true, "Whether run as a daemon process");
DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
"the format looks like ip1:port1, ip2:port2, ip3:port3");
1 change: 1 addition & 0 deletions src/graph/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DECLARE_bool(redirect_stdout);
DECLARE_string(stdout_log_file);
DECLARE_string(stderr_log_file);
DECLARE_bool(daemonize);
DECLARE_string(meta_server_addrs);


#endif // GRAPH_GRAPHFLAGS_H_
2 changes: 1 addition & 1 deletion src/graph/test/TestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ void TestEnv::SetUp() {
FLAGS_load_data_interval_second = 1;
using ThriftServer = apache::thrift::ThriftServer;
server_ = std::make_unique<ThriftServer>();
server_->getIOThreadPool()->setNumThreads(1);
auto interface = std::make_shared<GraphService>(server_->getIOThreadPool());
server_->setInterface(std::move(interface));
server_->setPort(0); // Let the system choose an available port for us

auto serve = [this] {
server_->serve();
};
Expand Down
13 changes: 2 additions & 11 deletions src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,8 @@ bool MemPartManager::partExist(const HostAddr& host, GraphSpaceID spaceId, Parti

MetaServerBasedPartManager::MetaServerBasedPartManager(HostAddr host, meta::MetaClient *client)
: localHost_(std::move(host)) {
if (nullptr == client) {
LOG(INFO) << "MetaClient is nullptr, create new one";
// multi instances use one metaclient
static auto clientPtr = std::make_unique<meta::MetaClient>();
static std::once_flag flag;
std::call_once(flag, std::bind(&meta::MetaClient::init, clientPtr.get()));
client_ = clientPtr.get();
} else {
client_ = client;
}

client_ = client;
CHECK_NOTNULL(client_);
client_->registerListener(this);
}

Expand Down
16 changes: 2 additions & 14 deletions src/meta/SchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,14 @@

#include "base/Base.h"
#include "meta/SchemaManager.h"
#include "meta/FileBasedSchemaManager.h"
#include "meta/ServerBasedSchemaManager.h"

DECLARE_string(schema_file);
DECLARE_string(meta_server_addrs);

namespace nebula {
namespace meta {

std::unique_ptr<SchemaManager> SchemaManager::create() {
if (!FLAGS_schema_file.empty()) {
std::unique_ptr<SchemaManager> sm(new FileBasedSchemaManager());
return sm;
} else if (!FLAGS_meta_server_addrs.empty()) {
std::unique_ptr<SchemaManager> sm(new ServerBasedSchemaManager());
return sm;
} else {
std::unique_ptr<SchemaManager> sm(new AdHocSchemaManager());
return sm;
}
auto sm = std::unique_ptr<SchemaManager>(new ServerBasedSchemaManager());
return sm;
}

void AdHocSchemaManager::addTagSchema(GraphSpaceID space,
Expand Down
11 changes: 2 additions & 9 deletions src/meta/ServerBasedSchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,8 @@ ServerBasedSchemaManager::~ServerBasedSchemaManager() {
}

void ServerBasedSchemaManager::init(MetaClient *client) {
if (nullptr == client) {
LOG(INFO) << "MetaClient is nullptr, create new one";
static auto clientPtr = std::make_unique<meta::MetaClient>();
static std::once_flag flag;
std::call_once(flag, std::bind(&meta::MetaClient::init, clientPtr.get()));
metaClient_ = clientPtr.get();
} else {
metaClient_ = client;
}
metaClient_ = client;
CHECK_NOTNULL(metaClient_);
}

std::shared_ptr<const SchemaProviderIf>
Expand Down
68 changes: 51 additions & 17 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,18 @@
#include "meta/NebulaSchemaProvider.h"

DEFINE_int32(load_data_interval_second, 2 * 60, "Load data interval, unit: second");
DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
"the format looks like ip1:port1, ip2:port2, ip3:port3");
DEFINE_int32(meta_client_io_threads, 3, "meta client io threads");
DEFINE_int32(heartbeat_interval_sec, 10, "Heartbeat interval, unit: second");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

secs or seconds? Maybe we should unify them, e.g. the above line.


namespace nebula {
namespace meta {

MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
std::vector<HostAddr> addrs)
std::vector<HostAddr> addrs,
bool sendHeartBeat)
: ioThreadPool_(ioThreadPool)
, addrs_(std::move(addrs)) {
if (ioThreadPool_ == nullptr) {
ioThreadPool_
= std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_meta_client_io_threads);
}
if (addrs_.empty() && !FLAGS_meta_server_addrs.empty()) {
addrs_ = network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
}
CHECK(!addrs_.empty());
, addrs_(std::move(addrs))
, sendHeartBeat_(sendHeartBeat) {
CHECK(ioThreadPool_ != nullptr && !addrs_.empty());
clientsMan_ = std::make_shared<thrift::ThriftClientManager<
meta::cpp2::MetaServiceAsyncClient>>();
updateHost();
Expand All @@ -44,13 +37,39 @@ MetaClient::~MetaClient() {
void MetaClient::init() {
loadDataThreadFunc();
CHECK(loadDataThread_.start());
size_t delayMS = FLAGS_load_data_interval_second * 1000 + folly::Random::rand32(900);
loadDataThread_.addTimerTask(delayMS,
FLAGS_load_data_interval_second * 1000,
&MetaClient::loadDataThreadFunc, this);
{
size_t delayMS = FLAGS_load_data_interval_second * 1000 + folly::Random::rand32(900);
LOG(INFO) << "Register timer task for load data!";
loadDataThread_.addTimerTask(delayMS,
FLAGS_load_data_interval_second * 1000,
&MetaClient::loadDataThreadFunc, this);
}
if (sendHeartBeat_) {
LOG(INFO) << "Register time task for heartbeat!";
size_t delayMS = FLAGS_heartbeat_interval_sec * 1000 + folly::Random::rand32(900);
loadDataThread_.addTimerTask(delayMS,
FLAGS_heartbeat_interval_sec * 1000,
&MetaClient::heartBeatThreadFunc, this);
}
}

void MetaClient::heartBeatThreadFunc() {
if (listener_ == nullptr) {
VLOG(1) << "Can't send heartbeat due to listener_ is nullptr!";
return;
}
auto ret = heartbeat().get();
if (!ret.ok()) {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
return;
}
}

void MetaClient::loadDataThreadFunc() {
if (ioThreadPool_->numThreads() <= 0) {
LOG(ERROR) << "The threads number in ioThreadPool should be greater than 0";
return;
}
auto ret = listSpaces().get();
if (!ret.ok()) {
LOG(ERROR) << "List space failed, status:" << ret.status();
Expand Down Expand Up @@ -824,5 +843,20 @@ SchemaVer MetaClient::getNewestEdgeVerFromCache(const GraphSpaceID& space,
return it->second;
}

folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
CHECK_NOTNULL(listener_);
auto localHost = listener_->getLocalHost();
cpp2::HBReq req;
nebula::cpp2::HostAddr thriftHost;
thriftHost.set_ip(localHost.first);
thriftHost.set_port(localHost.second);
req.set_host(std::move(thriftHost));
return getResponse(std::move(req), [] (auto client, auto request) {
return client->future_heartBeat(request);
}, [] (cpp2::HBResp&& resp) -> decltype(auto) {
Copy link
Contributor

@dutor dutor May 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI. I sincerely suggest not to use decltype(auto) if not necessary while auto suffices, because you may return a reference to local storage by accident. Consider this:

decltype(auto) get() {
    int var = 0;
    int &ref = var;
    return ref;
    // or return (var);
    // or return (var &= 0xF)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. We should be careful about it.

return resp.code == cpp2::ErrorCode::SUCCEEDED;
}, true);
}

} // namespace meta
} // namespace nebula
16 changes: 11 additions & 5 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ class MetaChangedListener {

class MetaClient {
public:
explicit MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool = nullptr,
std::vector<HostAddr> addrs = {});
explicit MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
std::vector<HostAddr> addrs,
bool sendHeartBeat = false);

virtual ~MetaClient();

Expand Down Expand Up @@ -182,13 +183,17 @@ class MetaClient {
protected:
void loadDataThreadFunc();

void heartBeatThreadFunc();

bool loadSchemas(GraphSpaceID spaceId,
std::shared_ptr<SpaceInfoCache> spaceInfoCache,
SpaceTagNameIdMap &tagNameIdMap,
SpaceEdgeNameTypeMap &edgeNameTypeMap,
SpaceNewestTagVerMap &newestTagVerMap,
SpaceNewestEdgeVerMap &newestEdgeVerMap);

folly::Future<StatusOr<bool>> heartbeat();

std::unordered_map<HostAddr, std::vector<PartitionID>> reverse(const PartsAlloc& parts);

void updateHost() {
Expand Down Expand Up @@ -228,20 +233,21 @@ class MetaClient {
private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<thrift::ThriftClientManager<meta::cpp2::MetaServiceAsyncClient>> clientsMan_;
std::unordered_map<GraphSpaceID, std::shared_ptr<SpaceInfoCache>> localCache_;
std::vector<HostAddr> addrs_;
// The lock used to protect active_ and leader_.
folly::RWSpinLock hostLock_;
HostAddr active_;
HostAddr leader_;
thread::GenericWorker loadDataThread_;
std::unordered_map<GraphSpaceID, std::shared_ptr<SpaceInfoCache>> localCache_;
SpaceNameIdMap spaceIndexByName_;
SpaceTagNameIdMap spaceTagIndexByName_;
SpaceEdgeNameTypeMap spaceEdgeIndexByName_;
SpaceNewestTagVerMap spaceNewestTagVerMap_;
SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_;
folly::RWSpinLock localCacheLock_;
MetaChangedListener* listener_{nullptr};
folly::RWSpinLock localCacheLock_;
MetaChangedListener* listener_{nullptr};
bool sendHeartBeat_ = false;
};
} // namespace meta
} // namespace nebula
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/HBProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace meta {

class HBProcessor : public BaseProcessor<cpp2::HBResp> {
FRIEND_TEST(HBProcessorTest, HBTest);
FRIEND_TEST(MetaClientTest, HeartbeatTest);

public:
static HBProcessor* instance(kvstore::KVStore* kvstore) {
Expand Down
Loading