Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MetaServer support general kv storage #243

Merged
merged 23 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8b57679
meta server general KV storage
darionyaphet Mar 25, 2019
01d5148
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Mar 27, 2019
cee8bf4
enhance meta kv storage
darionyaphet Mar 29, 2019
8aaa6b9
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 2, 2019
5a2aef8
fix merge modify
darionyaphet Apr 2, 2019
cc78c75
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 8, 2019
2b46470
support key check & avoid conflict
darionyaphet Apr 8, 2019
3182d8e
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 9, 2019
253b92a
fix conflict
darionyaphet Apr 9, 2019
2abd625
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 11, 2019
36500f0
address dangleptr's comment
darionyaphet Apr 11, 2019
7e1a060
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 11, 2019
051b3dc
address dangleptr's comment
darionyaphet Apr 11, 2019
be04af2
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 11, 2019
5102ad7
address dapleptr's comment
darionyaphet Apr 12, 2019
45b6749
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 12, 2019
a185891
address dapleptr's comment
darionyaphet Apr 12, 2019
200d3ed
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 12, 2019
e8e8fbf
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 15, 2019
80fcf04
fix do remove name
darionyaphet Apr 15, 2019
43d4549
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 15, 2019
083dc72
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 16, 2019
6847dd9
Merge branch 'master' into issues-183-meta_server_general_KV_storage
darionyaphet Apr 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/common/base/Status.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

/**
* Status is modeled on the one from levelDB, beyond that,
* this one adds support on move semantics and formated error messages.
* this one adds support on move semantics and formatted error messages.
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
*
* Status is as cheap as raw pointers in the successfull case,
* Status is as cheap as raw pointers in the successful case,
* without any heap memory allocations.
*/

