Skip to content

Commit

Permalink
Integrate raft into storage (#405)
Browse files Browse the repository at this point in the history
* Integrate raft into storage

All UTs pass. At the moment, all parts still running with single copy. Will enable multiple copies in the future PR

Implemented #178

* Addressed @dangleptr's comments and rebased

* Addressed @dangleptr's comments and rebased

* Addressed @laura-ding's @dangleptr's comments

Rebased as well
  • Loading branch information
sherman-the-tank authored and laura-ding committed Jun 6, 2019
1 parent de36b26 commit b0c7d64
Show file tree
Hide file tree
Showing 60 changed files with 1,769 additions and 855 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ macro(nebula_link_libraries target)
${OPENSSL_CRYPTO_LIBRARY}
${KRB5_LIBRARIES}
${COMPRESSION_LIBRARIES}
${JEMALLOC_LIB}
${LIBUNWIND_LIBRARIES}
${JEMALLOC_LIB}
dl
${GETTIME_LIB}
-pthread
Expand Down
2 changes: 1 addition & 1 deletion cmake/FindNCURSES.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ find_path(

find_path(
NCURSES_INCLUDE_DIR
NAMES ncurses/curses.h
NAMES ncurses.h curses.h ncurses/curses.h
HINTS ${NCURSES_ROOT_DIR}/include
)

Expand Down
4 changes: 0 additions & 4 deletions src/common/base/EitherOr.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

namespace nebula {

namespace { // NOLINT

using LeftType = std::true_type;
using RightType = std::false_type;

Expand All @@ -23,8 +21,6 @@ enum class State : int16_t {
RIGHT_TYPE = 2,
};

} // Anonymous namespace

static constexpr LeftType* kConstructLeft = nullptr;
static constexpr RightType* kConstructRight = nullptr;

Expand Down
1 change: 1 addition & 0 deletions src/common/network/NetworkUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ StatusOr<std::unordered_map<std::string, std::string>> NetworkUtils::listDeviceA
return dev2ipv4s;
}


bool NetworkUtils::getDynamicPortRange(uint16_t& low, uint16_t& high) {
FILE* pipe = popen("cat /proc/sys/net/ipv4/ip_local_port_range", "r");
if (!pipe) {
Expand Down
44 changes: 22 additions & 22 deletions src/common/test/ServerContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,39 @@ struct ServerContext {
if (thread_ != nullptr) {
thread_->join();
}
KVStore_ = nullptr;
kvStore_ = nullptr;
server_ = nullptr;
thread_ = nullptr;
VLOG(3) << "~ServerContext";
}


void mockCommon(const std::string& name,
uint16_t port,
std::shared_ptr<apache::thrift::ServerInterface> handler) {
server_ = std::make_unique<apache::thrift::ThriftServer>();
server_->setInterface(std::move(handler));
server_->setPort(port);
thread_ = std::make_unique<thread::NamedThread>(name, [this, name] {
server_->serve();
LOG(INFO) << "The " << name << " server has stopped";
});

while (!server_->getServeEventBase() ||
!server_->getServeEventBase()->isRunning()) {
usleep(10000);
}
port_ = server_->getAddress().getPort();
}


std::unique_ptr<apache::thrift::ThriftServer> server_{nullptr};
std::unique_ptr<thread::NamedThread> thread_{nullptr};
// To keep meta and storage's KVStore
std::unique_ptr<kvstore::KVStore> KVStore_{nullptr};
std::unique_ptr<kvstore::KVStore> kvStore_{nullptr};
uint16_t port_{0};
};

static void mockCommon(test::ServerContext *sc,
const std::string &name,
uint16_t port,
std::shared_ptr<apache::thrift::ServerInterface> handler) {
if (nullptr == sc) {
LOG(ERROR) << "ServerContext is nullptr";
return;
}
sc->server_ = std::make_unique<apache::thrift::ThriftServer>();
sc->server_->setInterface(std::move(handler));
sc->server_->setPort(port);
sc->thread_ = std::make_unique<thread::NamedThread>(name, [&]() {
sc->server_->serve();
LOG(INFO) << "Stop the server...";
});
while (!sc->server_->getServeEventBase() ||
!sc->server_->getServeEventBase()->isRunning()) {
}
sc->port_ = sc->server_->getAddress().getPort();
}

} // namespace test
} // namespace nebula
Expand Down
2 changes: 1 addition & 1 deletion src/common/test/TestServerContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class TestServer final : public apache::thrift::ServerInterface {
TEST(ServerContext, mockCommon) {
auto sc = std::make_unique<ServerContext>();
auto handler = std::make_shared<TestServer>();
test::mockCommon(sc.get(), "test", 0, handler);
sc->mockCommon("test", 0, handler);
}

} // namespace test
Expand Down
11 changes: 8 additions & 3 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ add_executable(
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:raftex_thrift_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:dataman_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
Expand Down Expand Up @@ -77,9 +80,11 @@ add_executable(
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:raftex_obj>
$<TARGET_OBJECTS:raftex_thrift_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
Expand Down
24 changes: 19 additions & 5 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
#include "webservice/WebService.h"
#include "network/NetworkUtils.h"
#include "process/ProcessUtils.h"
#include "thread/GenericThreadPool.h"
#include "kvstore/PartManager.h"
#include "kvstore/NebulaStore.h"

using nebula::ProcessUtils;
using nebula::Status;
Expand All @@ -23,6 +25,8 @@ DEFINE_string(peers, "", "It is a list of IPs split by comma,"
"the ips number equals replica number."
"If empty, it means replica is 1");
DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DECLARE_string(part_man_type);

DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id");
Expand Down Expand Up @@ -89,7 +93,7 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "Bad local host addr, status:" << hostAddrRet.status();
return EXIT_FAILURE;
}
auto& localHost = hostAddrRet.value();
auto& localhost = hostAddrRet.value();

auto peersRet = nebula::network::NetworkUtils::toHosts(FLAGS_peers);
if (!peersRet.ok()) {
Expand All @@ -108,22 +112,32 @@ int main(int argc, char *argv[]) {
// The meta server has only one space, one part.
partMan->addPart(0, 0, std::move(peersRet.value()));

// Generic thread pool
auto workers = std::make_shared<nebula::thread::GenericThreadPool>();
workers->start(FLAGS_num_workers);

// folly IOThreadPoolExecutor
auto ioPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);

nebula::kvstore::KVOptions options;
options.local_ = localHost;
options.dataPaths_ = {FLAGS_data_path};
options.partMan_ = std::move(partMan);
std::unique_ptr<nebula::kvstore::KVStore> kvstore(
nebula::kvstore::KVStore::instance(std::move(options)));
std::unique_ptr<nebula::kvstore::KVStore> kvstore =
std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
workers,
localhost);

auto handler = std::make_shared<nebula::meta::MetaServiceHandler>(kvstore.get());

nebula::operator<<(operator<<(LOG(INFO), "The meta deamon start on "), localHost);
nebula::operator<<(operator<<(LOG(INFO), "The meta deamon start on "), localhost);
try {
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
gServer->setReusePort(FLAGS_reuse_port);
gServer->setIdleTimeout(std::chrono::seconds(0)); // No idle timeout on client connection
gServer->setIOThreadPool(ioPool);
gServer->serve(); // Will wait until the server shuts down
} catch (const std::exception &e) {
LOG(ERROR) << "Exception thrown: " << e.what();
Expand Down
70 changes: 54 additions & 16 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
#include "base/Base.h"
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include "network/NetworkUtils.h"
#include "thread/GenericThreadPool.h"
#include "storage/StorageServiceHandler.h"
#include "storage/StorageHttpHandler.h"
#include "kvstore/KVStore.h"
#include "kvstore/NebulaStore.h"
#include "kvstore/PartManager.h"
#include "process/ProcessUtils.h"
#include "storage/test/TestUtils.h"
Expand All @@ -21,13 +22,16 @@ DEFINE_int32(port, 44500, "Storage daemon listening port");
DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option");
DEFINE_string(data_path, "", "Root data path, multi paths should be split by comma."
"For rocksdb engine, one path one instance.");
DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_bool(mock_server, true, "start mock server");
DEFINE_bool(daemonize, true, "Whether to run the process as a daemon");
DEFINE_string(pid_file, "pids/nebula-storaged.pid", "File to hold the process id");
DEFINE_string(meta_server_addrs, "", "list of meta server addresses,"
"the format looks like ip1:port1, ip2:port2, ip3:port3");
DEFINE_int32(io_handlers, 10, "io handlers");
DEFINE_string(store_type, "nebula",
"Which type of KVStore to be used by the storage daemon."
" Options can be \"nebula\", \"hbase\", etc.");
DEFINE_int32(num_workers, 4, "Number of worker threads");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");

using nebula::Status;
using nebula::HostAddr;
Expand All @@ -44,6 +48,33 @@ static void signalHandler(int sig);
static Status setupSignalHandler();


std::unique_ptr<nebula::kvstore::KVStore> getStoreInstance(
HostAddr localhost,
std::vector<std::string> paths,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<nebula::thread::GenericThreadPool> workers,
nebula::meta::MetaClient* metaClient) {
nebula::kvstore::KVOptions options;
options.dataPaths_ = std::move(paths);
options.partMan_ = std::make_unique<nebula::kvstore::MetaServerBasedPartManager>(
localhost,
metaClient);

if (FLAGS_store_type == "nebula") {
return std::make_unique<nebula::kvstore::NebulaStore>(std::move(options),
ioPool,
workers,
localhost);
} else if (FLAGS_store_type == "hbase") {
LOG(FATAL) << "HBase store has not been implemented";
} else {
LOG(FATAL) << "Unknown store type \"" << FLAGS_store_type << "\"";
}

return nullptr;
}


int main(int argc, char *argv[]) {
google::SetVersionString(nebula::versionString());
folly::init(&argc, &argv, true);
Expand Down Expand Up @@ -81,18 +112,17 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}

auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip);
auto result = nebula::network::NetworkUtils::getLocalIP();
if (!result.ok()) {
LOG(ERROR) << "Get localIp failed, ip " << FLAGS_local_ip
<< ", status:" << result.status();
LOG(ERROR) << "Get localIp failed, status:" << result.status();
return EXIT_FAILURE;
}
auto hostRet = nebula::network::NetworkUtils::toHostAddr(result.value(), FLAGS_port);
if (!hostRet.ok()) {
LOG(ERROR) << "Bad local host addr, status:" << hostRet.status();
return EXIT_FAILURE;
}
auto& localHost = hostRet.value();
auto& localhost = hostRet.value();
auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) {
LOG(ERROR) << "Can't get metaServer address, status:" << metaAddrsRet.status()
Expand All @@ -110,19 +140,25 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}

auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_io_handlers);
// Generic thread pool
auto workers = std::make_shared<nebula::thread::GenericThreadPool>();
workers->start(FLAGS_num_workers);

