Skip to content

Commit

Permalink
Merge branch 'master' into issues-73
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Jan 14, 2019
2 parents f03bc2e + 015a731 commit e8201ae
Show file tree
Hide file tree
Showing 62 changed files with 3,337 additions and 379 deletions.
2 changes: 1 addition & 1 deletion src/common/network/NetworkUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ifaddrs.h>
#include <arpa/inet.h>


namespace nebula {
namespace network {

Expand Down Expand Up @@ -82,7 +83,6 @@ StatusOr<std::unordered_map<std::string, std::string>> NetworkUtils::listDeviceA
return dev2ipv4s;
}


bool NetworkUtils::getDynamicPortRange(uint16_t& low, uint16_t& high) {
FILE* pipe = popen("cat /proc/sys/net/ipv4/ip_local_port_range", "r");
if (!pipe) {
Expand Down
1 change: 0 additions & 1 deletion src/common/network/NetworkUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class NetworkUtils final {
static StatusOr<std::vector<std::string>> listIPv4s();
// List out all network devices and its cooresponding Ipv4 address.
static StatusOr<std::unordered_map<std::string, std::string>> listDeviceAndIPv4s();

// Get the local dynamic port range [low, high], only works for IPv4
static bool getDynamicPortRange(uint16_t& low, uint16_t& high);
// Get all ports that are currently in use
Expand Down
1 change: 0 additions & 1 deletion src/common/network/test/NetworkUtilsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ TEST(NetworkUtils, listDeviceAndIPv4s) {
ASSERT_NE(result.value().end(), result.value().find("lo"));
}


TEST(NetworkUtils, intIPv4Conversion) {
uint32_t ip;
ASSERT_TRUE(NetworkUtils::ipv4ToInt("127.0.0.1", ip));
Expand Down
35 changes: 35 additions & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ add_executable(
$<TARGET_OBJECTS:storage_service_handler>
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:dataman_obj>
$<TARGET_OBJECTS:meta_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:process_obj>
)
target_link_libraries(
storaged
Expand All @@ -68,3 +72,34 @@ target_link_libraries(
)


add_executable(
metad
MetaDaemon.cpp
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
)
target_link_libraries(
metad
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
wangle
folly
boost_system
boost_context
${OPENSSL_LIBRARIES}
${KRB5_LIBRARIES}
glog
gflags
event
${COMPRESSION_LIBRARIES}
resolv
double-conversion
dl
jemalloc
-pthread
)
32 changes: 32 additions & 0 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

#include "base/Base.h"
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include "meta/MetaServiceHandler.h"

DEFINE_int32(port, 45500, "Meta daemon listening port");


int main(int argc, char *argv[]) {
folly::init(&argc, &argv, true);

using namespace nebula::meta;

LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port;

auto handler = std::make_shared<MetaServiceHandler>();
auto server = std::make_shared<apache::thrift::ThriftServer>();
CHECK(!!server) << "Failed to create the thrift server";

server->setInterface(handler);
server->setPort(FLAGS_port);

server->serve(); // Will wait until the server shuts down

LOG(INFO) << "The storage Daemon on port " << FLAGS_port << " stopped";
}

47 changes: 45 additions & 2 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,62 @@

#include "base/Base.h"
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include "network/NetworkUtils.h"
#include "storage/StorageServiceHandler.h"
#include "kvstore/include/KVStore.h"
#include "meta/SchemaManager.h"

DEFINE_int32(port, 44500, "Storage daemon listening port");
DEFINE_string(data_path, "", "Root data path, multi paths should be split by comma."
"For rocksdb engine, one path one instance.");
DEFINE_string(local_ip, "", "Local ip");

// Get local IPv4 address. You could specify it by set FLAGS_local_ip, otherwise
// it will use the first ip exclude "127.0.0.1"
namespace nebula {

StatusOr<std::string> getLocalIP() {
if (!FLAGS_local_ip.empty()) {
return FLAGS_local_ip;
}
auto result = network::NetworkUtils::listDeviceAndIPv4s();
if (!result.ok()) {
return std::move(result).status();
}
for (auto& deviceIP : result.value()) {
if (deviceIP.second != "127.0.0.1") {
return deviceIP.first;
}
}
return Status::Error("No IPv4 address found!");
}

} // namespace nebula

int main(int argc, char *argv[]) {
folly::init(&argc, &argv, true);

using namespace nebula;
using namespace nebula::storage;

LOG(INFO) << "Starting the storage Daemon on port " << FLAGS_port;
LOG(INFO) << "Starting the storage Daemon on port " << FLAGS_port
<< ", dataPath " << FLAGS_data_path;

std::vector<std::string> paths;
folly::split(",", FLAGS_data_path, paths, true);
std::transform(paths.begin(), paths.end(), paths.begin(), [](auto& p) {
return folly::trimWhitespace(p).str();
});
auto result = getLocalIP();
CHECK(result.ok()) << result.status();
uint32_t localIP;
CHECK(network::NetworkUtils::ipv4ToInt(result.value(), localIP));

std::unique_ptr<kvstore::KVStore> kvstore(
kvstore::KVStore::instance(HostAddr(localIP, FLAGS_port), std::move(paths)));
std::unique_ptr<meta::SchemaManager> schemaMan(meta::SchemaManager::instance());

auto handler = std::make_shared<StorageServiceHandler>();
auto handler = std::make_shared<StorageServiceHandler>(kvstore.get(), schemaMan.get());
auto server = std::make_shared<apache::thrift::ThriftServer>();
CHECK(!!server) << "Failed to create the thrift server";

Expand Down
2 changes: 1 addition & 1 deletion src/dataman/ResultSchemaProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ bool ResultSchemaProvider::ResultSchemaField::isValid() const {
* ResultSchemaProvider
*
**********************************/
ResultSchemaProvider::ResultSchemaProvider(cpp2::Schema&& schema)
ResultSchemaProvider::ResultSchemaProvider(cpp2::Schema schema)
: columns_(std::move(schema.get_columns())) {
for (auto i = 0UL; i< columns_.size(); i++) {
const std::string& name = columns_[i].get_name();
Expand Down
2 changes: 1 addition & 1 deletion src/dataman/ResultSchemaProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ResultSchemaProvider : public SchemaProviderIf {


public:
explicit ResultSchemaProvider(storage::cpp2::Schema&&);
explicit ResultSchemaProvider(storage::cpp2::Schema);
virtual ~ResultSchemaProvider() = default;

int32_t getLatestVer() const noexcept override;
Expand Down
1 change: 1 addition & 0 deletions src/dataman/RowSetWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ RowSetWriter::RowSetWriter(const SchemaProviderIf* schema,


void RowSetWriter::writeRowLength(int64_t len) {
VLOG(3) << "Write row length " << len;
uint8_t buf[10];
size_t lenBytes = folly::encodeVarint(len, buf);
DCHECK_GT(lenBytes, 0UL);
Expand Down
1 change: 0 additions & 1 deletion src/dataman/RowWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ RowWriter& RowWriter::operator<<(const std::string& v) noexcept {
return operator<<(folly::StringPiece(v));
}


RowWriter& RowWriter::operator<<(folly::StringPiece v) noexcept {
RW_GET_COLUMN_TYPE(STRING)

Expand Down
24 changes: 24 additions & 0 deletions src/interface/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ add_custom_command(
DEPENDS storage.thrift
)

add_custom_command(
OUTPUT
gen-cpp2/MetaService.cpp
gen-cpp2/MetaServiceAsyncClient.cpp
gen-cpp2/MetaService_processmap_binary.cpp
gen-cpp2/MetaService_processmap_compact.cpp
gen-cpp2/meta_constants.cpp
gen-cpp2/meta_data.cpp
gen-cpp2/meta_types.cpp
COMMAND "${CMAKE_HOME_DIRECTORY}/third-party/fbthrift/_install/bin/thrift1" "--allow-neg-enum-vals" "--templates" "${CMAKE_HOME_DIRECTORY}/third-party/fbthrift/_install/include/thrift/templates" "--gen" "mstch_cpp2:include_prefix=\"interface\",process_in_event_base,stack_arguments" "--gen" "java:hashcode" "--gen" "go" "-o" "." "./meta.thrift"
DEPENDS meta.thrift
)

add_custom_target(
clean-interface
Expand Down Expand Up @@ -83,3 +95,15 @@ add_library(
gen-cpp2/storage_types.cpp
)
add_dependencies(storage_thrift_obj tgt_fbthrift)

add_library(
meta_thrift_obj OBJECT
gen-cpp2/MetaService.cpp
gen-cpp2/MetaServiceAsyncClient.cpp
gen-cpp2/MetaService_processmap_binary.cpp
gen-cpp2/MetaService_processmap_compact.cpp
gen-cpp2/meta_constants.cpp
gen-cpp2/meta_data.cpp
gen-cpp2/meta_types.cpp
)
add_dependencies(meta_thrift_obj tgt_fbthrift)
79 changes: 79 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/

namespace cpp nebula.meta
namespace java nebula.meta
namespace go nebula.meta

cpp_include "base/ThriftTypes.h"

typedef i32 (cpp.type = "nebula::IPv4") IPv4
typedef i32 (cpp.type = "nebula::Port") Port

enum ErrorCode {
SUCCEEDED = 0,

// RPC Failure
E_DISCONNECTED = -1,
E_FAIL_TO_CONNECT = -2,
E_RPC_FAILURE = -3,

E_LEADER_CHANAGED = -11,

// Operation Failure
E_NODE_HAS_EXISTED = -21,
E_NODE_NOT_EXISTED = -22,
} (cpp.enum_strict)

struct HostAddr {
1: IPv4 ip,
2: Port port,
}

struct ExecResponse {
1: ErrorCode ret,
// Valid if ret equals E_LEADER_CHANAGED.
2: HostAddr leader,
}

struct CreateNodeRequest {
1: string path,
2: string value,
}

struct SetNodeRequest {
1: string path,
2: string value,
}

struct GetNodeRequest {
1: string path,
}

struct GetNodeResponse {
1: ErrorCode ret,
2: HostAddr leader,
3: string value,
4: i64 last_updated_time,
}

struct ListChildrenRequest {
1: string path,
}

struct ListChildrenResponse {
1: ErrorCode ret,
2: HostAddr leader,
3: list<string> children,
}

service MetaService {
ExecResponse createNode(1: CreateNodeRequest req);
ExecResponse setNode(1: SetNodeRequest req);
GetNodeResponse getNode(1: GetNodeRequest req);
ListChildrenResponse listChildren(1: ListChildrenRequest req);
}

Loading

0 comments on commit e8201ae

Please sign in to comment.