Expand Down
5 changes: 4 additions & 1 deletion src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,11 @@ int main(int argc, char *argv[]) {
gServer->setNumAcceptThreads(FLAGS_num_accept_threads);
gServer->setListenBacklog(FLAGS_listen_backlog);
gServer->setThreadStackSizeMB(5);
if (FLAGS_num_netio_threads != 0) {
if (FLAGS_num_netio_threads > 0) {
gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads);
} else {
LOG(WARNING) << "Number netio threads should be greater than zero";
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
return EXIT_FAILURE;
}

// Setup the signal handlers
Expand Down
8 changes: 7 additions & 1 deletion src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ static Status setupSignalHandler();

int main(int argc, char *argv[]) {
folly::init(&argc, &argv, true);
if (FLAGS_data_path.empty()) {
LOG(ERROR) << "Meta Data Path should not empty";
return EXIT_FAILURE;
}

if (FLAGS_daemonize) {
google::SetStderrLogging(google::FATAL);
} else {
Expand Down Expand Up @@ -86,8 +91,9 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "Failed to start web service: " << status;
return EXIT_FAILURE;
}
LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port
<< ", dataPath " << FLAGS_data_path;

LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port;
auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip);
CHECK(result.ok()) << result.status();
uint32_t localIP;
Expand Down
4 changes: 1 addition & 3 deletions src/graph/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ add_executable(
SessionManagerTest.cpp
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>

$<TARGET_OBJECTS:graph_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:graph_thrift_obj>
Expand Down Expand Up @@ -40,7 +39,6 @@ add_executable(
DefineSchemaTest.cpp
$<TARGET_OBJECTS:meta_client>
$<TARGET_OBJECTS:meta_thrift_obj>

$<TARGET_OBJECTS:graph_obj>
$<TARGET_OBJECTS:client_cpp_obj>
$<TARGET_OBJECTS:base_obj>
Expand Down Expand Up @@ -79,8 +77,8 @@ add_executable(
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:graph_thrift_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:network_obj>
Expand Down
84 changes: 81 additions & 3 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ enum ErrorCode {

// Operation Failure
E_NO_HOSTS = -21,
E_SPACE_EXISTED = -22,
E_EXISTED = -22,
E_NOT_FOUND = -23,
E_TAG_EXISTED = -24,

// KV Failure
E_STORE_FAILURE = -31,
E_STORE_SEGMENT_ILLEGAL = -32,
E_KEY_NOT_FOUND = -33,

E_UNKNOWN = -99,
} (cpp.enum_strict)
Expand All @@ -40,6 +44,11 @@ struct IdName {
2: string name,
}

struct Pair {
1: string key,
2: string value,
}

struct TagItem {
1: common.TagID tag_id,
2: string tag_name,
Expand Down Expand Up @@ -115,7 +124,8 @@ struct GetTagReq {
}

struct GetTagResp {
1: common.Schema schema,
1: ErrorCode code,
2: common.Schema schema,
}

// Edge related operations.
Expand Down Expand Up @@ -182,6 +192,67 @@ struct GetPartsAllocResp {
3: map<common.PartitionID, list<common.HostAddr>>(cpp.template = "std::unordered_map") parts,
}

struct MultiPutReq {
// segment is used to avoid conflict with system data.
// it should be comprised of numbers and letters.
1: string segment,
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
2: list<Pair> pairs,
}

struct MultiPutResp {
1: ErrorCode code,
}

struct GetReq {
1: string segment,
2: string key,
}

struct GetResp {
1: ErrorCode code,
2: string value,
}

struct MultiGetReq {
1: string segment,
2: list<string> keys,
}

struct MultiGetResp {
1: ErrorCode code,
2: list<string> values,
}

struct RemoveReq {
1: string segment,
2: string key,
}

struct RemoveResp {
1: ErrorCode code,
}

struct RemoveRangeReq {
1: string segment,
2: string start,
3: string end,
}

struct RemoveRangeResp {
1: ErrorCode code,
}

struct ScanReq {
1: string segment,
2: string start,
3: string end,
}

struct ScanResp {
1: ErrorCode code,
2: list<string> values,
}

service MetaService {
ExecResp createSpace(1: CreateSpaceReq req);
ExecResp deleteSpace(1: DeleteSpaceReq req);
Expand All @@ -203,5 +274,12 @@ service MetaService {
ListHostsResp listHosts(1: ListHostsReq req);

GetPartsAllocResp getPartsAlloc(1: GetPartsAllocReq req);

MultiPutResp multiPut(1: MultiPutReq req);
GetResp get(1: GetReq req);
MultiGetResp multiGet(1: MultiGetReq req);
RemoveResp remove(1: RemoveReq req);
RemoveRangeResp removeRange(1: RemoveRangeReq req);
ScanResp scan(1: ScanReq req);
}

1 change: 1 addition & 0 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ enum ResultCode {
ERR_SPACE_NOT_FOUND = -4,
ERR_LEADER_CHANAGED = -5,
ERR_INVALID_ARGUMENT = -6,
ERR_IO_ERROR = -7,
};

#define KV_DATA_PATH_FORMAT(path, spaceId) \
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ class KVEngine {
virtual ResultCode get(const std::string& key,
std::string* value) = 0;

virtual ResultCode multiGet(const std::vector<std::string>& keys,
std::vector<std::string>* values) = 0;

virtual ResultCode put(std::string key,
std::string value) = 0;

virtual ResultCode multiPut(std::vector<KV> keyValues) = 0;

/**
* Get all results in range [start, end)
* */
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ class KVStore {
PartitionID partId,
const std::string& key,
std::string* value) = 0;

virtual ResultCode multiGet(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<std::string>& keys,
std::vector<std::string>* values) = 0;
/**
* Get all results in range [start, end)
* */
Expand Down
6 changes: 6 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ ResultCode NebulaStore::get(GraphSpaceID spaceId, PartitionID partId,
return engine->get(key, value);
}

ResultCode NebulaStore::multiGet(GraphSpaceID spaceId, PartitionID partId,
const std::vector<std::string>& keys,
std::vector<std::string>* values) {
CHECK_AND_RETURN_ENGINE(spaceId, partId);
return engine->multiGet(keys, values);
}

ResultCode NebulaStore::range(GraphSpaceID spaceId, PartitionID partId,
const std::string& start,
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class NebulaStore : public KVStore, public Handler {
const std::string& key,
std::string* value) override;

ResultCode multiGet(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<std::string>& keys,
std::vector<std::string>* values) override;

/**
* Get all results in range [start, end)
* */
Expand Down
20 changes: 19 additions & 1 deletion src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
RocksEngine::~RocksEngine() {
}


ResultCode RocksEngine::get(const std::string& key, std::string* value) {
rocksdb::ReadOptions options;
rocksdb::Status status = db_->Get(options, rocksdb::Slice(key), value);
Expand All @@ -62,6 +61,25 @@ ResultCode RocksEngine::get(const std::string& key, std::string* value) {
return ResultCode::ERR_UNKNOWN;
}

ResultCode RocksEngine::multiGet(const std::vector<std::string>& keys,
std::vector<std::string>* values) {
rocksdb::ReadOptions options;
std::vector<rocksdb::Slice> slices;
for (unsigned int index = 0 ; index < keys.size() ; index++) {
slices.emplace_back(keys[index]);
}

std::vector<rocksdb::Status> status = db_->MultiGet(options, slices, values);
auto code = std::all_of(status.begin(), status.end(),
[](rocksdb::Status s) {
return s.ok();
});
if (code) {
return ResultCode::SUCCEEDED;
} else {
return ResultCode::ERR_UNKNOWN;
}
}

ResultCode RocksEngine::put(std::string key, std::string value) {
rocksdb::WriteOptions options;
Expand Down
3 changes: 3 additions & 0 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class RocksEngine : public KVEngine {
ResultCode get(const std::string& key,
std::string* value) override;

ResultCode multiGet(const std::vector<std::string>& keys,
std::vector<std::string>* values) override;

ResultCode put(std::string key,
std::string value) override;

Expand Down
16 changes: 14 additions & 2 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ add_dependencies(schema_obj base_obj)
add_library(
meta_service_handler OBJECT
MetaServiceHandler.cpp
MetaUtils.cpp
MetaServiceUtils.cpp
MetaHttpHandler.cpp
processors/CreateSpaceProcessor.cpp
processors/AddHostsProcessor.cpp
Expand All @@ -22,12 +22,19 @@ add_library(
processors/GetTagProcessor.cpp
processors/ListTagsProcessor.cpp
processors/RemoveTagProcessor.cpp
processors/GetProcessor.cpp
processors/MultiGetProcessor.cpp
processors/MultiPutProcessor.cpp
processors/RemoveProcessor.cpp
processors/RemoveRangeProcessor.cpp
processors/ScanProcessor.cpp
)

add_dependencies(
meta_service_handler
base_obj
meta_thrift_obj
common_thrift_obj
kvstore_obj
)

Expand All @@ -36,6 +43,11 @@ add_library(
client/MetaClient.cpp
)

add_dependencies(meta_client base_obj meta_thrift_obj)
add_dependencies(
meta_client
base_obj
meta_thrift_obj
common_thrift_obj
)

add_subdirectory(test)
Loading