Skip to content

Commit

Permalink
MetaServer support general kv storage (#243)
Browse files Browse the repository at this point in the history
* meta server general KV storage

* enhance meta kv storage

* address dangleptr's comment
  • Loading branch information
darionyaphet authored Apr 17, 2019
1 parent ef7f604 commit 7e66387
Show file tree
Hide file tree
Showing 51 changed files with 1,136 additions and 133 deletions.
4 changes: 2 additions & 2 deletions src/common/base/Status.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

/**
* Status is modeled on the one from levelDB, beyond that,
* this one adds support on move semantics and formated error messages.
* this one adds support on move semantics and formatted error messages.
*
* Status is as cheap as raw pointers in the successfull case,
* Status is as cheap as raw pointers in the successful case,
* without any heap memory allocations.
*/

Expand Down
5 changes: 4 additions & 1 deletion src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,11 @@ int main(int argc, char *argv[]) {
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
gServer->setThreadStackSizeMB(5);
if (FLAGS_num_netio_threads != 0) {
if (FLAGS_num_netio_threads > 0) {
gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads);
} else {
LOG(WARNING) << "Number netio threads should be greater than zero";
return EXIT_FAILURE;
}

// Setup the signal handlers
Expand Down
8 changes: 7 additions & 1 deletion src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ static Status setupSignalHandler();

int main(int argc, char *argv[]) {
folly::init(&argc, &argv, true);
if (FLAGS_data_path.empty()) {
LOG(ERROR) << "Meta Data Path should not empty";
return EXIT_FAILURE;
}

if (FLAGS_daemonize) {
google::SetStderrLogging(google::FATAL);
} else {
Expand Down Expand Up @@ -86,8 +91,9 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "Failed to start web service: " << status;
return EXIT_FAILURE;
}
LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port
<< ", dataPath " << FLAGS_data_path;

LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port;
auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip);
CHECK(result.ok()) << result.status();
uint32_t localIP;
Expand Down
4 changes: 1 addition & 3 deletions src/graph/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ add_executable(
SessionManagerTest.cpp
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>

$<TARGET_OBJECTS:graph_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:graph_thrift_obj>
Expand Down Expand Up @@ -44,7 +43,6 @@ add_executable(
DefineSchemaTest.cpp
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>

$<TARGET_OBJECTS:graph_obj>
$<TARGET_OBJECTS:client_cpp_obj>
$<TARGET_OBJECTS:base_obj>
Expand Down Expand Up @@ -87,8 +85,8 @@ add_executable(
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:graph_thrift_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:network_obj>
Expand Down
84 changes: 81 additions & 3 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ enum ErrorCode {

// Operation Failure
E_NO_HOSTS = -21,
E_SPACE_EXISTED = -22,
E_EXISTED = -22,
E_NOT_FOUND = -23,
E_TAG_EXISTED = -24,

// KV Failure
E_STORE_FAILURE = -31,
E_STORE_SEGMENT_ILLEGAL = -32,
E_KEY_NOT_FOUND = -33,

E_UNKNOWN = -99,
} (cpp.enum_strict)
Expand All @@ -40,6 +44,11 @@ struct IdName {
2: string name,
}

struct Pair {
1: string key,
2: string value,
}

struct TagItem {
1: common.TagID tag_id,
2: string tag_name,
Expand Down Expand Up @@ -116,7 +125,8 @@ struct GetTagReq {
}

struct GetTagResp {
1: common.Schema schema,
1: ErrorCode code,
2: common.Schema schema,
}

// Edge related operations.
Expand Down Expand Up @@ -183,6 +193,67 @@ struct GetPartsAllocResp {
3: map<common.PartitionID, list<common.HostAddr>>(cpp.template = "std::unordered_map") parts,
}

struct MultiPutReq {
// segment is used to avoid conflict with system data.
// it should be comprised of numbers and letters.
1: string segment,
2: list<Pair> pairs,
}

struct MultiPutResp {
1: ErrorCode code,
}

struct GetReq {
1: string segment,
2: string key,
}

struct GetResp {
1: ErrorCode code,
2: string value,
}

struct MultiGetReq {
1: string segment,
2: list<string> keys,
}

struct MultiGetResp {
1: ErrorCode code,
2: list<string> values,
}

struct RemoveReq {
1: string segment,
2: string key,
}

struct RemoveResp {
1: ErrorCode code,
}

struct RemoveRangeReq {
1: string segment,
2: string start,
3: string end,
}

struct RemoveRangeResp {
1: ErrorCode code,
}

struct ScanReq {
1: string segment,
2: string start,
3: string end,
}

struct ScanResp {
1: ErrorCode code,
2: list<string> values,
}

service MetaService {
ExecResp createSpace(1: CreateSpaceReq req);
ExecResp dropSpace(1: DropSpaceReq req);
Expand All @@ -204,5 +275,12 @@ service MetaService {
ListHostsResp listHosts(1: ListHostsReq req);

GetPartsAllocResp getPartsAlloc(1: GetPartsAllocReq req);

MultiPutResp multiPut(1: MultiPutReq req);
GetResp get(1: GetReq req);
MultiGetResp multiGet(1: MultiGetReq req);
RemoveResp remove(1: RemoveReq req);
RemoveRangeResp removeRange(1: RemoveRangeReq req);
ScanResp scan(1: ScanReq req);
}

1 change: 1 addition & 0 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ enum ResultCode {
ERR_SPACE_NOT_FOUND = -4,
ERR_LEADER_CHANAGED = -5,
ERR_INVALID_ARGUMENT = -6,
ERR_IO_ERROR = -7,
};

#define KV_DATA_PATH_FORMAT(path, spaceId) \
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ class KVEngine {
virtual ResultCode get(const std::string& key,
std::string* value) = 0;

virtual ResultCode multiGet(const std::vector<std::string>& keys,
std::vector<std::string>* values) = 0;

virtual ResultCode put(std::string key,
std::string value) = 0;

virtual ResultCode multiPut(std::vector<KV> keyValues) = 0;

/**
* Get all results in range [start, end)
* */
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ class KVStore {
PartitionID partId,
const std::string& key,
std::string* value) = 0;

virtual ResultCode multiGet(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<std::string>& keys,
std::vector<std::string>* values) = 0;
/**
* Get all results in range [start, end)
* */
Expand Down
6 changes: 6 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ ResultCode NebulaStore::get(GraphSpaceID spaceId, PartitionID partId,
return engine->get(key, value);
}

ResultCode NebulaStore::multiGet(GraphSpaceID spaceId, PartitionID partId,
const std::vector<std::string>& keys,
std::vector<std::string>* values) {
CHECK_AND_RETURN_ENGINE(spaceId, partId);
return engine->multiGet(keys, values);
}

ResultCode NebulaStore::range(GraphSpaceID spaceId, PartitionID partId,
const std::string& start,
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class NebulaStore : public KVStore, public Handler {
const std::string& key,
std::string* value) override;

ResultCode multiGet(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<std::string>& keys,
std::vector<std::string>* values) override;

/**
* Get all results in range [start, end)
* */
Expand Down
20 changes: 19 additions & 1 deletion src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
RocksEngine::~RocksEngine() {
}


ResultCode RocksEngine::get(const std::string& key, std::string* value) {
rocksdb::ReadOptions options;
rocksdb::Status status = db_->Get(options, rocksdb::Slice(key), value);
Expand All @@ -62,6 +61,25 @@ ResultCode RocksEngine::get(const std::string& key, std::string* value) {
return ResultCode::ERR_UNKNOWN;
}

ResultCode RocksEngine::multiGet(const std::vector<std::string>& keys,
std::vector<std::string>* values) {
rocksdb::ReadOptions options;
std::vector<rocksdb::Slice> slices;
for (unsigned int index = 0 ; index < keys.size() ; index++) {
slices.emplace_back(keys[index]);
}

std::vector<rocksdb::Status> status = db_->MultiGet(options, slices, values);
auto code = std::all_of(status.begin(), status.end(),
[](rocksdb::Status s) {
return s.ok();
});
if (code) {
return ResultCode::SUCCEEDED;
} else {
return ResultCode::ERR_UNKNOWN;
}
}

ResultCode RocksEngine::put(std::string key, std::string value) {
rocksdb::WriteOptions options;
Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class RocksEngine : public KVEngine {
ResultCode get(const std::string& key,
std::string* value) override;

ResultCode multiGet(const std::vector<std::string>& keys,
std::vector<std::string>* values) override;

ResultCode put(std::string key,
std::string value) override;

Expand Down
16 changes: 14 additions & 2 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ add_dependencies(schema_obj base_obj)
add_library(
meta_service_handler OBJECT
MetaServiceHandler.cpp
MetaUtils.cpp
MetaServiceUtils.cpp
MetaHttpHandler.cpp
processors/AddHostsProcessor.cpp
processors/ListHostsProcessor.cpp
Expand All @@ -24,12 +24,19 @@ add_library(
processors/GetTagProcessor.cpp
processors/ListTagsProcessor.cpp
processors/RemoveTagProcessor.cpp
processors/GetProcessor.cpp
processors/MultiGetProcessor.cpp
processors/MultiPutProcessor.cpp
processors/RemoveProcessor.cpp
processors/RemoveRangeProcessor.cpp
processors/ScanProcessor.cpp
)

add_dependencies(
meta_service_handler
base_obj
meta_thrift_obj
common_thrift_obj
kvstore_obj
)

Expand All @@ -38,6 +45,11 @@ add_library(
client/MetaClient.cpp
)

add_dependencies(meta_client base_obj meta_thrift_obj)
add_dependencies(
meta_client
base_obj
meta_thrift_obj
common_thrift_obj
)

add_subdirectory(test)
Loading

0 comments on commit 7e66387

Please sign in to comment.