From 7e66387870d7cc7479715135bc519b9434e1e0a7 Mon Sep 17 00:00:00 2001 From: yaphet Date: Wed, 17 Apr 2019 11:47:53 +0800 Subject: [PATCH] MetaServer support general kv storage (#243) * meta server general KV storage * enhance meta kv storage * address dangleptr's comment --- src/common/base/Status.h | 4 +- src/daemons/GraphDaemon.cpp | 5 +- src/daemons/MetaDaemon.cpp | 8 +- src/graph/test/CMakeLists.txt | 4 +- src/interface/meta.thrift | 84 +++++++++- src/kvstore/Common.h | 1 + src/kvstore/KVEngine.h | 4 + src/kvstore/KVStore.h | 5 + src/kvstore/NebulaStore.cpp | 6 + src/kvstore/NebulaStore.h | 5 + src/kvstore/RocksEngine.cpp | 20 ++- src/kvstore/RocksEngine.h | 3 + src/meta/CMakeLists.txt | 16 +- src/meta/MetaServiceHandler.cpp | 44 ++++++ src/meta/MetaServiceHandler.h | 18 +++ .../{MetaUtils.cpp => MetaServiceUtils.cpp} | 71 ++++----- src/meta/{MetaUtils.h => MetaServiceUtils.h} | 8 +- src/meta/client/MetaClient.cpp | 112 +++++++++++++- src/meta/client/MetaClient.h | 20 +++ src/meta/common/MetaCommon.h | 31 ++++ src/meta/processors/AddHostsProcessor.cpp | 3 +- src/meta/processors/AddTagProcessor.cpp | 10 +- src/meta/processors/BaseProcessor.h | 41 ++++- src/meta/processors/BaseProcessor.inl | 79 +++++++++- src/meta/processors/CreateSpaceProcessor.cpp | 16 +- src/meta/processors/DropSpaceProcessor.cpp | 8 +- .../processors/GetPartsAllocProcessor.cpp | 4 +- src/meta/processors/GetProcessor.cpp | 28 ++++ src/meta/processors/GetProcessor.h | 31 ++++ src/meta/processors/GetTagProcessor.cpp | 9 +- src/meta/processors/ListSpacesProcessor.cpp | 6 +- src/meta/processors/ListTagsProcessor.cpp | 6 +- src/meta/processors/MultiGetProcessor.cpp | 31 ++++ src/meta/processors/MultiGetProcessor.h | 31 ++++ src/meta/processors/MultiPutProcessor.cpp | 23 +++ src/meta/processors/MultiPutProcessor.h | 31 ++++ src/meta/processors/RemoveHostsProcessor.cpp | 4 +- src/meta/processors/RemoveProcessor.cpp | 19 +++ src/meta/processors/RemoveProcessor.h | 31 ++++ src/meta/processors/RemoveRangeProcessor.cpp | 20 +++ src/meta/processors/RemoveRangeProcessor.h | 31 ++++ src/meta/processors/RemoveTagProcessor.cpp | 8 +- src/meta/processors/ScanProcessor.cpp | 28 ++++ src/meta/processors/ScanProcessor.h | 32 ++++ src/meta/test/CMakeLists.txt | 4 +- src/meta/test/MetaClientTest.cpp | 63 +++++++- ...UtilsTest.cpp => MetaServiceUtilsTest.cpp} | 38 ++--- src/meta/test/ProcessorTest.cpp | 145 +++++++++++++++++- src/meta/test/TestUtils.h | 10 +- src/storage/test/CMakeLists.txt | 6 +- src/tools/storage-perf/CMakeLists.txt | 4 +- 51 files changed, 1136 insertions(+), 133 deletions(-) rename src/meta/{MetaUtils.cpp => MetaServiceUtils.cpp} (70%) rename src/meta/{MetaUtils.h => MetaServiceUtils.h} (90%) create mode 100644 src/meta/common/MetaCommon.h create mode 100644 src/meta/processors/GetProcessor.cpp create mode 100644 src/meta/processors/GetProcessor.h create mode 100644 src/meta/processors/MultiGetProcessor.cpp create mode 100644 src/meta/processors/MultiGetProcessor.h create mode 100644 src/meta/processors/MultiPutProcessor.cpp create mode 100644 src/meta/processors/MultiPutProcessor.h create mode 100644 src/meta/processors/RemoveProcessor.cpp create mode 100644 src/meta/processors/RemoveProcessor.h create mode 100644 src/meta/processors/RemoveRangeProcessor.cpp create mode 100644 src/meta/processors/RemoveRangeProcessor.h create mode 100644 src/meta/processors/ScanProcessor.cpp create mode 100644 src/meta/processors/ScanProcessor.h rename src/meta/test/{MetaUtilsTest.cpp => MetaServiceUtilsTest.cpp} (75%) diff --git a/src/common/base/Status.h b/src/common/base/Status.h index 3d815beb20c..e704432e4f8 100644 --- a/src/common/base/Status.h +++ b/src/common/base/Status.h @@ -11,9 +11,9 @@ /** * Status is modeled on the one from levelDB, beyond that, - * this one adds support on move semantics and formated error messages. + * this one adds support on move semantics and formatted error messages. * - * Status is as cheap as raw pointers in the successfull case, + * Status is as cheap as raw pointers in the successful case, * without any heap memory allocations. */ diff --git a/src/daemons/GraphDaemon.cpp b/src/daemons/GraphDaemon.cpp index 92d2ad9e80e..0da32def668 100644 --- a/src/daemons/GraphDaemon.cpp +++ b/src/daemons/GraphDaemon.cpp @@ -126,8 +126,11 @@ int main(int argc, char *argv[]) { gServer->setNumAcceptThreads(FLAGS_num_accept_threads); gServer->setListenBacklog(FLAGS_listen_backlog); gServer->setThreadStackSizeMB(5); - if (FLAGS_num_netio_threads != 0) { + if (FLAGS_num_netio_threads > 0) { gServer->setNumIOWorkerThreads(FLAGS_num_netio_threads); + } else { + LOG(WARNING) << "Number netio threads should be greater than zero"; + return EXIT_FAILURE; } // Setup the signal handlers diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 785001501ca..f87860cf492 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -49,6 +49,11 @@ static Status setupSignalHandler(); int main(int argc, char *argv[]) { folly::init(&argc, &argv, true); + if (FLAGS_data_path.empty()) { + LOG(ERROR) << "Meta Data Path should not empty"; + return EXIT_FAILURE; + } + if (FLAGS_daemonize) { google::SetStderrLogging(google::FATAL); } else { @@ -86,8 +91,9 @@ int main(int argc, char *argv[]) { LOG(ERROR) << "Failed to start web service: " << status; return EXIT_FAILURE; } + LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port + << ", dataPath " << FLAGS_data_path; - LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port; auto result = nebula::network::NetworkUtils::getLocalIP(FLAGS_local_ip); CHECK(result.ok()) << result.status(); uint32_t localIP; diff --git a/src/graph/test/CMakeLists.txt b/src/graph/test/CMakeLists.txt index 43b756744df..70c2f333456 100644 --- a/src/graph/test/CMakeLists.txt +++ b/src/graph/test/CMakeLists.txt @@ -3,7 +3,6 @@ add_executable( SessionManagerTest.cpp $ $ - $ $ $ @@ -44,7 +43,6 @@ add_executable( DefineSchemaTest.cpp $ $ - $ $ $ @@ -87,8 +85,8 @@ add_executable( $ $ $ - $ $ + $ $ $ $ diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index f39a35b994f..0713bd9575d 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -22,9 +22,13 @@ enum ErrorCode { // Operation Failure E_NO_HOSTS = -21, - E_SPACE_EXISTED = -22, + E_EXISTED = -22, E_NOT_FOUND = -23, - E_TAG_EXISTED = -24, + + // KV Failure + E_STORE_FAILURE = -31, + E_STORE_SEGMENT_ILLEGAL = -32, + E_KEY_NOT_FOUND = -33, E_UNKNOWN = -99, } (cpp.enum_strict) @@ -40,6 +44,11 @@ struct IdName { 2: string name, } +struct Pair { + 1: string key, + 2: string value, +} + struct TagItem { 1: common.TagID tag_id, 2: string tag_name, @@ -116,7 +125,8 @@ struct GetTagReq { } struct GetTagResp { - 1: common.Schema schema, + 1: ErrorCode code, + 2: common.Schema schema, } // Edge related operations. @@ -183,6 +193,67 @@ struct GetPartsAllocResp { 3: map>(cpp.template = "std::unordered_map") parts, } +struct MultiPutReq { + // segment is used to avoid conflict with system data. + // it should be comprised of numbers and letters. + 1: string segment, + 2: list pairs, +} + +struct MultiPutResp { + 1: ErrorCode code, +} + +struct GetReq { + 1: string segment, + 2: string key, +} + + struct GetResp { + 1: ErrorCode code, + 2: string value, +} + +struct MultiGetReq { + 1: string segment, + 2: list keys, +} + +struct MultiGetResp { + 1: ErrorCode code, + 2: list values, +} + +struct RemoveReq { + 1: string segment, + 2: string key, +} + +struct RemoveResp { + 1: ErrorCode code, +} + +struct RemoveRangeReq { + 1: string segment, + 2: string start, + 3: string end, +} + +struct RemoveRangeResp { + 1: ErrorCode code, +} + +struct ScanReq { + 1: string segment, + 2: string start, + 3: string end, +} + +struct ScanResp { + 1: ErrorCode code, + 2: list values, +} + service MetaService { ExecResp createSpace(1: CreateSpaceReq req); ExecResp dropSpace(1: DropSpaceReq req); @@ -204,5 +275,12 @@ service MetaService { ListHostsResp listHosts(1: ListHostsReq req); GetPartsAllocResp getPartsAlloc(1: GetPartsAllocReq req); + + MultiPutResp multiPut(1: MultiPutReq req); + GetResp get(1: GetReq req); + MultiGetResp multiGet(1: MultiGetReq req); + RemoveResp remove(1: RemoveReq req); + RemoveRangeResp removeRange(1: RemoveRangeReq req); + ScanResp scan(1: ScanReq req); } diff --git a/src/kvstore/Common.h b/src/kvstore/Common.h index e85fe3ad710..f01cb544319 100644 --- a/src/kvstore/Common.h +++ b/src/kvstore/Common.h @@ -20,6 +20,7 @@ enum ResultCode { ERR_SPACE_NOT_FOUND = -4, ERR_LEADER_CHANAGED = -5, ERR_INVALID_ARGUMENT = -6, + ERR_IO_ERROR = -7, }; #define KV_DATA_PATH_FORMAT(path, spaceId) \ diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index 35d9bc8662e..ac813d460e2 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -24,10 +24,14 @@ class KVEngine { virtual ResultCode get(const std::string& key, std::string* value) = 0; + virtual ResultCode multiGet(const std::vector& keys, + std::vector* values) = 0; + virtual ResultCode put(std::string key, std::string value) = 0; virtual ResultCode multiPut(std::vector keyValues) = 0; + /** * Get all results in range [start, end) * */ diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index 6b8a7dea071..ba5fac33125 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -68,6 +68,11 @@ class KVStore { PartitionID partId, const std::string& key, std::string* value) = 0; + + virtual ResultCode multiGet(GraphSpaceID spaceId, + PartitionID partId, + const std::vector& keys, + std::vector* values) = 0; /** * Get all results in range [start, end) * */ diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index cb2f950d40b..ab10cd44696 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -244,6 +244,12 @@ ResultCode NebulaStore::get(GraphSpaceID spaceId, PartitionID partId, return engine->get(key, value); } +ResultCode NebulaStore::multiGet(GraphSpaceID spaceId, PartitionID partId, + const std::vector& keys, + std::vector* values) { + CHECK_AND_RETURN_ENGINE(spaceId, partId); + return engine->multiGet(keys, values); +} ResultCode NebulaStore::range(GraphSpaceID spaceId, PartitionID partId, const std::string& start, diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 1f46319a44a..5b83259c99c 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -53,6 +53,11 @@ class NebulaStore : public KVStore, public Handler { const std::string& key, std::string* value) override; + ResultCode multiGet(GraphSpaceID spaceId, + PartitionID partId, + const std::vector& keys, + std::vector* values) override; + /** * Get all results in range [start, end) * */ diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index c8c1abfdefc..6d84d340f31 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -50,7 +50,6 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId, RocksEngine::~RocksEngine() { } - ResultCode RocksEngine::get(const std::string& key, std::string* value) { rocksdb::ReadOptions options; rocksdb::Status status = db_->Get(options, rocksdb::Slice(key), value); @@ -62,6 +61,25 @@ ResultCode RocksEngine::get(const std::string& key, std::string* value) { return ResultCode::ERR_UNKNOWN; } +ResultCode RocksEngine::multiGet(const std::vector& keys, + std::vector* values) { + rocksdb::ReadOptions options; + std::vector slices; + for (unsigned int index = 0 ; index < keys.size() ; index++) { + slices.emplace_back(keys[index]); + } + + std::vector status = db_->MultiGet(options, slices, values); + auto code = std::all_of(status.begin(), status.end(), + [](rocksdb::Status s) { + return s.ok(); + }); + if (code) { + return ResultCode::SUCCEEDED; + } else { + return ResultCode::ERR_UNKNOWN; + } +} ResultCode RocksEngine::put(std::string key, std::string value) { rocksdb::WriteOptions options; diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 53d85a34960..65619646131 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -100,6 +100,9 @@ class RocksEngine : public KVEngine { ResultCode get(const std::string& key, std::string* value) override; + ResultCode multiGet(const std::vector& keys, + std::vector* values) override; + ResultCode put(std::string key, std::string value) override; diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index d133d2b1dc9..1a6105b1c94 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -11,7 +11,7 @@ add_dependencies(schema_obj base_obj) add_library( meta_service_handler OBJECT MetaServiceHandler.cpp - MetaUtils.cpp + MetaServiceUtils.cpp MetaHttpHandler.cpp processors/AddHostsProcessor.cpp processors/ListHostsProcessor.cpp @@ -24,12 +24,19 @@ add_library( processors/GetTagProcessor.cpp processors/ListTagsProcessor.cpp processors/RemoveTagProcessor.cpp + processors/GetProcessor.cpp + processors/MultiGetProcessor.cpp + processors/MultiPutProcessor.cpp + processors/RemoveProcessor.cpp + processors/RemoveRangeProcessor.cpp + processors/ScanProcessor.cpp ) add_dependencies( meta_service_handler base_obj meta_thrift_obj + common_thrift_obj kvstore_obj ) @@ -38,6 +45,11 @@ add_library( client/MetaClient.cpp ) -add_dependencies(meta_client base_obj meta_thrift_obj) +add_dependencies( + meta_client + base_obj + meta_thrift_obj + common_thrift_obj +) add_subdirectory(test) diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index 88759e82c37..3daca11cc98 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -3,6 +3,8 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ + +#include "meta/MetaServiceUtils.h" #include "meta/MetaServiceHandler.h" #include "meta/processors/CreateSpaceProcessor.h" #include "meta/processors/ListSpacesProcessor.h" @@ -16,6 +18,12 @@ #include "meta/processors/GetTagProcessor.h" #include "meta/processors/ListTagsProcessor.h" #include "meta/processors/ListEdgesProcessor.h" +#include "meta/processors/MultiPutProcessor.h" +#include "meta/processors/GetProcessor.h" +#include "meta/processors/MultiGetProcessor.h" +#include "meta/processors/ScanProcessor.h" +#include "meta/processors/RemoveProcessor.h" +#include "meta/processors/RemoveRangeProcessor.h" #include "meta/processors/GetPartsAllocProcessor.h" #define RETURN_FUTURE(processor) \ @@ -68,6 +76,42 @@ MetaServiceHandler::future_getPartsAlloc(const cpp2::GetPartsAllocReq& req) { RETURN_FUTURE(processor); } +folly::Future +MetaServiceHandler::future_multiPut(const cpp2::MultiPutReq& req) { + auto* processor = MultiPutProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +MetaServiceHandler::future_get(const cpp2::GetReq& req) { + auto* processor = GetProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +MetaServiceHandler::future_multiGet(const cpp2::MultiGetReq& req) { + auto* processor = MultiGetProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +MetaServiceHandler::future_scan(const cpp2::ScanReq& req) { + auto* processor = ScanProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +MetaServiceHandler::future_remove(const cpp2::RemoveReq& req) { + auto* processor = RemoveProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + +folly::Future +MetaServiceHandler::future_removeRange(const cpp2::RemoveRangeReq& req) { + auto* processor = RemoveRangeProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + folly::Future MetaServiceHandler::future_addTag(const cpp2::AddTagReq& req) { auto* processor = AddTagProcessor::instance(kvstore_); diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 28aee5a39f1..72ff91ae74d 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -44,6 +44,24 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_getPartsAlloc(const cpp2::GetPartsAllocReq& req) override; + folly::Future + future_multiPut(const cpp2::MultiPutReq& req) override; + + folly::Future + future_get(const cpp2::GetReq& req) override; + + folly::Future + future_multiGet(const cpp2::MultiGetReq& req) override; + + folly::Future + future_remove(const cpp2::RemoveReq& req) override; + + folly::Future + future_removeRange(const cpp2::RemoveRangeReq& req) override; + + folly::Future + future_scan(const cpp2::ScanReq& req) override; + /** * Schema related operations. * */ diff --git a/src/meta/MetaUtils.cpp b/src/meta/MetaServiceUtils.cpp similarity index 70% rename from src/meta/MetaUtils.cpp rename to src/meta/MetaServiceUtils.cpp index c0662fe0c24..4332ea3d3e6 100644 --- a/src/meta/MetaUtils.cpp +++ b/src/meta/MetaServiceUtils.cpp @@ -4,7 +4,7 @@ * (found in the LICENSE.Apache file in the root directory) */ -#include "meta/MetaUtils.h" +#include "meta/MetaServiceUtils.h" #include #include @@ -18,7 +18,7 @@ const std::string kTagsTable = "__tags__"; // NOLINT const std::string kEdgesTable = "__edges__"; // NOLINT const std::string kIndexTable = "__index__"; // NOLINT -std::string MetaUtils::spaceKey(GraphSpaceID spaceId) { +std::string MetaServiceUtils::spaceKey(GraphSpaceID spaceId) { std::string key; key.reserve(256); key.append(kSpacesTable.data(), kSpacesTable.size()); @@ -26,8 +26,9 @@ std::string MetaUtils::spaceKey(GraphSpaceID spaceId) { return key; } - -std::string MetaUtils::spaceVal(int32_t partsNum, int32_t replicaFactor, const std::string& name) { +std::string MetaServiceUtils::spaceVal(int32_t partsNum, + int32_t replicaFactor, + const std::string& name) { std::string val; val.reserve(256); val.append(reinterpret_cast(&partsNum), sizeof(partsNum)); @@ -36,23 +37,19 @@ std::string MetaUtils::spaceVal(int32_t partsNum, int32_t replicaFactor, const s return val; } - -const std::string& MetaUtils::spacePrefix() { +const std::string& MetaServiceUtils::spacePrefix() { return kSpacesTable; } - -GraphSpaceID MetaUtils::spaceId(folly::StringPiece rawKey) { +GraphSpaceID MetaServiceUtils::spaceId(folly::StringPiece rawKey) { return *reinterpret_cast(rawKey.data() + kSpacesTable.size()); } - -folly::StringPiece MetaUtils::spaceName(folly::StringPiece rawVal) { +folly::StringPiece MetaServiceUtils::spaceName(folly::StringPiece rawVal) { return rawVal.subpiece(sizeof(int32_t)*2); } - -std::string MetaUtils::partKey(GraphSpaceID spaceId, PartitionID partId) { +std::string MetaServiceUtils::partKey(GraphSpaceID spaceId, PartitionID partId) { std::string key; key.reserve(128); key.append(kPartsTable.data(), kPartsTable.size()); @@ -61,8 +58,7 @@ std::string MetaUtils::partKey(GraphSpaceID spaceId, PartitionID partId) { return key; } - -std::string MetaUtils::partVal(const std::vector& hosts) { +std::string MetaServiceUtils::partVal(const std::vector& hosts) { std::string val; val.reserve(128); for (auto& h : hosts) { @@ -72,8 +68,7 @@ std::string MetaUtils::partVal(const std::vector& hosts) return val; } - -std::string MetaUtils::partPrefix(GraphSpaceID spaceId) { +std::string MetaServiceUtils::partPrefix(GraphSpaceID spaceId) { std::string prefix; prefix.reserve(128); prefix.append(kPartsTable.data(), kPartsTable.size()); @@ -81,8 +76,7 @@ std::string MetaUtils::partPrefix(GraphSpaceID spaceId) { return prefix; } - -std::vector MetaUtils::parsePartVal(folly::StringPiece val) { +std::vector MetaServiceUtils::parsePartVal(folly::StringPiece val) { std::vector hosts; static const size_t unitSize = sizeof(int32_t) * 2; auto hostsNum = val.size() / unitSize; @@ -99,8 +93,7 @@ std::vector MetaUtils::parsePartVal(folly::StringPiece v return hosts; } - -std::string MetaUtils::hostKey(IPv4 ip, Port port) { +std::string MetaServiceUtils::hostKey(IPv4 ip, Port port) { std::string key; key.reserve(128); key.append(kHostsTable.data(), kHostsTable.size()); @@ -109,24 +102,23 @@ std::string MetaUtils::hostKey(IPv4 ip, Port port) { return key; } -std::string MetaUtils::hostVal() { +std::string MetaServiceUtils::hostVal() { return ""; } - -const std::string& MetaUtils::hostPrefix() { +const std::string& MetaServiceUtils::hostPrefix() { return kHostsTable; } - -nebula::cpp2::HostAddr MetaUtils::parseHostKey(folly::StringPiece key) { +nebula::cpp2::HostAddr MetaServiceUtils::parseHostKey(folly::StringPiece key) { nebula::cpp2::HostAddr host; memcpy(&host, key.data() + kHostsTable.size(), sizeof(host)); return host; } - -std::string MetaUtils::schemaEdgeKey(GraphSpaceID spaceId, EdgeType edgeType, int64_t version) { +std::string MetaServiceUtils::schemaEdgeKey(GraphSpaceID spaceId, + EdgeType edgeType, + int64_t version) { std::string key; key.reserve(128); key.append(kEdgesTable.data(), kEdgesTable.size()); @@ -136,14 +128,13 @@ std::string MetaUtils::schemaEdgeKey(GraphSpaceID spaceId, EdgeType edgeType, in return key; } - -std::string MetaUtils::schemaEdgeVal(nebula::cpp2::Schema schema) { +std::string MetaServiceUtils::schemaEdgeVal(nebula::cpp2::Schema schema) { std::string val; apache::thrift::CompactSerializer::serialize(schema, &val); return val; } -std::string MetaUtils::schemaTagKey(GraphSpaceID spaceId, TagID tagId, int64_t version) { +std::string MetaServiceUtils::schemaTagKey(GraphSpaceID spaceId, TagID tagId, int64_t version) { std::string key; key.reserve(128); key.append(kTagsTable.data(), kTagsTable.size()); @@ -153,8 +144,7 @@ std::string MetaUtils::schemaTagKey(GraphSpaceID spaceId, TagID tagId, int64_t v return key; } - -std::string MetaUtils::schemaTagsPrefix(GraphSpaceID spaceId) { +std::string MetaServiceUtils::schemaTagsPrefix(GraphSpaceID spaceId) { std::string key; key.reserve(kTagsTable.size() + sizeof(GraphSpaceID)); key.append(kTagsTable.data(), kTagsTable.size()); @@ -163,7 +153,7 @@ std::string MetaUtils::schemaTagsPrefix(GraphSpaceID spaceId) { } -std::string MetaUtils::schemaTagVal(const std::string& name, nebula::cpp2::Schema schema) { +std::string MetaServiceUtils::schemaTagVal(const std::string& name, nebula::cpp2::Schema schema) { int32_t len = name.size(); std::string val, sval; apache::thrift::CompactSerializer::serialize(schema, &sval); @@ -174,8 +164,7 @@ std::string MetaUtils::schemaTagVal(const std::string& name, nebula::cpp2::Schem return val; } - -nebula::cpp2::Schema MetaUtils::parseSchema(folly::StringPiece rawData) { +nebula::cpp2::Schema MetaServiceUtils::parseSchema(folly::StringPiece rawData) { nebula::cpp2::Schema schema; int32_t offset = sizeof(int32_t) + *reinterpret_cast(rawData.begin()); auto schval = rawData.subpiece(offset, rawData.size() - offset); @@ -183,8 +172,7 @@ nebula::cpp2::Schema MetaUtils::parseSchema(folly::StringPiece rawData) { return schema; } - -std::string MetaUtils::indexKey(EntryType type, const std::string& name) { +std::string MetaServiceUtils::indexKey(EntryType type, const std::string& name) { std::string key; key.reserve(128); key.append(kIndexTable.data(), kIndexTable.size()); @@ -193,5 +181,14 @@ std::string MetaUtils::indexKey(EntryType type, const std::string& name) { return key; } +std::string MetaServiceUtils::assembleSegmentKey(const std::string& segment, + const std::string& key) { + std::string segmentKey; + segmentKey.reserve(64); + segmentKey.append(segment); + segmentKey.append(key.data(), key.size()); + return segmentKey; +} + } // namespace meta } // namespace nebula diff --git a/src/meta/MetaUtils.h b/src/meta/MetaServiceUtils.h similarity index 90% rename from src/meta/MetaUtils.h rename to src/meta/MetaServiceUtils.h index 71fe1f50d51..cd08fb67bf9 100644 --- a/src/meta/MetaUtils.h +++ b/src/meta/MetaServiceUtils.h @@ -19,9 +19,9 @@ enum class EntryType : int8_t { EDGE = 0x03, }; -class MetaUtils final { +class MetaServiceUtils final { public: - MetaUtils() = delete; + MetaServiceUtils() = delete; static std::string spaceKey(GraphSpaceID spaceId); @@ -38,6 +38,8 @@ class MetaUtils final { static std::string partVal(const std::vector& hosts); + static const std::string& partPrefix(); + static std::string partPrefix(GraphSpaceID spaceId); static std::vector parsePartVal(folly::StringPiece val); @@ -63,6 +65,8 @@ class MetaUtils final { static nebula::cpp2::Schema parseSchema(folly::StringPiece rawData); static std::string indexKey(EntryType type, const std::string& name); + + static std::string assembleSegmentKey(const std::string& segment, const std::string& key); }; } // namespace meta diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index 9e12fd15f0f..57d88c98b42 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -4,6 +4,7 @@ * (found in the LICENSE.Apache file in the root directory) */ +#include "meta/common/MetaCommon.h" #include "meta/client/MetaClient.h" #include "network/NetworkUtils.h" @@ -12,6 +13,19 @@ DEFINE_string(meta_server_addrs, "", "list of meta server addresses," "the format looks like ip1:port1, ip2:port2, ip3:port3"); DEFINE_int32(meta_client_io_threads, 3, "meta client io threads"); +/** + * check argument is empty + */ +#define CHECK_PARAMETER_AND_RETURN_STATUS(argument) \ + if (argument.empty()) { \ + return Status::Error("argument is invalid!"); \ + } + +#define CHECK_SEGMENT_AND_RETURN_STATUS(segment) \ + if (!nebula::meta::MetaCommon::checkSegment(segment)) { \ + return Status::Error("segment is invalid!"); \ + } + namespace nebula { namespace meta { @@ -227,6 +241,100 @@ MetaClient::getSpaceIdByNameFromCache(const std::string& name) { return Status::SpaceNotFound(); } +folly::Future> +MetaClient::multiPut(std::string segment, + std::vector> pairs) { + CHECK_SEGMENT_AND_RETURN_STATUS(segment); + CHECK_PARAMETER_AND_RETURN_STATUS(pairs); + cpp2::MultiPutReq req; + std::vector data; + for (auto& element : pairs) { + data.emplace_back(apache::thrift::FragileConstructor::FRAGILE, + std::move(element.first), std::move(element.second)); + } + req.set_segment(std::move(segment)); + req.set_pairs(std::move(data)); + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_multiPut(request); + }, [] (cpp2::MultiPutResp&& resp) -> bool { + return resp.code == cpp2::ErrorCode::SUCCEEDED; + }); +} + +folly::Future> +MetaClient::get(std::string segment, std::string key) { + CHECK_SEGMENT_AND_RETURN_STATUS(segment); + CHECK_PARAMETER_AND_RETURN_STATUS(key); + cpp2::GetReq req; + req.set_segment(std::move(segment)); + req.set_key(std::move(key)); + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_get(request); + }, [] (cpp2::GetResp&& resp) -> std::string { + return resp.get_value(); + }); +} + +folly::Future>> +MetaClient::multiGet(std::string segment, std::vector keys) { + CHECK_SEGMENT_AND_RETURN_STATUS(segment); + CHECK_PARAMETER_AND_RETURN_STATUS(keys); + cpp2::MultiGetReq req; + req.set_segment(std::move(segment)); + req.set_keys(std::move(keys)); + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_multiGet(request); + }, [] (cpp2::MultiGetResp&& resp) -> std::vector { + return resp.get_values(); + }); +} + +folly::Future>> +MetaClient::scan(std::string segment, std::string start, std::string end) { + CHECK_SEGMENT_AND_RETURN_STATUS(segment); + CHECK_PARAMETER_AND_RETURN_STATUS(start); + CHECK_PARAMETER_AND_RETURN_STATUS(end); + cpp2::ScanReq req; + req.set_segment(std::move(segment)); + req.set_start(std::move(start)); + req.set_end(std::move(end)); + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_scan(request); + }, [] (cpp2::ScanResp&& resp) -> std::vector { + return resp.get_values(); + }); +} + +folly::Future> +MetaClient::remove(std::string segment, std::string key) { + CHECK_SEGMENT_AND_RETURN_STATUS(segment); + CHECK_PARAMETER_AND_RETURN_STATUS(key); + cpp2::RemoveReq req; + req.set_segment(std::move(segment)); + req.set_key(std::move(key)); + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_remove(request); + }, [] (cpp2::RemoveResp&& resp) -> bool { + return resp.code == cpp2::ErrorCode::SUCCEEDED; + }); +} + +folly::Future> +MetaClient::removeRange(std::string segment, std::string start, std::string end) { + CHECK_SEGMENT_AND_RETURN_STATUS(segment); + CHECK_PARAMETER_AND_RETURN_STATUS(start); + CHECK_PARAMETER_AND_RETURN_STATUS(end); + cpp2::RemoveRangeReq req; + req.set_segment(std::move(segment)); + req.set_start(std::move(start)); + req.set_end(std::move(end)); + return getResponse(std::move(req), [] (auto client, auto request) { + return client->future_removeRange(request); + }, [] (cpp2::RemoveRangeResp&& resp) -> bool { + return resp.code == cpp2::ErrorCode::SUCCEEDED; + }); +} + std::vector MetaClient::to(const std::vector& tHosts) { std::vector hosts; hosts.resize(tHosts.size()); @@ -250,8 +358,8 @@ Status MetaClient::handleResponse(const RESP& resp) { switch (resp.get_code()) { case cpp2::ErrorCode::SUCCEEDED: return Status::OK(); - case cpp2::ErrorCode::E_SPACE_EXISTED: - return Status::Error("space existed!"); + case cpp2::ErrorCode::E_EXISTED: + return Status::Error("existed!"); case cpp2::ErrorCode::E_NOT_FOUND: return Status::Error("not existed!"); case cpp2::ErrorCode::E_LEADER_CHANGED: diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index 00bca327bdf..98cd644b002 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -100,6 +100,26 @@ class MetaClient { int32_t partsNum(GraphSpaceID spaceId); + folly::Future> + multiPut(std::string segment, + std::vector> pairs); + + folly::Future> + get(std::string segment, std::string key); + + folly::Future>> + multiGet(std::string segment, std::vector keys); + + folly::Future>> + scan(std::string segment, std::string start, std::string end); + + folly::Future> + remove(std::string segment, std::string key); + + folly::Future> + removeRange(std::string segment, std::string start, std::string end); + + protected: void loadDataThreadFunc(); diff --git a/src/meta/common/MetaCommon.h b/src/meta/common/MetaCommon.h new file mode 100644 index 00000000000..06e02c31816 --- /dev/null +++ b/src/meta/common/MetaCommon.h @@ -0,0 +1,31 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_METACOMMON_H_ +#define META_METACOMMON_H_ + +#include "base/Base.h" + +namespace nebula { +namespace meta { + +class MetaCommon final { +public: + MetaCommon() = delete; + + static bool checkSegment(const std::string& segment) { + static const std::regex pattern("^[0-9a-zA-Z]+$"); + if (!segment.empty() && std::regex_match(segment, pattern)) { + return true; + } + return false; + } +}; + +} // namespace meta +} // namespace nebula + +#endif // META_METACOMMON_H_ diff --git a/src/meta/processors/AddHostsProcessor.cpp b/src/meta/processors/AddHostsProcessor.cpp index 8e6b502a8d2..0a1bef274cf 100644 --- a/src/meta/processors/AddHostsProcessor.cpp +++ b/src/meta/processors/AddHostsProcessor.cpp @@ -9,12 +9,11 @@ namespace nebula { namespace meta { - void AddHostsProcessor::process(const cpp2::AddHostsReq& req) { folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); std::vector data; for (auto& h : req.get_hosts()) { - data.emplace_back(MetaUtils::hostKey(h.ip, h.port), MetaUtils::hostVal()); + data.emplace_back(MetaServiceUtils::hostKey(h.ip, h.port), MetaServiceUtils::hostVal()); } doPut(std::move(data)); } diff --git a/src/meta/processors/AddTagProcessor.cpp b/src/meta/processors/AddTagProcessor.cpp index 91664cf4be3..a26a3b39a75 100644 --- a/src/meta/processors/AddTagProcessor.cpp +++ b/src/meta/processors/AddTagProcessor.cpp @@ -21,24 +21,24 @@ void AddTagProcessor::process(const cpp2::AddTagReq& req) { std::vector data; if (ret.ok()) { resp_.set_id(to(ret.value(), EntryType::TAG)); - resp_.set_code(cpp2::ErrorCode::E_TAG_EXISTED); + resp_.set_code(cpp2::ErrorCode::E_EXISTED); onFinished(); return; } auto version = time::TimeUtils::nowInMSeconds(); TagID tagId = autoIncrementId(); - data.emplace_back(MetaUtils::indexKey(EntryType::TAG, req.get_tag_name()), + data.emplace_back(MetaServiceUtils::indexKey(EntryType::TAG, req.get_tag_name()), std::string(reinterpret_cast(&tagId), sizeof(tagId))); LOG(INFO) << "Add Tag " << req.get_tag_name() << ", tagId " << tagId; - data.emplace_back(MetaUtils::schemaTagKey(req.get_space_id(), tagId, version), - MetaUtils::schemaTagVal(req.get_tag_name(), req.get_schema())); + data.emplace_back(MetaServiceUtils::schemaTagKey(req.get_space_id(), tagId, version), + MetaServiceUtils::schemaTagVal(req.get_tag_name(), req.get_schema())); resp_.set_code(cpp2::ErrorCode::SUCCEEDED); resp_.set_id(to(tagId, EntryType::TAG)); doPut(std::move(data)); } StatusOr AddTagProcessor::getTag(const std::string& tagName) { - auto indexKey = MetaUtils::indexKey(EntryType::TAG, tagName); + auto indexKey = MetaServiceUtils::indexKey(EntryType::TAG, tagName); std::string val; auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, indexKey, &val); if (ret == kvstore::ResultCode::SUCCEEDED) { diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index eedb92e31f5..bb8dcbd19e6 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -15,7 +15,8 @@ #include "base/StatusOr.h" #include "time/Duration.h" #include "kvstore/KVStore.h" -#include "meta/MetaUtils.h" +#include "meta/MetaServiceUtils.h" +#include "meta/common/MetaCommon.h" #include "network/NetworkUtils.h" namespace nebula { @@ -39,6 +40,16 @@ GENERATE_LOCK(tag); #undef GENERATE_LOCK }; +/** + * Check segemnt is consist of numbers and letters and should not empty. + * */ +#define CHECK_SEGMENT(segment) \ + if (!MetaCommon::checkSegment(segment)) { \ + resp_.set_code(cpp2::ErrorCode::E_STORE_SEGMENT_ILLEGAL); \ + onFinished(); \ + return; \ + } + #define MAX_VERSION_HEX 0x7FFFFFFFFFFFFFFF #define MIN_VERSION_HEX 0x0000000000000000 @@ -110,9 +121,35 @@ class BaseProcessor { void doPut(std::vector data); /** + * General get function. + * */ + StatusOr doGet(const std::string& key); + + /** + * General multi get function. + * */ + StatusOr> doMultiGet(const std::vector& keys); + + /** + * General remove function. + * */ + void doRemove(const std::string& key); + + /** + * Remove keys from start to end, doesn't contain end. + * */ + void doRemoveRange(const std::string& start, + const std::string& end); + + /** + * Scan keys from start to end, doesn't contain end. + * */ + StatusOr> doScan(const std::string& start, + const std::string& end); + /** * General multi remove function. **/ - void doRemove(std::vector keys); + void doMultiRemove(std::vector keys); /** * Get all hosts diff --git a/src/meta/processors/BaseProcessor.inl b/src/meta/processors/BaseProcessor.inl index 5bf4b9b7954..6b1866687a2 100644 --- a/src/meta/processors/BaseProcessor.inl +++ b/src/meta/processors/BaseProcessor.inl @@ -4,7 +4,7 @@ * (found in the LICENSE.Apache file in the root directory) */ - +#include "meta/MetaServiceUtils.h" #include "meta/processors/BaseProcessor.h" namespace nebula { @@ -21,7 +21,44 @@ void BaseProcessor::doPut(std::vector data) { } template -void BaseProcessor::doRemove(std::vector keys) { +StatusOr BaseProcessor::doGet(const std::string& key) { + std::string value; + auto code = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, + key, &value); + switch (code) { + case kvstore::ResultCode::SUCCEEDED: + return value; + case kvstore::ResultCode::ERR_KEY_NOT_FOUND: + return Status::Error("Key Not Found"); + default: + return Status::Error("Get Failed"); + } +} + +template +StatusOr> +BaseProcessor::doMultiGet(const std::vector& keys) { + std::vector values; + auto code = kvstore_->multiGet(kDefaultSpaceId_, kDefaultPartId_, + keys, &values); + if (code != kvstore::ResultCode::SUCCEEDED) { + return Status::Error("MultiGet Failed"); + } + return values; +} + +template +void BaseProcessor::doRemove(const std::string& key) { + kvstore_->asyncRemove(kDefaultSpaceId_, kDefaultPartId_, key, + [this] (kvstore::ResultCode code, HostAddr leader) { + UNUSED(leader); + this->resp_.set_code(to(code)); + this->onFinished(); + }); +} + +template +void BaseProcessor::doMultiRemove(std::vector keys) { kvstore_->asyncMultiRemove(kDefaultSpaceId_, kDefaultPartId_, std::move(keys), [this] (kvstore::ResultCode code, HostAddr leader) { UNUSED(leader); @@ -30,10 +67,39 @@ void BaseProcessor::doRemove(std::vector keys) { }); } +template +void BaseProcessor::doRemoveRange(const std::string& start, + const std::string& end) { + kvstore_->asyncRemoveRange(kDefaultSpaceId_, kDefaultPartId_, start, end, + [this] (kvstore::ResultCode code, HostAddr leader) { + UNUSED(leader); + this->resp_.set_code(to(code)); + this->onFinished(); + }); +} + +template +StatusOr> BaseProcessor::doScan(const std::string& start, + const std::string& end) { + std::unique_ptr iter; + auto code = kvstore_->range(kDefaultSpaceId_, kDefaultPartId_, + start, end, &iter); + if (code != kvstore::ResultCode::SUCCEEDED) { + return Status::Error("Scan Failed"); + } + + std::vector values; + while (iter->valid()) { + values.emplace_back(iter->val()); + iter->next(); + } + return values; +} + template StatusOr> BaseProcessor::allHosts() { std::vector hosts; - const auto& prefix = MetaUtils::hostPrefix(); + const auto& prefix = MetaServiceUtils::hostPrefix(); std::unique_ptr iter; auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { @@ -76,7 +142,7 @@ int32_t BaseProcessor::autoIncrementId() { template Status BaseProcessor::spaceExist(GraphSpaceID spaceId) { folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); - auto spaceKey = MetaUtils::spaceKey(spaceId); + auto spaceKey = MetaServiceUtils::spaceKey(spaceId); std::string val; auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, spaceKey, &val); if (ret == kvstore::ResultCode::SUCCEEDED) { @@ -92,7 +158,7 @@ Status BaseProcessor::hostsExist(const std::vector &hostsKey) auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, hostKey , &val); if (ret != kvstore::ResultCode::SUCCEEDED) { if (ret == kvstore::ResultCode::ERR_KEY_NOT_FOUND) { - nebula::cpp2::HostAddr host = MetaUtils::parseHostKey(hostKey); + nebula::cpp2::HostAddr host = MetaServiceUtils::parseHostKey(hostKey); std::string ip = NetworkUtils::intToIPv4(host.get_ip()); int32_t port = host.get_port(); VLOG(3) << "Error, host IP " << ip << " port " << port @@ -109,7 +175,7 @@ Status BaseProcessor::hostsExist(const std::vector &hostsKey) template StatusOr BaseProcessor::getSpaceId(const std::string& name) { - auto indexKey = MetaUtils::indexKey(EntryType::SPACE, name); + auto indexKey = MetaServiceUtils::indexKey(EntryType::SPACE, name); std::string val; auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, indexKey, &val); if (ret == kvstore::ResultCode::SUCCEEDED) { @@ -120,4 +186,3 @@ StatusOr BaseProcessor::getSpaceId(const std::string& name) } // namespace meta } // namespace nebula - diff --git a/src/meta/processors/CreateSpaceProcessor.cpp b/src/meta/processors/CreateSpaceProcessor.cpp index 7e4b4d97990..1af42b2da54 100644 --- a/src/meta/processors/CreateSpaceProcessor.cpp +++ b/src/meta/processors/CreateSpaceProcessor.cpp @@ -14,7 +14,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { auto spaceRet = getSpaceId(req.get_space_name()); if (spaceRet.ok()) { resp_.set_id(to(spaceRet.value(), EntryType::SPACE)); - resp_.set_code(cpp2::ErrorCode::E_SPACE_EXISTED); + resp_.set_code(cpp2::ErrorCode::E_EXISTED); onFinished(); return; } @@ -30,16 +30,16 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { auto hosts = ret.value(); auto replicaFactor = req.get_replica_factor(); std::vector data; - data.emplace_back(MetaUtils::indexKey(EntryType::SPACE, req.get_space_name()), + data.emplace_back(MetaServiceUtils::indexKey(EntryType::SPACE, req.get_space_name()), std::string(reinterpret_cast(&spaceId), sizeof(spaceId))); - data.emplace_back(MetaUtils::spaceKey(spaceId), - MetaUtils::spaceVal(req.get_parts_num(), - replicaFactor, - req.get_space_name())); + data.emplace_back(MetaServiceUtils::spaceKey(spaceId), + MetaServiceUtils::spaceVal(req.get_parts_num(), + replicaFactor, + req.get_space_name())); for (auto partId = 1; partId <= req.get_parts_num(); partId++) { auto partHosts = pickHosts(partId, hosts, replicaFactor); - data.emplace_back(MetaUtils::partKey(spaceId, partId), - MetaUtils::partVal(partHosts)); + data.emplace_back(MetaServiceUtils::partKey(spaceId, partId), + MetaServiceUtils::partVal(partHosts)); } resp_.set_code(cpp2::ErrorCode::SUCCEEDED); resp_.set_id(to(spaceId, EntryType::SPACE)); diff --git a/src/meta/processors/DropSpaceProcessor.cpp b/src/meta/processors/DropSpaceProcessor.cpp index 72e1fa01d6e..6b46d03e374 100644 --- a/src/meta/processors/DropSpaceProcessor.cpp +++ b/src/meta/processors/DropSpaceProcessor.cpp @@ -24,7 +24,7 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) { resp_.set_code(cpp2::ErrorCode::SUCCEEDED); std::vector deleteKeys; - auto prefix = MetaUtils::partPrefix(spaceId); + auto prefix = MetaServiceUtils::partPrefix(spaceId); std::unique_ptr iter; auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { @@ -38,11 +38,11 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) { iter->next(); } - deleteKeys.emplace_back(MetaUtils::indexKey(EntryType::SPACE, req.get_space_name())); - deleteKeys.emplace_back(MetaUtils::spaceKey(spaceId)); + deleteKeys.emplace_back(MetaServiceUtils::indexKey(EntryType::SPACE, req.get_space_name())); + deleteKeys.emplace_back(MetaServiceUtils::spaceKey(spaceId)); // TODO(YT) delete Tag/Edge under the space - doRemove(std::move(deleteKeys)); + doMultiRemove(std::move(deleteKeys)); // TODO(YT) delete part files of the space } diff --git a/src/meta/processors/GetPartsAllocProcessor.cpp b/src/meta/processors/GetPartsAllocProcessor.cpp index 8c6f212d0ab..b0ac0145623 100644 --- a/src/meta/processors/GetPartsAllocProcessor.cpp +++ b/src/meta/processors/GetPartsAllocProcessor.cpp @@ -12,7 +12,7 @@ namespace meta { void GetPartsAllocProcessor::process(const cpp2::GetPartsAllocReq& req) { folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); auto spaceId = req.get_space_id(); - auto prefix = MetaUtils::partPrefix(spaceId); + auto prefix = MetaServiceUtils::partPrefix(spaceId); std::unique_ptr iter; auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); resp_.set_code(to(ret)); @@ -25,7 +25,7 @@ void GetPartsAllocProcessor::process(const cpp2::GetPartsAllocReq& req) { auto key = iter->key(); PartitionID partId; memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); - std::vector partHosts = MetaUtils::parsePartVal(iter->val()); + std::vector partHosts = MetaServiceUtils::parsePartVal(iter->val()); parts.emplace(partId, std::move(partHosts)); iter->next(); } diff --git a/src/meta/processors/GetProcessor.cpp b/src/meta/processors/GetProcessor.cpp new file mode 100644 index 00000000000..f552001518a --- /dev/null +++ b/src/meta/processors/GetProcessor.cpp @@ -0,0 +1,28 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + + +#include "meta/processors/GetProcessor.h" + +namespace nebula { +namespace meta { + +void GetProcessor::process(const cpp2::GetReq& req) { + CHECK_SEGMENT(req.get_segment()); + auto key = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_key()); + auto result = doGet(key); + if (!result.ok()) { + resp_.set_code(cpp2::ErrorCode::E_KEY_NOT_FOUND); + onFinished(); + return; + } + resp_.set_code(cpp2::ErrorCode::SUCCEEDED); + resp_.set_value(std::move(result.value())); + onFinished(); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/GetProcessor.h b/src/meta/processors/GetProcessor.h new file mode 100644 index 00000000000..badf0c2001b --- /dev/null +++ b/src/meta/processors/GetProcessor.h @@ -0,0 +1,31 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_GETPROCESSOR_H_ +#define META_GETPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +class GetProcessor : public BaseProcessor { +public: + static GetProcessor* instance(kvstore::KVStore* kvstore) { + return new GetProcessor(kvstore); + } + + void process(const cpp2::GetReq& req); + +private: + explicit GetProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; + +} // namespace meta +} // namespace nebula + +#endif // META_GETPROCESSOR_H_ diff --git a/src/meta/processors/GetTagProcessor.cpp b/src/meta/processors/GetTagProcessor.cpp index 241a601992c..b1c19ba0c95 100644 --- a/src/meta/processors/GetTagProcessor.cpp +++ b/src/meta/processors/GetTagProcessor.cpp @@ -12,15 +12,16 @@ namespace meta { void GetTagProcessor::process(const cpp2::GetTagReq& req) { folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); std::string val; - std::string tagKey = MetaUtils::schemaTagKey(req.get_space_id(), - req.get_tag_id(), - req.get_version()); + std::string tagKey = MetaServiceUtils::schemaTagKey(req.get_space_id(), + req.get_tag_id(), + req.get_version()); auto ret = kvstore_->get(kDefaultSpaceId_, kDefaultPartId_, std::move(tagKey), &val); if (ret != kvstore::ResultCode::SUCCEEDED) { + resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); onFinished(); return; } - resp_.set_schema(MetaUtils::parseSchema(val)); + resp_.set_schema(MetaServiceUtils::parseSchema(val)); onFinished(); } } // namespace meta diff --git a/src/meta/processors/ListSpacesProcessor.cpp b/src/meta/processors/ListSpacesProcessor.cpp index 75fcaa6d338..b031e0867eb 100644 --- a/src/meta/processors/ListSpacesProcessor.cpp +++ b/src/meta/processors/ListSpacesProcessor.cpp @@ -12,7 +12,7 @@ namespace meta { void ListSpacesProcessor::process(const cpp2::ListSpacesReq& req) { UNUSED(req); folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); - auto prefix = MetaUtils::spacePrefix(); + auto prefix = MetaServiceUtils::spacePrefix(); std::unique_ptr iter; auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { @@ -22,8 +22,8 @@ void ListSpacesProcessor::process(const cpp2::ListSpacesReq& req) { } std::vector spaces; while (iter->valid()) { - auto spaceId = MetaUtils::spaceId(iter->key()); - auto spaceName = MetaUtils::spaceName(iter->val()); + auto spaceId = MetaServiceUtils::spaceId(iter->key()); + auto spaceName = MetaServiceUtils::spaceName(iter->val()); VLOG(3) << "List spaces " << spaceId << ", name " << spaceName.str(); spaces.emplace_back(apache::thrift::FragileConstructor::FRAGILE, to(spaceId, EntryType::SPACE), diff --git a/src/meta/processors/ListTagsProcessor.cpp b/src/meta/processors/ListTagsProcessor.cpp index 80ee6034570..8ed34e25a3d 100644 --- a/src/meta/processors/ListTagsProcessor.cpp +++ b/src/meta/processors/ListTagsProcessor.cpp @@ -12,7 +12,7 @@ namespace meta { void ListTagsProcessor::process(const cpp2::ListTagsReq& req) { folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); auto spaceId = req.get_space_id(); - auto prefix = MetaUtils::schemaTagsPrefix(spaceId); + auto prefix = MetaServiceUtils::schemaTagsPrefix(spaceId); std::unique_ptr iter; auto ret = kvstore_->prefix(kDefaultSpaceId_, kDefaultPartId_, prefix, &iter); resp_.set_code(to(ret)); @@ -29,9 +29,9 @@ void ListTagsProcessor::process(const cpp2::ListTagsReq& req) { auto vers = *reinterpret_cast(key.data() + prefix.size() + sizeof(TagID)); auto nameLen = *reinterpret_cast(val.data()); auto tagName = val.subpiece(sizeof(int32_t), nameLen).str(); - auto schema = MetaUtils::parseSchema(val); + auto schema = MetaServiceUtils::parseSchema(val); cpp2::TagItem tagItem(apache::thrift::FragileConstructor::FRAGILE, - tagID, tagName, vers, schema); + tagID, tagName, vers, schema); tags.emplace_back(std::move(tagItem)); iter->next(); } diff --git a/src/meta/processors/MultiGetProcessor.cpp b/src/meta/processors/MultiGetProcessor.cpp new file mode 100644 index 00000000000..ef1659e1ed9 --- /dev/null +++ b/src/meta/processors/MultiGetProcessor.cpp @@ -0,0 +1,31 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "meta/processors/MultiGetProcessor.h" + +namespace nebula { +namespace meta { + +void MultiGetProcessor::process(const cpp2::MultiGetReq& req) { + CHECK_SEGMENT(req.get_segment()); + std::vector keys; + for (auto& key : req.get_keys()) { + keys.emplace_back(MetaServiceUtils::assembleSegmentKey(req.get_segment(), key)); + } + + auto result = doMultiGet(std::move(keys)); + if (!result.ok()) { + resp_.set_code(cpp2::ErrorCode::E_STORE_FAILURE); + onFinished(); + return; + } + resp_.set_code(cpp2::ErrorCode::SUCCEEDED); + resp_.set_values(std::move(result.value())); + onFinished(); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/MultiGetProcessor.h b/src/meta/processors/MultiGetProcessor.h new file mode 100644 index 00000000000..b6f3f956e2b --- /dev/null +++ b/src/meta/processors/MultiGetProcessor.h @@ -0,0 +1,31 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_MULTIGETPROCESSOR_H_ +#define META_MULTIGETPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +class MultiGetProcessor : public BaseProcessor { +public: + static MultiGetProcessor* instance(kvstore::KVStore* kvstore) { + return new MultiGetProcessor(kvstore); + } + + void process(const cpp2::MultiGetReq& req); + +private: + explicit MultiGetProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; + +} // namespace meta +} // namespace nebula + +#endif // META_MULTIGETPROCESSOR_H_ diff --git a/src/meta/processors/MultiPutProcessor.cpp b/src/meta/processors/MultiPutProcessor.cpp new file mode 100644 index 00000000000..7e15d546eaa --- /dev/null +++ b/src/meta/processors/MultiPutProcessor.cpp @@ -0,0 +1,23 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "meta/processors/MultiPutProcessor.h" + +namespace nebula { +namespace meta { + +void MultiPutProcessor::process(const cpp2::MultiPutReq& req) { + CHECK_SEGMENT(req.get_segment()); + std::vector data; + for (auto& pair : req.get_pairs()) { + data.emplace_back(MetaServiceUtils::assembleSegmentKey(req.get_segment(), pair.get_key()), + pair.get_value()); + } + doPut(std::move(data)); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/MultiPutProcessor.h b/src/meta/processors/MultiPutProcessor.h new file mode 100644 index 00000000000..8ccc358ebb9 --- /dev/null +++ b/src/meta/processors/MultiPutProcessor.h @@ -0,0 +1,31 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_MULTIPUTPROCESSOR_H_ +#define META_MULTIPUTPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +class MultiPutProcessor : public BaseProcessor { +public: + static MultiPutProcessor* instance(kvstore::KVStore* kvstore) { + return new MultiPutProcessor(kvstore); + } + + void process(const cpp2::MultiPutReq& req); + +private: + explicit MultiPutProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; + +} // namespace meta +} // namespace nebula + +#endif // META_MULTIPUTPROCESSOR_H_ diff --git a/src/meta/processors/RemoveHostsProcessor.cpp b/src/meta/processors/RemoveHostsProcessor.cpp index ab8b1594db3..589dbc27886 100644 --- a/src/meta/processors/RemoveHostsProcessor.cpp +++ b/src/meta/processors/RemoveHostsProcessor.cpp @@ -15,7 +15,7 @@ void RemoveHostsProcessor::process(const cpp2::RemoveHostsReq& req) { std::vector hostsKey; for (auto& h : req.get_hosts()) { - hostsKey.emplace_back(MetaUtils::hostKey(h.ip, h.port)); + hostsKey.emplace_back(MetaServiceUtils::hostKey(h.ip, h.port)); } auto hostsRet = hostsExist(hostsKey); @@ -27,7 +27,7 @@ void RemoveHostsProcessor::process(const cpp2::RemoveHostsReq& req) { LOG(INFO) << "Remove hosts "; resp_.set_code(cpp2::ErrorCode::SUCCEEDED); - doRemove(std::move(hostsKey)); + doMultiRemove(std::move(hostsKey)); } } // namespace meta diff --git a/src/meta/processors/RemoveProcessor.cpp b/src/meta/processors/RemoveProcessor.cpp new file mode 100644 index 00000000000..efdb5f358ed --- /dev/null +++ b/src/meta/processors/RemoveProcessor.cpp @@ -0,0 +1,19 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "meta/processors/RemoveProcessor.h" + +namespace nebula { +namespace meta { + +void RemoveProcessor::process(const cpp2::RemoveReq& req) { + CHECK_SEGMENT(req.get_segment()); + auto key = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_key()); + doRemove(key); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/RemoveProcessor.h b/src/meta/processors/RemoveProcessor.h new file mode 100644 index 00000000000..c63ca5c0e66 --- /dev/null +++ b/src/meta/processors/RemoveProcessor.h @@ -0,0 +1,31 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_REMOVEPROCESSOR_H_ +#define META_REMOVEPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +class RemoveProcessor : public BaseProcessor { +public: + static RemoveProcessor* instance(kvstore::KVStore* kvstore) { + return new RemoveProcessor(kvstore); + } + + void process(const cpp2::RemoveReq& req); + +private: + explicit RemoveProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; + +} // namespace meta +} // namespace nebula + +#endif // META_DELETEPROCESSOR_H_ diff --git a/src/meta/processors/RemoveRangeProcessor.cpp b/src/meta/processors/RemoveRangeProcessor.cpp new file mode 100644 index 00000000000..ea5d3a66ad3 --- /dev/null +++ b/src/meta/processors/RemoveRangeProcessor.cpp @@ -0,0 +1,20 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "meta/processors/RemoveRangeProcessor.h" + +namespace nebula { +namespace meta { + +void RemoveRangeProcessor::process(const cpp2::RemoveRangeReq& req) { + CHECK_SEGMENT(req.get_segment()); + auto start = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_start()); + auto end = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_end()); + doRemoveRange(start, end); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/RemoveRangeProcessor.h b/src/meta/processors/RemoveRangeProcessor.h new file mode 100644 index 00000000000..59a584ad91a --- /dev/null +++ b/src/meta/processors/RemoveRangeProcessor.h @@ -0,0 +1,31 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_REMOVERANGEPROCESSOR_H_ +#define META_REMOVERANGEPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +class RemoveRangeProcessor : public BaseProcessor { +public: + static RemoveRangeProcessor* instance(kvstore::KVStore* kvstore) { + return new RemoveRangeProcessor(kvstore); + } + + void process(const cpp2::RemoveRangeReq& req); + +private: + explicit RemoveRangeProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; + +} // namespace meta +} // namespace nebula + +#endif // META_REMOVERANGEPROCESSOR_H_ diff --git a/src/meta/processors/RemoveTagProcessor.cpp b/src/meta/processors/RemoveTagProcessor.cpp index dfe2b198ac0..35a94f2140f 100644 --- a/src/meta/processors/RemoveTagProcessor.cpp +++ b/src/meta/processors/RemoveTagProcessor.cpp @@ -24,12 +24,12 @@ void RemoveTagProcessor::process(const cpp2::RemoveTagReq& req) { } resp_.set_code(cpp2::ErrorCode::SUCCEEDED); LOG(INFO) << "Remove Tag " << req.get_tag_name(); - doRemove(std::move(ret.value())); + doMultiRemove(std::move(ret.value())); } StatusOr> RemoveTagProcessor::getTagKeys(GraphSpaceID id, const std::string& tagName) { - auto indexKey = MetaUtils::indexKey(EntryType::TAG, tagName); + auto indexKey = MetaServiceUtils::indexKey(EntryType::TAG, tagName); std::vector keys; std::string tagVal; TagID tagId; @@ -44,8 +44,8 @@ StatusOr> RemoveTagProcessor::getTagKeys(GraphSpaceID i std::unique_ptr iter; ret = kvstore_->range(kDefaultSpaceId_, kDefaultPartId_, - MetaUtils::schemaTagKey(id, tagId, MIN_VERSION_HEX), - MetaUtils::schemaTagKey(id, tagId, MAX_VERSION_HEX), + MetaServiceUtils::schemaTagKey(id, tagId, MIN_VERSION_HEX), + MetaServiceUtils::schemaTagKey(id, tagId, MAX_VERSION_HEX), &iter); if (ret != kvstore::ResultCode::SUCCEEDED) { return Status::Error("Tag get error by id : %d !", tagId); diff --git a/src/meta/processors/ScanProcessor.cpp b/src/meta/processors/ScanProcessor.cpp new file mode 100644 index 00000000000..6e897d55a4c --- /dev/null +++ b/src/meta/processors/ScanProcessor.cpp @@ -0,0 +1,28 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "meta/processors/ScanProcessor.h" + +namespace nebula { +namespace meta { + +void ScanProcessor::process(const cpp2::ScanReq& req) { + CHECK_SEGMENT(req.get_segment()); + auto start = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_start()); + auto end = MetaServiceUtils::assembleSegmentKey(req.get_segment(), req.get_end()); + auto result = doScan(start, end); + if (!result.ok()) { + resp_.set_code(cpp2::ErrorCode::E_STORE_FAILURE); + onFinished(); + return; + } + resp_.set_code(cpp2::ErrorCode::SUCCEEDED); + resp_.set_values(std::move(result.value())); + onFinished(); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/ScanProcessor.h b/src/meta/processors/ScanProcessor.h new file mode 100644 index 00000000000..bfbcb046d71 --- /dev/null +++ b/src/meta/processors/ScanProcessor.h @@ -0,0 +1,32 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_SCANPROCESSOR_H_ +#define META_SCANPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { + +class ScanProcessor : public BaseProcessor { +public: + static ScanProcessor* instance(kvstore::KVStore* kvstore) { + return new ScanProcessor(kvstore); + } + + void process(const cpp2::ScanReq& req); + +private: + explicit ScanProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; + +} // namespace meta +} // namespace nebula + +#endif // META_SCANPROCESSOR_H_ + diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 44612368f54..addacadbf47 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -21,8 +21,8 @@ nebula_add_test(file_based_schema_manager_test) add_executable( meta_utils_test - MetaUtilsTest.cpp - ../MetaUtils.cpp + MetaServiceUtilsTest.cpp + ../MetaServiceUtils.cpp $ $ $ diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp index f302b91fc33..043e6495aeb 100644 --- a/src/meta/test/MetaClientTest.cpp +++ b/src/meta/test/MetaClientTest.cpp @@ -10,7 +10,7 @@ #include "meta/client/MetaClient.h" #include "meta/test/TestUtils.h" #include "network/NetworkUtils.h" -#include "meta/MetaUtils.h" +#include "meta/MetaServiceUtils.h" DECLARE_int32(load_data_interval_second); @@ -90,6 +90,67 @@ TEST(MetaClientTest, InterfacesTest) { ASSERT_FALSE(ret.ok()); ASSERT_EQ(Status::SpaceNotFound(), ret.status()); } + { + // Multi Put Test + std::vector> pairs; + for (auto i = 0; i < 10; i++) { + pairs.emplace_back(folly::stringPrintf("key_%d", i), + folly::stringPrintf("value_%d", i)); + } + auto ret = client->multiPut("test", pairs).get(); + ASSERT_TRUE(ret.ok()); + } + { + // Get Test + auto ret = client->get("test", "key_0").get(); + ASSERT_TRUE(ret.ok()); + ASSERT_EQ("value_0", ret.value()); + + auto missedRet = client->get("test", "missed_key").get(); + ASSERT_FALSE(missedRet.ok()); + + auto emptyRet = client->get("test", "").get(); + ASSERT_FALSE(emptyRet.ok()); + } + { + // Multi Get Test + std::vector keys; + for (auto i = 0; i < 2; i++) { + keys.emplace_back(folly::stringPrintf("key_%d", i)); + } + auto ret = client->multiGet("test", keys).get(); + ASSERT_TRUE(ret.ok()); + ASSERT_EQ(2, ret.value().size()); + ASSERT_EQ("value_0", ret.value()[0]); + ASSERT_EQ("value_1", ret.value()[1]); + + std::vector emptyKeys; + auto emptyRet = client->multiGet("test", emptyKeys).get(); + ASSERT_FALSE(emptyRet.ok()); + } + { + // Scan Test + auto ret = client->scan("test", "key_0", "key_3").get(); + ASSERT_TRUE(ret.ok()); + ASSERT_EQ(3, ret.value().size()); + ASSERT_EQ("value_0", ret.value()[0]); + ASSERT_EQ("value_1", ret.value()[1]); + ASSERT_EQ("value_2", ret.value()[2]); + } + { + // Remove Test + auto ret = client->remove("test", "key_9").get(); + ASSERT_TRUE(ret.ok()); + } + { + // Remove Range Test + auto ret = client->removeRange("test", "key_0", "key_4").get(); + ASSERT_TRUE(ret.ok()); + } + { + auto ret = client->remove("_test_", "key_8").get(); + ASSERT_FALSE(ret.ok()); + } { auto ret = client->dropSpace("default_space").get(); ASSERT_TRUE(ret.ok()); diff --git a/src/meta/test/MetaUtilsTest.cpp b/src/meta/test/MetaServiceUtilsTest.cpp similarity index 75% rename from src/meta/test/MetaUtilsTest.cpp rename to src/meta/test/MetaServiceUtilsTest.cpp index 66f6c3c7354..7e40de2eb2f 100644 --- a/src/meta/test/MetaUtilsTest.cpp +++ b/src/meta/test/MetaServiceUtilsTest.cpp @@ -9,25 +9,25 @@ #include #include #include "fs/TempFile.h" -#include "meta/MetaUtils.h" +#include "meta/MetaServiceUtils.h" namespace nebula { namespace meta { -TEST(MetaUtilsTest, SpaceKeyTest) { - auto prefix = MetaUtils::spacePrefix(); +TEST(MetaServiceUtilsTest, SpaceKeyTest) { + auto prefix = MetaServiceUtils::spacePrefix(); ASSERT_EQ("__spaces__", prefix); - auto spaceKey = MetaUtils::spaceKey(101); - ASSERT_EQ(101, MetaUtils::spaceId(spaceKey)); - auto spaceVal = MetaUtils::spaceVal(100, 3, "default"); - ASSERT_EQ("default", MetaUtils::spaceName(spaceVal)); + auto spaceKey = MetaServiceUtils::spaceKey(101); + ASSERT_EQ(101, MetaServiceUtils::spaceId(spaceKey)); + auto spaceVal = MetaServiceUtils::spaceVal(100, 3, "default"); + ASSERT_EQ("default", MetaServiceUtils::spaceName(spaceVal)); ASSERT_EQ(100, *reinterpret_cast(spaceVal.c_str())); ASSERT_EQ(3, *reinterpret_cast(spaceVal.c_str() + sizeof(int32_t))); } -TEST(MetaUtilsTest, PartKeyTest) { - auto partKey = MetaUtils::partKey(0, 1); - auto prefix = MetaUtils::partPrefix(0); +TEST(MetaServiceUtilsTest, PartKeyTest) { + auto partKey = MetaServiceUtils::partKey(0, 1); + auto prefix = MetaServiceUtils::partPrefix(0); ASSERT_EQ("__parts__", prefix.substr(0, prefix.size() - sizeof(GraphSpaceID))); ASSERT_EQ(0, *reinterpret_cast( prefix.c_str() + prefix.size() - sizeof(GraphSpaceID))); @@ -41,9 +41,9 @@ TEST(MetaUtilsTest, PartKeyTest) { host.set_port(i * 20 + 2); hosts.emplace_back(std::move(host)); } - auto partVal = MetaUtils::partVal(hosts); + auto partVal = MetaServiceUtils::partVal(hosts); ASSERT_EQ(10 * sizeof(int32_t) * 2, partVal.size()); - auto result = MetaUtils::parsePartVal(partVal); + auto result = MetaServiceUtils::parsePartVal(partVal); ASSERT_EQ(hosts.size(), result.size()); for (int i = 0; i < 10; i++) { ASSERT_EQ(i * 20 + 1, result[i].get_ip()); @@ -51,20 +51,20 @@ TEST(MetaUtilsTest, PartKeyTest) { } } -TEST(MetaUtilsTest, HostKeyTest) { - auto hostKey = MetaUtils::hostKey(10, 11); - const auto& prefix = MetaUtils::hostPrefix(); +TEST(MetaServiceUtilsTest, HostKeyTest) { + auto hostKey = MetaServiceUtils::hostKey(10, 11); + const auto& prefix = MetaServiceUtils::hostPrefix(); ASSERT_EQ("__hosts__", prefix); ASSERT_EQ(prefix, hostKey.substr(0, hostKey.size() - 2 * sizeof(int32_t))); ASSERT_EQ(10, *reinterpret_cast(hostKey.c_str() + prefix.size())); ASSERT_EQ(11, *reinterpret_cast(hostKey.c_str() + prefix.size() + sizeof(IPv4))); - auto addr = MetaUtils::parseHostKey(hostKey); + auto addr = MetaServiceUtils::parseHostKey(hostKey); ASSERT_EQ(10, addr.get_ip()); ASSERT_EQ(11, addr.get_port()); } -TEST(MetaUtilsTest, TagTest) { +TEST(MetaServiceUtilsTest, TagTest) { cpp2::Schema schema; decltype(schema.columns) cols; for (auto i = 1; i <= 3; i++) { @@ -92,8 +92,8 @@ TEST(MetaUtilsTest, TagTest) { cols.emplace_back(std::move(column)); } schema.set_columns(std::move(cols)); - auto val = MetaUtils::schemaTagVal("test_tag", schema); - auto parsedSchema = MetaUtils::parseSchema(val); + auto val = MetaServiceUtils::schemaTagVal("test_tag", schema); + auto parsedSchema = MetaServiceUtils::parseSchema(val); ASSERT_EQ(parsedSchema, schema); } diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index 4f94826e188..0da1663285f 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -20,6 +20,12 @@ #include "meta/processors/RemoveTagProcessor.h" #include "meta/processors/GetTagProcessor.h" #include "meta/processors/ListTagsProcessor.h" +#include "meta/processors/MultiPutProcessor.h" +#include "meta/processors/GetProcessor.h" +#include "meta/processors/MultiGetProcessor.h" +#include "meta/processors/RemoveProcessor.h" +#include "meta/processors/RemoveRangeProcessor.h" +#include "meta/processors/ScanProcessor.h" namespace nebula { namespace meta { @@ -216,6 +222,143 @@ TEST(ProcessorTest, AddTagsTest) { } } +TEST(ProcessorTest, KVOperationTest) { + fs::TempDir rootPath("/tmp/KVOperationTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + auto hostsNum = TestUtils::createSomeHosts(kv.get()); + UNUSED(hostsNum); + + { + cpp2::CreateSpaceReq req; + req.set_space_name("default_space"); + req.set_parts_num(9); + req.set_replica_factor(3); + + auto* processor = CreateSpaceProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + ASSERT_EQ(1, resp.get_id().get_space_id()); + } + { + // Multi Put Test + std::vector pairs; + for (auto i = 0; i < 10; i++) { + pairs.emplace_back(apache::thrift::FragileConstructor::FRAGILE, + folly::stringPrintf("key_%d", i), + folly::stringPrintf("value_%d", i)); + } + + cpp2::MultiPutReq req; + req.set_segment("test"); + req.set_pairs(std::move(pairs)); + + auto* processor = MultiPutProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + } + { + // Get Test + cpp2::GetReq req; + req.set_segment("test"); + req.set_key("key_0"); + + auto* processor = GetProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + ASSERT_EQ("value_0", resp.value); + + cpp2::GetReq missedReq; + missedReq.set_segment("test"); + missedReq.set_key("missed_key"); + + auto* missedProcessor = GetProcessor::instance(kv.get()); + auto missedFuture = missedProcessor->getFuture(); + missedProcessor->process(missedReq); + auto missedResp = std::move(missedFuture).get(); + ASSERT_EQ(cpp2::ErrorCode::E_KEY_NOT_FOUND, missedResp.code); + } + { + // Multi Get Test + std::vector keys; + for (auto i = 0; i < 2; i++) { + keys.emplace_back(std::move(folly::stringPrintf("key_%d", i))); + } + + cpp2::MultiGetReq req; + req.set_segment("test"); + req.set_keys(std::move(keys)); + + auto* processor = MultiGetProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + ASSERT_EQ(2, resp.values.size()); + ASSERT_EQ("value_0", resp.values[0]); + ASSERT_EQ("value_1", resp.values[1]); + } + { + // Scan Test + cpp2::ScanReq req; + req.set_segment("test"); + req.set_start("key_1"); + req.set_end("key_4"); + + auto* processor = ScanProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + ASSERT_EQ(3, resp.values.size()); + ASSERT_EQ("value_1", resp.values[0]); + ASSERT_EQ("value_2", resp.values[1]); + ASSERT_EQ("value_3", resp.values[2]); + } + { + // Remove Test + cpp2::RemoveReq req; + req.set_segment("test"); + req.set_key("key_9"); + + auto* processor = RemoveProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + } + { + // Remove Range Test + cpp2::RemoveRangeReq req; + req.set_segment("test"); + req.set_start("key_0"); + req.set_end("key_4"); + + auto* processor = RemoveRangeProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); + } + { + // Illegal Segment Test + cpp2::GetReq req; + req.set_segment("_test0_"); + req.set_key("key_8"); + + auto* processor = GetProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(cpp2::ErrorCode::E_STORE_SEGMENT_ILLEGAL, resp.code); + } +} + TEST(ProcessorTest, ListOrGetTagsTest) { fs::TempDir rootPath("/tmp/ListTagsTest.XXXXXX"); std::unique_ptr kv(TestUtils::initKV(rootPath.path())); @@ -287,7 +430,7 @@ TEST(ProcessorTest, RemoveTagTest) { std::string tagVal; kvstore::ResultCode ret; std::unique_ptr iter; - ret = kv.get()->get(0, 0, std::move(MetaUtils::indexKey(EntryType::TAG, "tag_1")), + ret = kv.get()->get(0, 0, std::move(MetaServiceUtils::indexKey(EntryType::TAG, "tag_1")), &tagVal); ASSERT_EQ(kvstore::ResultCode::ERR_KEY_NOT_FOUND, ret); ret = kv.get()->prefix(0, 0, "__tags__", &iter); diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index be8aff740e1..a1205ad18e7 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -70,7 +70,7 @@ class TestUtils { auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); - EXPECT_EQ(resp.code, cpp2::ErrorCode::SUCCEEDED); + EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code); } { cpp2::ListHostsReq req; @@ -90,7 +90,7 @@ class TestUtils { static bool assembleSpace(kvstore::KVStore* kv, GraphSpaceID id) { bool ret = false; std::vector data; - data.emplace_back(MetaUtils::spaceKey(id), "test_space"); + data.emplace_back(MetaServiceUtils::spaceKey(id), "test_space"); kv->asyncMultiPut(0, 0, std::move(data), [&] (kvstore::ResultCode code, HostAddr leader) { ret = (code == kvstore::ResultCode::SUCCEEDED); @@ -113,9 +113,9 @@ class TestUtils { } auto tagName = folly::stringPrintf("tag_%d", tagId); auto tagIdVal = std::string(reinterpret_cast(&tagId), sizeof(tagId)); - tags.emplace_back(MetaUtils::indexKey(EntryType::TAG, tagName), tagIdVal); - tags.emplace_back(MetaUtils::schemaTagKey(1, tagId, ver++), - MetaUtils::schemaTagVal(tagName, srcsch)); + tags.emplace_back(MetaServiceUtils::indexKey(EntryType::TAG, tagName), tagIdVal); + tags.emplace_back(MetaServiceUtils::schemaTagKey(1, tagId, ver++), + MetaServiceUtils::schemaTagVal(tagName, srcsch)); } kv->asyncMultiPut(0, 0, std::move(tags), diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 0d3242b0c88..0b367633d7a 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -32,9 +32,9 @@ add_executable( $ $ $ - $ $ $ + $ $ $ $ @@ -85,9 +85,9 @@ add_executable( $ $ $ - $ $ $ + $ $ $ $ @@ -218,9 +218,9 @@ add_executable( StorageClientTest.cpp $ $ - $ $ $ + $ $ $ $ diff --git a/src/tools/storage-perf/CMakeLists.txt b/src/tools/storage-perf/CMakeLists.txt index 3d8f5914daf..778ab65de6b 100644 --- a/src/tools/storage-perf/CMakeLists.txt +++ b/src/tools/storage-perf/CMakeLists.txt @@ -2,10 +2,10 @@ add_executable( storage_perf StoragePerfTool.cpp $ - $ $ - $ $ + $ + $ $ $ $