// folly IOThreadPoolExecutor
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(FLAGS_num_io_threads);

// Meta client
auto metaClient = std::make_unique<nebula::meta::MetaClient>(ioThreadPool,
std::move(metaAddrsRet.value()),
true);
metaClient->init();

nebula::kvstore::KVOptions options;
options.local_ = localHost;
options.dataPaths_ = std::move(paths);
options.partMan_ = std::make_unique<nebula::kvstore::MetaServerBasedPartManager>(
options.local_, metaClient.get());
std::unique_ptr<nebula::kvstore::KVStore> kvstore(
nebula::kvstore::KVStore::instance(std::move(options)));
std::unique_ptr<KVStore> kvstore = getStoreInstance(localhost,
std::move(paths),
ioThreadPool,
workers,
metaClient.get());

auto schemaMan = nebula::meta::SchemaManager::create();
schemaMan->init(metaClient.get());

Expand All @@ -136,16 +172,18 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "Failed to start web service: " << status;
return EXIT_FAILURE;
}

// Setup the signal handlers
status = setupSignalHandler();
if (!status.ok()) {
LOG(ERROR) << status;
return EXIT_FAILURE;
}

// Thrift server
auto handler = std::make_shared<StorageServiceHandler>(kvstore.get(), std::move(schemaMan));
try {
nebula::operator<<(operator<<(LOG(INFO), "The storage deamon start on "), localHost);
nebula::operator<<(operator<<(LOG(INFO), "The storage deamon start on "), localhost);
gServer = std::make_unique<apache::thrift::ThriftServer>();
gServer->setInterface(std::move(handler));
gServer->setPort(FLAGS_port);
Expand Down
Loading

0 comments on commit b0c7d64

Please sign in to comment.