From e4a3d093c94527cad69d511a1ffcf5fe79f157a2 Mon Sep 17 00:00:00 2001 From: dangleptr <37216992+dangleptr@users.noreply.github.com> Date: Sun, 13 Jan 2019 14:24:22 +0800 Subject: [PATCH 1/3] Storage service processor implementation. (#56) --- src/common/network/NetworkUtils.cpp | 2 +- src/common/network/NetworkUtils.h | 1 - src/common/network/test/NetworkUtilsTest.cpp | 1 - src/daemons/CMakeLists.txt | 4 + src/daemons/StorageDaemon.cpp | 47 ++++- src/dataman/ResultSchemaProvider.cpp | 2 +- src/dataman/ResultSchemaProvider.h | 2 +- src/dataman/RowSetWriter.cpp | 1 + src/dataman/RowWriter.cpp | 1 - src/interface/storage.thrift | 53 +++-- src/meta/CMakeLists.txt | 2 +- src/meta/SchemaManager.cpp | 20 ++ src/meta/SchemaManager.h | 84 ++++++++ src/storage/AddEdgesProcessor.cpp | 32 +++ src/storage/AddEdgesProcessor.h | 15 +- src/storage/AddVerticesProcessor.cpp | 26 +-- src/storage/AddVerticesProcessor.h | 17 +- src/storage/BaseProcessor.h | 49 +++-- src/storage/BaseProcessor.inl | 61 ++++++ src/storage/CMakeLists.txt | 8 +- src/storage/Collector.h | 135 +++++++++++++ src/storage/KeyUtils.cpp | 18 +- src/storage/KeyUtils.h | 9 +- src/storage/QueryBaseProcessor.h | 66 ++++++ src/storage/QueryBaseProcessor.inl | 183 +++++++++++++++++ src/storage/QueryBoundProcessor.cpp | 123 ++++++++++++ src/storage/QueryBoundProcessor.h | 56 ++++++ src/storage/QueryEdgePropsProcessor.cpp | 86 ++++++++ src/storage/QueryEdgePropsProcessor.h | 59 ++++++ src/storage/QueryProcessor.h | 86 -------- src/storage/QueryStatsProcessor.cpp | 140 +++++++++++++ src/storage/QueryStatsProcessor.h | 55 +++++ src/storage/QueryVertexPropsProcessor.cpp | 31 +++ src/storage/QueryVertexPropsProcessor.h | 34 ++++ src/storage/StorageServiceHandler.cpp | 27 +-- src/storage/StorageServiceHandler.h | 17 +- src/storage/test/AddEdgesTest.cpp | 78 +++++++ src/storage/test/AddVerticesTest.cpp | 7 +- src/storage/test/CMakeLists.txt | 190 +++++++++++++++++- src/storage/test/QueryBoundTest.cpp | 176 ++++++++++++++++ src/storage/test/QueryEdgePropsTest.cpp | 133 ++++++++++++ src/storage/test/QueryStatsTest.cpp | 173 ++++++++++++++++ src/storage/test/QueryVertexPropsTest.cpp | 115 +++++++++++ .../test/StorageServiceHandlerTest.cpp | 7 +- src/storage/test/StorageTestBase.cpp | 14 -- src/storage/test/StorageTestBase.h | 67 ------ src/storage/test/TestUtils.h | 125 ++++++++++++ 47 files changed, 2344 insertions(+), 294 deletions(-) create mode 100644 src/storage/AddEdgesProcessor.cpp create mode 100644 src/storage/BaseProcessor.inl create mode 100644 src/storage/Collector.h create mode 100644 src/storage/QueryBaseProcessor.h create mode 100644 src/storage/QueryBaseProcessor.inl create mode 100644 src/storage/QueryBoundProcessor.cpp create mode 100644 src/storage/QueryBoundProcessor.h create mode 100644 src/storage/QueryEdgePropsProcessor.cpp create mode 100644 src/storage/QueryEdgePropsProcessor.h delete mode 100644 src/storage/QueryProcessor.h create mode 100644 src/storage/QueryStatsProcessor.cpp create mode 100644 src/storage/QueryStatsProcessor.h create mode 100644 src/storage/QueryVertexPropsProcessor.cpp create mode 100644 src/storage/QueryVertexPropsProcessor.h create mode 100644 src/storage/test/AddEdgesTest.cpp create mode 100644 src/storage/test/QueryBoundTest.cpp create mode 100644 src/storage/test/QueryEdgePropsTest.cpp create mode 100644 src/storage/test/QueryStatsTest.cpp create mode 100644 src/storage/test/QueryVertexPropsTest.cpp delete mode 100644 src/storage/test/StorageTestBase.cpp delete mode 100644 src/storage/test/StorageTestBase.h create mode 100644 src/storage/test/TestUtils.h diff --git a/src/common/network/NetworkUtils.cpp b/src/common/network/NetworkUtils.cpp index 8e101f14136..c6bb5a82204 100644 --- a/src/common/network/NetworkUtils.cpp +++ b/src/common/network/NetworkUtils.cpp @@ -11,6 +11,7 @@ #include #include + namespace nebula { namespace network { @@ -82,7 +83,6 @@ StatusOr> NetworkUtils::listDeviceA return dev2ipv4s; } - bool NetworkUtils::getDynamicPortRange(uint16_t& low, uint16_t& high) { FILE* pipe = popen("cat /proc/sys/net/ipv4/ip_local_port_range", "r"); if (!pipe) { diff --git a/src/common/network/NetworkUtils.h b/src/common/network/NetworkUtils.h index b546085c90f..a6ab71b8f3f 100644 --- a/src/common/network/NetworkUtils.h +++ b/src/common/network/NetworkUtils.h @@ -26,7 +26,6 @@ class NetworkUtils final { static StatusOr> listIPv4s(); // List out all network devices and its cooresponding Ipv4 address. static StatusOr> listDeviceAndIPv4s(); - // Get the local dynamic port range [low, high], only works for IPv4 static bool getDynamicPortRange(uint16_t& low, uint16_t& high); // Get all ports that are currently in use diff --git a/src/common/network/test/NetworkUtilsTest.cpp b/src/common/network/test/NetworkUtilsTest.cpp index 6e3a055766d..db196874df4 100644 --- a/src/common/network/test/NetworkUtilsTest.cpp +++ b/src/common/network/test/NetworkUtilsTest.cpp @@ -62,7 +62,6 @@ TEST(NetworkUtils, listDeviceAndIPv4s) { ASSERT_NE(result.value().end(), result.value().find("lo")); } - TEST(NetworkUtils, intIPv4Conversion) { uint32_t ip; ASSERT_TRUE(NetworkUtils::ipv4ToInt("127.0.0.1", ip)); diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index 62d99db1928..20147da6f2d 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -41,10 +41,14 @@ add_executable( $ $ $ + $ + $ $ $ $ $ + $ + $ ) target_link_libraries( storaged diff --git a/src/daemons/StorageDaemon.cpp b/src/daemons/StorageDaemon.cpp index c91637f4709..e0517ea2b24 100644 --- a/src/daemons/StorageDaemon.cpp +++ b/src/daemons/StorageDaemon.cpp @@ -6,19 +6,62 @@ #include "base/Base.h" #include +#include "network/NetworkUtils.h" #include "storage/StorageServiceHandler.h" +#include "kvstore/include/KVStore.h" +#include "meta/SchemaManager.h" DEFINE_int32(port, 44500, "Storage daemon listening port"); +DEFINE_string(data_path, "", "Root data path, multi paths should be split by comma." + "For rocksdb engine, one path one instance."); +DEFINE_string(local_ip, "", "Local ip"); +// Get local IPv4 address. You could specify it by set FLAGS_local_ip, otherwise +// it will use the first ip exclude "127.0.0.1" +namespace nebula { + +StatusOr getLocalIP() { + if (!FLAGS_local_ip.empty()) { + return FLAGS_local_ip; + } + auto result = network::NetworkUtils::listDeviceAndIPv4s(); + if (!result.ok()) { + return std::move(result).status(); + } + for (auto& deviceIP : result.value()) { + if (deviceIP.second != "127.0.0.1") { + return deviceIP.first; + } + } + return Status::Error("No IPv4 address found!"); +} + +} // namespace nebula int main(int argc, char *argv[]) { folly::init(&argc, &argv, true); + using namespace nebula; using namespace nebula::storage; - LOG(INFO) << "Starting the storage Daemon on port " << FLAGS_port; + LOG(INFO) << "Starting the storage Daemon on port " << FLAGS_port + << ", dataPath " << FLAGS_data_path; + + std::vector paths; + folly::split(",", FLAGS_data_path, paths, true); + std::transform(paths.begin(), paths.end(), paths.begin(), [](auto& p) { + return folly::trimWhitespace(p).str(); + }); + auto result = getLocalIP(); + CHECK(result.ok()) << result.status(); + uint32_t localIP; + CHECK(network::NetworkUtils::ipv4ToInt(result.value(), localIP)); + + std::unique_ptr kvstore( + kvstore::KVStore::instance(HostAddr(localIP, FLAGS_port), std::move(paths))); + std::unique_ptr schemaMan(meta::SchemaManager::instance()); - auto handler = std::make_shared(); + auto handler = std::make_shared(kvstore.get(), schemaMan.get()); auto server = std::make_shared(); CHECK(!!server) << "Failed to create the thrift server"; diff --git a/src/dataman/ResultSchemaProvider.cpp b/src/dataman/ResultSchemaProvider.cpp index e38ca6da8a5..cddf3fb0e7e 100644 --- a/src/dataman/ResultSchemaProvider.cpp +++ b/src/dataman/ResultSchemaProvider.cpp @@ -45,7 +45,7 @@ bool ResultSchemaProvider::ResultSchemaField::isValid() const { * ResultSchemaProvider * **********************************/ -ResultSchemaProvider::ResultSchemaProvider(cpp2::Schema&& schema) +ResultSchemaProvider::ResultSchemaProvider(cpp2::Schema schema) : columns_(std::move(schema.get_columns())) { for (auto i = 0UL; i< columns_.size(); i++) { const std::string& name = columns_[i].get_name(); diff --git a/src/dataman/ResultSchemaProvider.h b/src/dataman/ResultSchemaProvider.h index 4219336ca3b..7fcb0b6a159 100644 --- a/src/dataman/ResultSchemaProvider.h +++ b/src/dataman/ResultSchemaProvider.h @@ -31,7 +31,7 @@ class ResultSchemaProvider : public SchemaProviderIf { public: - explicit ResultSchemaProvider(storage::cpp2::Schema&&); + explicit ResultSchemaProvider(storage::cpp2::Schema); virtual ~ResultSchemaProvider() = default; int32_t getLatestVer() const noexcept override; diff --git a/src/dataman/RowSetWriter.cpp b/src/dataman/RowSetWriter.cpp index 6cb3819bc1b..82a25b7b48f 100644 --- a/src/dataman/RowSetWriter.cpp +++ b/src/dataman/RowSetWriter.cpp @@ -17,6 +17,7 @@ RowSetWriter::RowSetWriter(const SchemaProviderIf* schema, void RowSetWriter::writeRowLength(int64_t len) { + VLOG(3) << "Write row length " << len; uint8_t buf[10]; size_t lenBytes = folly::encodeVarint(len, buf); DCHECK_GT(lenBytes, 0UL); diff --git a/src/dataman/RowWriter.cpp b/src/dataman/RowWriter.cpp index 0a96a46f969..745bed54705 100644 --- a/src/dataman/RowWriter.cpp +++ b/src/dataman/RowWriter.cpp @@ -163,7 +163,6 @@ RowWriter& RowWriter::operator<<(const std::string& v) noexcept { return operator<<(folly::StringPiece(v)); } - RowWriter& RowWriter::operator<<(folly::StringPiece v) noexcept { RW_GET_COLUMN_TYPE(STRING) diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index a74682caa0e..28345534f8c 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -29,6 +29,11 @@ enum ErrorCode { E_SPACE_NOT_FOUND = -13, E_PART_NOT_FOUND = -14, + // meta failures + E_EDGE_PROP_NOT_FOUND = -21, + E_TAG_PROP_NOT_FOUND = -22, + E_IMPROPER_DATA_TYPE = -23, + E_UNKNOWN = -100, } (cpp.enum_strict) @@ -85,7 +90,8 @@ enum PropOwner { struct PropDef { 1: PropOwner owner, 2: i32 tag_id, // Only valid when owner is SOURCE or DEST - 3: string name, // Property name + 3: string name, // Property name + 4: StatType stat, // calc stats when setted. } enum StatType { @@ -94,11 +100,6 @@ enum StatType { AVG = 3, } (cpp.enum_strict) -struct PropStat { - 1: StatType stat, - 2: PropDef prop, -} - struct HostAddr { 1: IPv4 ip, 2: Port port, @@ -130,6 +131,22 @@ struct ExecResponse { 2: required i32 latency_in_ms, // Execution latency } +struct EdgePropResponse { + 1: required list codes, + 2: required i32 latency_in_ms, // Query latency from storage service + 3: optional Schema schema, // edge related props + 4: optional binary data, +} + +struct QueryStatsResponse { + // If error for the whole request, the codes would have only one element and + // related part_id equals -1 + 1: required list codes, + 2: required i32 latency_in_ms, // Query latency from storage service + 3: optional Schema schema, + 4: optional binary data, +} + struct Tag { 1: i32 tag_id, 2: binary props, @@ -142,9 +159,10 @@ struct Vertex { struct EdgeKey { 1: i64 src, - 2: i64 dst, // When edge_type > 0, it's an out-edge, otherwise, it's an in-edge - 3: i32 edge_type, + // When query edge props, the field could be unset. + 2: i32 edge_type, + 3: i64 dst, 4: i64 ranking, } @@ -163,16 +181,6 @@ struct GetNeighborsRequest { 5: list return_columns, } -struct NeighborsStatsRequest { - 1: GraphSpaceID space_id, - // partId => ids - 2: map>(cpp.template = "std::unordered_map") ids, - // When edge_type > 0, going along the out-edge, otherwise, along the in-edge - 3: i32 edge_type, - 4: binary filter, - 5: list return_columns, -} - struct VertexPropRequest { 1: GraphSpaceID space_id, 2: map>(cpp.template = "std::unordered_map") ids, @@ -183,7 +191,8 @@ struct EdgePropRequest { 1: GraphSpaceID space_id, // partId => edges 2: map>(cpp.template = "std::unordered_map") edges, - 3: list return_columns, + 3: i32 edge_type, + 4: list return_columns, } struct AddVerticesRequest { @@ -206,12 +215,12 @@ service StorageService { QueryResponse getOutBound(1: GetNeighborsRequest req) QueryResponse getInBound(1: GetNeighborsRequest req) - QueryResponse outBoundStats(1: NeighborsStatsRequest req) - QueryResponse inBoundStats(1: NeighborsStatsRequest req) + QueryStatsResponse outBoundStats(1: GetNeighborsRequest req) + QueryStatsResponse inBoundStats(1: GetNeighborsRequest req) // When return_columns is empty, return all properties QueryResponse getProps(1: VertexPropRequest req); - QueryResponse getEdgeProps(1: EdgePropRequest req) + EdgePropResponse getEdgeProps(1: EdgePropRequest req) ExecResponse addVertices(1: AddVerticesRequest req); ExecResponse addEdges(1: AddEdgesRequest req); diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 173cf980492..0ac1abcba13 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -4,7 +4,7 @@ add_library( HostManager.cpp SchemaManager.cpp ) -add_dependencies(meta_obj common) +add_dependencies(meta_obj common dataman_obj) #add_subdirectory(test) diff --git a/src/meta/SchemaManager.cpp b/src/meta/SchemaManager.cpp index e69de29bb2d..0c832412e60 100644 --- a/src/meta/SchemaManager.cpp +++ b/src/meta/SchemaManager.cpp @@ -0,0 +1,20 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "meta/SchemaManager.h" + +namespace nebula { +namespace meta { + +// static +SchemaManager* SchemaManager::instance() { + return new MemorySchemaManager(); +} + + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/SchemaManager.h b/src/meta/SchemaManager.h index e69de29bb2d..21d7e08b2e8 100644 --- a/src/meta/SchemaManager.h +++ b/src/meta/SchemaManager.h @@ -0,0 +1,84 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_SCHEMAMANAGER_H_ +#define META_SCHEMAMANAGER_H_ + +#include "base/Base.h" +#include "dataman/SchemaProviderIf.h" + +namespace nebula { +namespace meta { + +class SchemaManager { +public: + static SchemaManager* instance(); + + virtual SchemaProviderIf* edgeSchema(GraphSpaceID spaceId, EdgeType edgeType) = 0; + + virtual SchemaProviderIf* tagSchema(GraphSpaceID spaceId, TagID tagId) = 0; + + virtual int32_t version() = 0; + + virtual ~SchemaManager() = default; + +protected: + SchemaManager() = default; +}; + +class MemorySchemaManager : public SchemaManager { +public: + using EdgeSchema + = std::unordered_map>; + + using TagSchema + = std::unordered_map>; + + SchemaProviderIf* edgeSchema(GraphSpaceID spaceId, EdgeType edgeType) override { + auto it = edgeSchema_.find(spaceId); + if (it != edgeSchema_.end()) { + auto edgeIt = it->second.find(edgeType); + if (edgeIt != it->second.end()) { + return edgeIt->second; + } + } + return nullptr; + } + + SchemaProviderIf* tagSchema(GraphSpaceID spaceId, TagID tagId) override { + auto it = tagSchema_.find(spaceId); + if (it != tagSchema_.end()) { + auto tagIt = it->second.find(tagId); + if (tagIt != it->second.end()) { + return tagIt->second; + } + } + return nullptr; + } + + int32_t version() { + return 0; + } + + EdgeSchema& edgeSchema() { + return edgeSchema_; + } + + TagSchema& tagSchema() { + return tagSchema_; + } + +private: + EdgeSchema edgeSchema_; + + TagSchema tagSchema_; +}; + +} // namespace meta +} // namespace nebula +#endif // META_SCHEMAMANAGER_H_ + + diff --git a/src/storage/AddEdgesProcessor.cpp b/src/storage/AddEdgesProcessor.cpp new file mode 100644 index 00000000000..24be325d30b --- /dev/null +++ b/src/storage/AddEdgesProcessor.cpp @@ -0,0 +1,32 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "storage/AddEdgesProcessor.h" +#include +#include "time/TimeUtils.h" +#include "storage/KeyUtils.h" + +namespace nebula { +namespace storage { + +void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) { + auto spaceId = req.get_space_id(); + auto now = time::TimeUtils::nowInMSeconds(); + callingNum_ = req.edges.size(); + CHECK_NOTNULL(kvstore_); + std::for_each(req.edges.begin(), req.edges.end(), [&](auto& partEdges){ + auto partId = partEdges.first; + std::vector data; + std::for_each(partEdges.second.begin(), partEdges.second.end(), [&](auto& edge){ + auto key = KeyUtils::edgeKey(partId, edge.key.src, edge.key.edge_type, + edge.key.dst, edge.key.ranking, now); + data.emplace_back(std::move(key), std::move(edge.get_props())); + }); + doPut(spaceId, partId, std::move(data)); + }); +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/AddEdgesProcessor.h b/src/storage/AddEdgesProcessor.h index d220366725e..7906e645b77 100644 --- a/src/storage/AddEdgesProcessor.h +++ b/src/storage/AddEdgesProcessor.h @@ -8,31 +8,24 @@ #define STORAGE_ADDEDGESPROCESSOR_H_ #include "base/Base.h" -#include -#include -#include -#include "kvstore/include/KVStore.h" -#include "interface/gen-cpp2/storage_types.h" #include "storage/BaseProcessor.h" namespace nebula { namespace storage { -class AddEdgesProcessor : public BaseProcessor { +class AddEdgesProcessor : public BaseProcessor { public: static AddEdgesProcessor* instance(kvstore::KVStore* kvstore) { return new AddEdgesProcessor(kvstore); } - void process(const cpp2::AddEdgesRequest& req) override { - VLOG(1) << req.get_space_id(); - } + void process(const cpp2::AddEdgesRequest& req); private: AddEdgesProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} -}; + : BaseProcessor(kvstore) {} +}; } // namespace storage } // namespace nebula diff --git a/src/storage/AddVerticesProcessor.cpp b/src/storage/AddVerticesProcessor.cpp index 27875962b86..a0bd812229e 100644 --- a/src/storage/AddVerticesProcessor.cpp +++ b/src/storage/AddVerticesProcessor.cpp @@ -6,7 +6,6 @@ #include "storage/AddVerticesProcessor.h" #include -#include "time/Duration.h" #include "time/TimeUtils.h" #include "storage/KeyUtils.h" @@ -14,10 +13,11 @@ namespace nebula { namespace storage { void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { - auto now = startMs_ = time::TimeUtils::nowInMSeconds(); + auto now = time::TimeUtils::nowInMSeconds(); const auto& partVertices = req.get_vertices(); auto spaceId = req.get_space_id(); callingNum_ = partVertices.size(); + CHECK_NOTNULL(kvstore_); std::for_each(partVertices.begin(), partVertices.end(), [&](auto& pv) { auto partId = pv.first; const auto& vertices = pv.second; @@ -30,26 +30,8 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { data.emplace_back(std::move(key), std::move(tag.get_props())); }); }); - CHECK_NOTNULL(kvstore_); - kvstore_->asyncMultiPut(spaceId, partId, std::move(data), - [partId, this](kvstore::ResultCode code, HostAddr addr) { - cpp2::ResultCode thriftResult; - thriftResult.code = to(code); - thriftResult.part_id = partId; - if (code == kvstore::ResultCode::ERR_LEADER_CHANAGED) { - thriftResult.get_leader()->ip = addr.first; - thriftResult.get_leader()->port = addr.second; - } - std::lock_guard lg(this->lock_); - this->codes_.emplace_back(std::move(thriftResult)); - this->callingNum_--; - if (this->callingNum_ == 0) { - this->resp_.set_codes(std::move(this->codes_)); - this->resp_.set_latency_in_ms(time::TimeUtils::nowInMSeconds() - this->startMs_); - this->onFinished(); - } - }); - }); + doPut(spaceId, partId, std::move(data)); + }); } } // namespace storage diff --git a/src/storage/AddVerticesProcessor.h b/src/storage/AddVerticesProcessor.h index bc35e7ab306..494c1df1779 100644 --- a/src/storage/AddVerticesProcessor.h +++ b/src/storage/AddVerticesProcessor.h @@ -8,33 +8,22 @@ #define STORAGE_ADDVERTICESPROCESSOR_H_ #include "base/Base.h" -#include -#include -#include -#include "kvstore/include/KVStore.h" -#include "interface/gen-cpp2/storage_types.h" #include "storage/BaseProcessor.h" namespace nebula { namespace storage { -class AddVerticesProcessor : public BaseProcessor { +class AddVerticesProcessor : public BaseProcessor { public: static AddVerticesProcessor* instance(kvstore::KVStore* kvstore) { return new AddVerticesProcessor(kvstore); } - void process(const cpp2::AddVerticesRequest& req) override; + void process(const cpp2::AddVerticesRequest& req); private: AddVerticesProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} - -private: - std::vector codes_; - folly::SpinLock lock_; - int64_t startMs_; - int32_t callingNum_; + : BaseProcessor(kvstore) {} }; diff --git a/src/storage/BaseProcessor.h b/src/storage/BaseProcessor.h index c6ae830ba3f..00155dd9005 100644 --- a/src/storage/BaseProcessor.h +++ b/src/storage/BaseProcessor.h @@ -8,14 +8,22 @@ #define STORAGE_BASEPROCESSOR_H_ #include "base/Base.h" +#include #include #include +#include "interface/gen-cpp2/storage_types.h" #include "kvstore/include/KVStore.h" +#include "meta/SchemaManager.h" +#include "dataman/RowSetWriter.h" +#include "dataman/RowReader.h" +#include "dataman/RowWriter.h" +#include "storage/Collector.h" +#include "time/Duration.h" namespace nebula { namespace storage { -template +template class BaseProcessor { public: BaseProcessor(kvstore::KVStore* kvstore) @@ -27,8 +35,6 @@ class BaseProcessor { return promise_.getFuture(); } - virtual void process(const REQ& req) = 0; - protected: /** * Destroy current instance when finished. @@ -38,27 +44,38 @@ class BaseProcessor { delete this; } - cpp2::ErrorCode to(kvstore::ResultCode code) { - switch (code) { - case kvstore::ResultCode::SUCCESSED: - return cpp2::ErrorCode::SUCCEEDED; - case kvstore::ResultCode::ERR_LEADER_CHANAGED: - return cpp2::ErrorCode::E_LEADER_CHANGED; - case kvstore::ResultCode::ERR_SPACE_NOT_FOUND: - return cpp2::ErrorCode::E_SPACE_NOT_FOUND; - case kvstore::ResultCode::ERR_PART_NOT_FOUND: - return cpp2::ErrorCode::E_PART_NOT_FOUND; - default: - return cpp2::ErrorCode::E_UNKNOWN; - } + void doPut(GraphSpaceID spaceId, PartitionID partId, std::vector data); + + cpp2::ColumnDef columnDef(std::string name, cpp2::SupportedType type) { + cpp2::ColumnDef column; + column.name = std::move(name); + column.type.type = type; + return column; + } + + cpp2::ErrorCode to(kvstore::ResultCode code); + + void pushResultCode(cpp2::ErrorCode code, PartitionID partId) { + cpp2::ResultCode thriftRet; + thriftRet.code = code; + thriftRet.part_id = partId; + resp_.codes.emplace_back(std::move(thriftRet)); } protected: kvstore::KVStore* kvstore_ = nullptr; RESP resp_; folly::Promise promise_; + + time::Duration duration_; + std::vector codes_; + folly::SpinLock lock_; + int32_t callingNum_; }; } // namespace storage } // namespace nebula + +#include "storage/BaseProcessor.inl" + #endif // STORAGE_BASEPROCESSOR_H_ diff --git a/src/storage/BaseProcessor.inl b/src/storage/BaseProcessor.inl new file mode 100644 index 00000000000..c9bd0be79d2 --- /dev/null +++ b/src/storage/BaseProcessor.inl @@ -0,0 +1,61 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include "storage/BaseProcessor.h" + +namespace nebula { +namespace storage { + +template +cpp2::ErrorCode BaseProcessor::to(kvstore::ResultCode code) { + switch (code) { + case kvstore::ResultCode::SUCCESSED: + return cpp2::ErrorCode::SUCCEEDED; + case kvstore::ResultCode::ERR_LEADER_CHANAGED: + return cpp2::ErrorCode::E_LEADER_CHANGED; + case kvstore::ResultCode::ERR_SPACE_NOT_FOUND: + return cpp2::ErrorCode::E_SPACE_NOT_FOUND; + case kvstore::ResultCode::ERR_PART_NOT_FOUND: + return cpp2::ErrorCode::E_PART_NOT_FOUND; + default: + return cpp2::ErrorCode::E_UNKNOWN; + } +} + +template +void BaseProcessor::doPut(GraphSpaceID spaceId, + PartitionID partId, + std::vector data) { + this->kvstore_->asyncMultiPut(spaceId, partId, std::move(data), + [partId, this](kvstore::ResultCode code, HostAddr addr) { + cpp2::ResultCode thriftResult; + thriftResult.code = to(code); + thriftResult.part_id = partId; + if (code == kvstore::ResultCode::ERR_LEADER_CHANAGED) { + thriftResult.get_leader()->ip = addr.first; + thriftResult.get_leader()->port = addr.second; + } + bool finished = false; + { + std::lock_guard lg(this->lock_); + this->codes_.emplace_back(std::move(thriftResult)); + this->callingNum_--; + if (this->callingNum_ == 0) { + this->resp_.set_codes(std::move(this->codes_)); + this->resp_.set_latency_in_ms(this->duration_.elapsedInMSec()); + finished = true; + } + } + if (finished) { + this->onFinished(); + } + }); +} + + +} // namespace storage +} // namespace nebula diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 4d952465929..45f109910cc 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -4,9 +4,13 @@ add_library( StorageServiceHandler.cpp KeyUtils.cpp AddVerticesProcessor.cpp + AddEdgesProcessor.cpp + QueryBoundProcessor.cpp + QueryVertexPropsProcessor.cpp + QueryEdgePropsProcessor.cpp + QueryStatsProcessor.cpp ) -add_dependencies(storage_service_handler common storage_thrift_obj kvstore_obj) - +add_dependencies(storage_service_handler common storage_thrift_obj kvstore_obj dataman_obj meta_obj) add_subdirectory(test) diff --git a/src/storage/Collector.h b/src/storage/Collector.h new file mode 100644 index 00000000000..5a5fada1a51 --- /dev/null +++ b/src/storage/Collector.h @@ -0,0 +1,135 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef STORAGE_COLLECTOR_H_ +#define STORAGE_COLLECTOR_H_ + +#include "base/Base.h" +#include "dataman/RowWriter.h" +#include + +namespace nebula { +namespace storage { + +struct PropContext { + cpp2::PropDef prop_; + cpp2::ValueType type_; + boost::variant sum_ = 0L; + int32_t count_ = 0; + int32_t retIndex_ = -1; +}; + +struct TagContext { + TagID tagId_; + SchemaProviderIf* schema_ = nullptr; + std::vector props_; +}; + +struct EdgeContext { + EdgeType edgeType_; + SchemaProviderIf* schema_ = nullptr; + std::vector props_; +}; + +class Collector { +public: + Collector() = default; + + virtual ~Collector() = default; + + virtual void collectInt32(ResultType ret, int32_t v, PropContext& prop) = 0; + + virtual void collectInt64(ResultType ret, int64_t v, PropContext& prop) = 0; + + virtual void collectFloat(ResultType ret, float v, PropContext& prop) = 0; + + virtual void collectDouble(ResultType ret, double v, PropContext& prop) = 0; + + virtual void collectString(ResultType ret, folly::StringPiece& v, PropContext& prop) = 0; +}; + +class PropsCollector : public Collector { +public: + PropsCollector(RowWriter* writer) + : writer_(writer) {} + + void collectInt32(ResultType ret, int32_t v, PropContext& prop) override { + collect(ret, v, prop); + } + + void collectInt64(ResultType ret, int64_t v, PropContext& prop) override { + collect(ret, v, prop); + } + + void collectFloat(ResultType ret, float v, PropContext& prop) override { + collect(ret, v, prop); + } + + void collectDouble(ResultType ret, double v, PropContext& prop) override { + collect(ret, v, prop); + } + + void collectString(ResultType ret, folly::StringPiece& v, PropContext& prop) override { + collect(ret, v, prop); + } + + template + void collect(ResultType ret, V& v, PropContext& prop) { + UNUSED(prop); + if (ResultType::SUCCEEDED == ret) { + (*writer_) << v; + } + } + +private: + RowWriter* writer_ = nullptr; +}; + +class StatsCollector : public Collector { +public: + StatsCollector() = default; + + void collectInt32(ResultType ret, int32_t v, PropContext& prop) override { + if (ret == ResultType::SUCCEEDED) { + prop.sum_ = boost::get(prop.sum_) + v; + prop.count_++; + } + } + + void collectInt64(ResultType ret, int64_t v, PropContext& prop) override { + if (ret == ResultType::SUCCEEDED) { + prop.sum_ = boost::get(prop.sum_) + v; + prop.count_++; + } + } + + void collectFloat(ResultType ret, float v, PropContext& prop) override { + if (ret == ResultType::SUCCEEDED) { + prop.sum_ = boost::get(prop.sum_) + v; + prop.count_++; + } + } + + void collectDouble(ResultType ret, double v, PropContext& prop) override { + if (ret == ResultType::SUCCEEDED) { + prop.sum_ = boost::get(prop.sum_) + v; + prop.count_++; + } + } + + void collectString(ResultType ret, folly::StringPiece& v, PropContext& prop) override { + UNUSED(v); + if (ret == ResultType::SUCCEEDED) { + prop.count_++; + } + } + +}; + +} // namespace storage +} // namespace nebula +#endif // STORAGE_COLLECTOR_H_ + diff --git a/src/storage/KeyUtils.cpp b/src/storage/KeyUtils.cpp index 578ac2dbc16..6a5c8519503 100644 --- a/src/storage/KeyUtils.cpp +++ b/src/storage/KeyUtils.cpp @@ -23,15 +23,15 @@ std::string KeyUtils::vertexKey(PartitionID partId, VertexID vId, // static std::string KeyUtils::edgeKey(PartitionID partId, VertexID srcId, - VertexID dstId, EdgeType type, + EdgeType type, VertexID dstId, EdgeRanking rank, EdgeVersion ts) { std::string key; key.reserve(kEdgeLen); key.append(reinterpret_cast(&partId), sizeof(PartitionID)) .append(reinterpret_cast(&srcId), sizeof(VertexID)) .append(reinterpret_cast(&type), sizeof(EdgeType)) - .append(reinterpret_cast(&dstId), sizeof(VertexID)) .append(reinterpret_cast(&rank), sizeof(EdgeRanking)) + .append(reinterpret_cast(&dstId), sizeof(VertexID)) .append(reinterpret_cast(&ts), sizeof(EdgeVersion)); return key; } @@ -55,6 +55,20 @@ std::string KeyUtils::prefix(PartitionID partId, VertexID vId) { return key; } +//static +std::string KeyUtils::prefix(PartitionID partId, VertexID src, EdgeType type, + VertexID dst, EdgeRanking ranking) { + std::string key; + key.reserve(sizeof(PartitionID) + sizeof(VertexID) + sizeof(EdgeType) + + sizeof(VertexID) + sizeof(EdgeRanking)); + key.append(reinterpret_cast(&partId), sizeof(PartitionID)) + .append(reinterpret_cast(&src), sizeof(VertexID)) + .append(reinterpret_cast(&type), sizeof(EdgeType)) + .append(reinterpret_cast(&ranking), sizeof(EdgeRanking)) + .append(reinterpret_cast(&dst), sizeof(VertexID)); + return key; +} + } // namespace storage } // namespace nebula diff --git a/src/storage/KeyUtils.h b/src/storage/KeyUtils.h index d75822c1750..e8ad41fa918 100644 --- a/src/storage/KeyUtils.h +++ b/src/storage/KeyUtils.h @@ -42,7 +42,7 @@ class KeyUtils final { * Generate edge key for kv store * */ static std::string edgeKey(PartitionID partId, VertexID srcId, - VertexID dstId, EdgeType type, + EdgeType type, VertexID dstId, EdgeRanking rank, EdgeVersion ts); /** @@ -55,6 +55,9 @@ class KeyUtils final { * */ static std::string prefix(PartitionID partId, VertexID vId); + static std::string prefix(PartitionID partId, VertexID src, EdgeType type, + VertexID dst, EdgeRanking ranking); + static bool isVertex(const std::string& rawKey) { return rawKey.size() == kVertexLen; } @@ -66,12 +69,12 @@ class KeyUtils final { /** * Parse vertex from rawKey, return false if failed. * */ - static bool parseVertex(const std::string& rawKey, Vertex& vertex); +// static bool parseVertex(const std::string& rawKey, Vertex& vertex); /** * Parse edge from rawkey, return false if failed. **/ - static bool parseEdge(const std::string& rawKey, Edge& edge); +// static bool parseEdge(const std::string& rawKey, Edge& edge); private: KeyUtils() = delete; diff --git a/src/storage/QueryBaseProcessor.h b/src/storage/QueryBaseProcessor.h new file mode 100644 index 00000000000..65f8900142c --- /dev/null +++ b/src/storage/QueryBaseProcessor.h @@ -0,0 +1,66 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef STORAGE_QUERYBASEPROCESSOR_H_ +#define STORAGE_QUERYBASEPROCESSOR_H_ + +#include "base/Base.h" +#include "storage/BaseProcessor.h" +#include "storage/Collector.h" + +namespace nebula { +namespace storage { + +template +class QueryBaseProcessor : public BaseProcessor { +public: + virtual ~QueryBaseProcessor() = default; + + void process(const cpp2::GetNeighborsRequest& req); + +protected: + QueryBaseProcessor(kvstore::KVStore* kvstore, meta::SchemaManager* schemaMan) + : BaseProcessor(kvstore) + , schemaMan_(schemaMan) {} + /** + * Check whether current operatin on the data is valid or not. + * */ + bool validOperation(cpp2::SupportedType vType, cpp2::StatType statType); + + /** + * Check request meta is illegal or not. Return contexts for tag and edge. + * */ + cpp2::ErrorCode checkAndBuildContexts(const REQ& req, + std::vector& tagContexts, + EdgeContext& edgeContext); + /** + * collect props in one row, you could define custom behavior by implement your own collector. + * */ + void collectProps(SchemaProviderIf* rowSchema, + folly::StringPiece& key, + folly::StringPiece& val, + std::vector& props, + Collector* collector); + + virtual kvstore::ResultCode processVertex(PartitionID partID, VertexID vId, + std::vector& tagContexts, + EdgeContext& edgeContext) = 0; + + virtual void onProcessed(std::vector& tagContexts, + EdgeContext& edgeContext, + int32_t retNum) = 0; + +protected: + meta::SchemaManager* schemaMan_ = nullptr; + GraphSpaceID spaceId_; +}; + +} // namespace storage +} // namespace nebula + +#include "storage/QueryBaseProcessor.inl" + +#endif // STORAGE_QUERYBASEPROCESSOR_H_ diff --git a/src/storage/QueryBaseProcessor.inl b/src/storage/QueryBaseProcessor.inl new file mode 100644 index 00000000000..7c841670eb7 --- /dev/null +++ b/src/storage/QueryBaseProcessor.inl @@ -0,0 +1,183 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "storage/QueryBaseProcessor.h" +#include +#include "storage/KeyUtils.h" +#include "dataman/RowReader.h" +#include "dataman/RowWriter.h" + + +namespace nebula { +namespace storage { + +template +bool QueryBaseProcessor::validOperation(cpp2::SupportedType vType, + cpp2::StatType statType) { + switch(statType) { + case cpp2::StatType::SUM: + case cpp2::StatType::AVG: { + return vType == cpp2::SupportedType::INT + || vType == cpp2::SupportedType::VID + || vType == cpp2::SupportedType::FLOAT + || vType == cpp2::SupportedType::DOUBLE; + } + case cpp2::StatType::COUNT: { + break; + } + } + return true; +} + +template +void QueryBaseProcessor::collectProps( + SchemaProviderIf* rowSchema, + folly::StringPiece& key, + folly::StringPiece& val, + std::vector& props, + Collector* collector) { + UNUSED(key); + RowReader reader(rowSchema, val); + for (auto& prop : props) { + const auto& name = prop.prop_.get_name(); + VLOG(3) << "collect " << name; + switch(prop.type_.type) { + case cpp2::SupportedType::INT: { + int64_t v; + auto ret = reader.getInt(name, v); + collector->collectInt64(ret, v, prop); + break; + } + case cpp2::SupportedType::VID: { + int64_t v; + auto ret = reader.getVid(name, v); + collector->collectInt64(ret, v, prop); + break; + } + case cpp2::SupportedType::FLOAT: { + float v; + auto ret = reader.getFloat(name, v); + collector->collectFloat(ret, v, prop); + break; + } + case cpp2::SupportedType::DOUBLE: { + double v; + auto ret = reader.getDouble(name, v); + collector->collectDouble(ret, v, prop); + break; + } + case cpp2::SupportedType::STRING: { + folly::StringPiece v; + auto ret = reader.getString(name, v); + collector->collectString(ret, v, prop); + break; + } + default: { + VLOG(1) << "Unsupport stats!"; + break; + } + } // switch + } // for +} + +template +cpp2::ErrorCode QueryBaseProcessor::checkAndBuildContexts( + const REQ& req, + std::vector& tagContexts, + EdgeContext& edgeContext) { + if (req.__isset.edge_type) { + edgeContext.edgeType_ = req.edge_type; + edgeContext.schema_ = schemaMan_->edgeSchema(spaceId_, req.edge_type); + if (edgeContext.schema_ == nullptr) { + return cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + } + } + int32_t index = 0; + std::unordered_map tagIndex; + for (auto& col : req.get_return_columns()) { + PropContext prop; + switch (col.owner) { + case cpp2::PropOwner::SOURCE: + case cpp2::PropOwner::DEST: { + auto tagId = col.tag_id; + auto* schema = schemaMan_->tagSchema(spaceId_, tagId); + if (schema == nullptr) { + return cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; + } + const auto* ftype = schema->getFieldType(col.name, 0); + prop.type_ = *ftype; + prop.retIndex_ = index; + if (col.__isset.stat && !validOperation(ftype->type, col.stat)) { + return cpp2::ErrorCode::E_IMPROPER_DATA_TYPE; + } + VLOG(3) << "tagId " << tagId << ", prop " << col.name; + prop.prop_ = std::move(col); + auto it = tagIndex.find(tagId); + if (it == tagIndex.end()) { + TagContext tc; + tc.schema_ = schema; + tc.tagId_ = tagId; + tc.props_.emplace_back(std::move(prop)); + tagContexts.emplace_back(std::move(tc)); + tagIndex.emplace(tagId, tagContexts.size() - 1); + } else { + tagContexts[it->second].props_.emplace_back(std::move(prop)); + } + break; + } + case cpp2::PropOwner::EDGE: { + if (edgeContext.schema_ != nullptr) { + const auto* ftype = edgeContext.schema_->getFieldType(col.name, 0); + prop.type_ = *ftype; + if (col.__isset.stat && !validOperation(ftype->type, col.stat)) { + return cpp2::ErrorCode::E_IMPROPER_DATA_TYPE; + } + prop.retIndex_ = index; + prop.prop_ = std::move(col); + edgeContext.props_.emplace_back(std::move(prop)); + } + break; + } + } + index++; + } + return cpp2::ErrorCode::SUCCEEDED; +} + +template +void QueryBaseProcessor::process(const cpp2::GetNeighborsRequest& req) { + spaceId_ = req.get_space_id(); + int32_t returnColumnsNum = req.get_return_columns().size(); + std::vector tagContexts; + EdgeContext edgeContext; + auto retCode = checkAndBuildContexts(req, tagContexts, edgeContext); + if (retCode != cpp2::ErrorCode::SUCCEEDED) { + this->pushResultCode(retCode, -1); + this->resp_.latency_in_ms = this->duration_.elapsedInMSec(); + this->onFinished(); + return; + } + +// const auto& filter = req.get_filter(); + std::for_each(req.get_ids().begin(), req.get_ids().end(), [&](auto& partV) { + auto partId = partV.first; + kvstore::ResultCode ret; + for (auto& vId : partV.second) { + VLOG(3) << "Process part " << partId << ", vertex " << vId; + ret = processVertex(partId, vId, tagContexts, edgeContext); + if (ret != kvstore::ResultCode::SUCCESSED) { + break; + } + } + this->pushResultCode(this->to(ret), partId); + }); + + onProcessed(tagContexts, edgeContext, returnColumnsNum); + this->resp_.latency_in_ms = this->duration_.elapsedInMSec(); + this->onFinished(); +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/QueryBoundProcessor.cpp b/src/storage/QueryBoundProcessor.cpp new file mode 100644 index 00000000000..30187dcf527 --- /dev/null +++ b/src/storage/QueryBoundProcessor.cpp @@ -0,0 +1,123 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "storage/QueryBoundProcessor.h" +#include +#include "time/Duration.h" +#include "storage/KeyUtils.h" +#include "dataman/RowReader.h" +#include "dataman/RowWriter.h" + +namespace nebula { +namespace storage { + +kvstore::ResultCode QueryBoundProcessor::collectVertexProps( + PartitionID partId, + VertexID vId, + TagID tagId, + SchemaProviderIf* tagSchema, + std::vector& props, + RowWriter& writer) { + auto prefix = KeyUtils::prefix(partId, vId, tagId); + std::unique_ptr iter; + auto ret = kvstore_->prefix(spaceId_, partId, prefix, iter); + if (ret != kvstore::ResultCode::SUCCESSED) { + VLOG(3) << "Error! ret = " << static_cast(ret) << ", spaceId " << spaceId_; + return ret; + } + // Only get the latest version. + if (iter && iter->valid()) { + auto key = iter->key(); + auto val = iter->val(); + PropsCollector collector(&writer); + this->collectProps(tagSchema, key, val, props, &collector); + } else { + VLOG(3) << "Missed partId " << partId << ", vId " << vId << ", tagId " << tagId; + } + return ret; +} + +kvstore::ResultCode QueryBoundProcessor::collectEdgeProps( + PartitionID partId, + VertexID vId, + EdgeType edgeType, + SchemaProviderIf* schema, + std::vector& props, + RowSetWriter& rsWriter) { + auto prefix = KeyUtils::prefix(partId, vId, edgeType); + std::unique_ptr iter; + auto ret = kvstore_->prefix(spaceId_, partId, prefix, iter); + if (ret != kvstore::ResultCode::SUCCESSED || !iter) { + return ret; + } + while(iter->valid()) { + RowWriter writer; + auto key = iter->key(); + auto val = iter->val(); + PropsCollector collector(&writer); + this->collectProps(schema, key, val, props, &collector); + iter->next(); + rsWriter.addRow(writer); + } + return ret; +} + +kvstore::ResultCode QueryBoundProcessor::processVertex(PartitionID partId, VertexID vId, + std::vector& tagContexts, + EdgeContext& edgeContext) { + cpp2::VertexResponse vResp; + vResp.vertex_id = vId; + if (!tagContexts.empty()) { + RowWriter writer; + for (auto& tc : tagContexts) { + VLOG(3) << "partId " << partId << ", vId " << vId + << ", tagId " << tc.tagId_ << ", prop size " << tc.props_.size(); + auto ret = collectVertexProps(partId, vId, tc.tagId_, tc.schema_, tc.props_, writer); + if (ret != kvstore::ResultCode::SUCCESSED) { + return ret; + } + } + vResp.vertex_data = writer.encode(); + } + if (edgeContext.schema_ != nullptr) { + RowSetWriter rsWriter; + auto ret = collectEdgeProps(partId, vId, edgeContext.edgeType_, + edgeContext.schema_, edgeContext.props_, rsWriter); + if (ret != kvstore::ResultCode::SUCCESSED) { + return ret; + } + vResp.edge_data = std::move(rsWriter.data()); + } + resp_.vertices.emplace_back(std::move(vResp)); + return kvstore::ResultCode::SUCCESSED; +} + +void QueryBoundProcessor::onProcessed(std::vector& tagContexts, + EdgeContext& edgeContext, int32_t retNum) { + if (!tagContexts.empty()) { + cpp2::Schema respTag; + respTag.columns.reserve(retNum - edgeContext.props_.size()); + for (auto& tc : tagContexts) { + for (auto& prop : tc.props_) { + respTag.columns.emplace_back(columnDef(std::move(prop.prop_.name), + prop.type_.type)); + } + } + resp_.vertex_schema = std::move(respTag); + } + if (edgeContext.schema_ != nullptr) { + cpp2::Schema respEdge; + respEdge.columns.reserve(edgeContext.props_.size()); + for (auto& prop : edgeContext.props_) { + respEdge.columns.emplace_back(columnDef(std::move(prop.prop_.name), + prop.type_.type)); + } + resp_.edge_schema = std::move(respEdge); + } +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/QueryBoundProcessor.h b/src/storage/QueryBoundProcessor.h new file mode 100644 index 00000000000..b14baf5e7e1 --- /dev/null +++ b/src/storage/QueryBoundProcessor.h @@ -0,0 +1,56 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef STORAGE_QUERYBOUNDPROCESSOR_H_ +#define STORAGE_QUERYBOUNDPROCESSOR_H_ + +#include "base/Base.h" +#include "storage/QueryBaseProcessor.h" + +namespace nebula { +namespace storage { + + +class QueryBoundProcessor + : public QueryBaseProcessor { +public: + static QueryBoundProcessor* instance(kvstore::KVStore* kvstore, + meta::SchemaManager* schemaMan) { + return new QueryBoundProcessor(kvstore, schemaMan); + } + +protected: + QueryBoundProcessor(kvstore::KVStore* kvstore, meta::SchemaManager* schemaMan) + : QueryBaseProcessor(kvstore, schemaMan) {} + + kvstore::ResultCode processVertex(PartitionID partID, VertexID vId, + std::vector& tagContexts, + EdgeContext& edgeContext) override; + + void onProcessed(std::vector& tagContexts, + EdgeContext& edgeContext, + int32_t retNum) override; + + kvstore::ResultCode collectVertexProps( + PartitionID partId, + VertexID vId, + TagID tagId, + SchemaProviderIf* tagSchema, + std::vector& props, + RowWriter& writer); + + kvstore::ResultCode collectEdgeProps( + PartitionID partId, + VertexID vId, + EdgeType edgeType, + SchemaProviderIf* schema, + std::vector& props, + RowSetWriter& writer); +}; + +} // namespace storage +} // namespace nebula +#endif // STORAGE_QUERYBOUNDPROCESSOR_H_ diff --git a/src/storage/QueryEdgePropsProcessor.cpp b/src/storage/QueryEdgePropsProcessor.cpp new file mode 100644 index 00000000000..9e059df8876 --- /dev/null +++ b/src/storage/QueryEdgePropsProcessor.cpp @@ -0,0 +1,86 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "storage/QueryEdgePropsProcessor.h" +#include +#include "time/Duration.h" +#include "storage/KeyUtils.h" +#include "dataman/RowReader.h" +#include "dataman/RowWriter.h" + +namespace nebula { +namespace storage { + +kvstore::ResultCode QueryEdgePropsProcessor::collectEdgesProps( + PartitionID partId, + const cpp2::EdgeKey& edgeKey, + SchemaProviderIf* edgeSchema, + std::vector& props, + RowSetWriter& rsWriter) { + auto prefix = KeyUtils::prefix(partId, edgeKey.src, edgeKey.edge_type, edgeKey.dst, edgeKey.ranking); + std::unique_ptr iter; + auto ret = kvstore_->prefix(spaceId_, partId, prefix, iter); + // Only use the latest version. + if (iter && iter->valid()) { + RowWriter writer; + auto key = iter->key(); + auto val = iter->val(); + PropsCollector collector(&writer); + this->collectProps(edgeSchema, key, val, props, &collector); + iter->next(); + rsWriter.addRow(writer); + } + return ret; +} + +void QueryEdgePropsProcessor::process(const cpp2::EdgePropRequest& req) { + spaceId_ = req.get_space_id(); + int32_t returnColumnsNum = req.get_return_columns().size(); + EdgeContext edgeContext; + std::vector tagContexts; + auto retCode = this->checkAndBuildContexts(req, tagContexts, edgeContext); + if (retCode != cpp2::ErrorCode::SUCCEEDED) { + this->pushResultCode(retCode, -1); + this->resp_.latency_in_ms = duration_.elapsedInMSec(); + this->onFinished(); + return; + } + + RowSetWriter rsWriter; + std::for_each(req.get_edges().begin(), req.get_edges().end(), [&](auto& partE) { + auto partId = partE.first; + kvstore::ResultCode ret; + for (auto& edgeKey : partE.second) { + ret = this->collectEdgesProps(partId, edgeKey, edgeContext.schema_, + edgeContext.props_, rsWriter); + if (ret != kvstore::ResultCode::SUCCESSED) { + break; + } + }; + // TODO handle failures + this->pushResultCode(this->to(ret), partId); + }); + resp_.data = std::move(rsWriter.data()); + + std::vector props; + props.reserve(returnColumnsNum); + for (auto& prop : edgeContext.props_) { + props.emplace_back(std::move(prop)); + } + std::sort(props.begin(), props.end(), [](auto& l, auto& r){ + return l.retIndex_ < r.retIndex_; + }); + for (auto& prop : props) { + resp_.schema.columns.emplace_back( + columnDef(std::move(prop.prop_.name), + prop.type_.type)); + } + resp_.latency_in_ms = duration_.elapsedInMSec(); + this->onFinished(); +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/QueryEdgePropsProcessor.h b/src/storage/QueryEdgePropsProcessor.h new file mode 100644 index 00000000000..0d52873a07d --- /dev/null +++ b/src/storage/QueryEdgePropsProcessor.h @@ -0,0 +1,59 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef STORAGE_QUERYEDGEPROPSROCESSOR_H_ +#define STORAGE_QUERYEDGEPROPSROCESSOR_H_ + +#include "storage/QueryBaseProcessor.h" + +namespace nebula { +namespace storage { + +class QueryEdgePropsProcessor + : public QueryBaseProcessor { +public: + static QueryEdgePropsProcessor* instance(kvstore::KVStore* kvstore, + meta::SchemaManager* schemaMan) { + return new QueryEdgePropsProcessor(kvstore, schemaMan); + } + + // It is one new method for QueryBaseProcessor.process. + void process(const cpp2::EdgePropRequest& req); + +private: + QueryEdgePropsProcessor(kvstore::KVStore* kvstore, meta::SchemaManager* schemaMan) + : QueryBaseProcessor(kvstore, schemaMan) {} + + kvstore::ResultCode collectEdgesProps(PartitionID partId, + const cpp2::EdgeKey& edgeKey, + SchemaProviderIf* edgeSchema, + std::vector& props, + RowSetWriter& rsWriter); + + kvstore::ResultCode processVertex(PartitionID partID, VertexID vId, + std::vector& tagContexts, + EdgeContext& edgeContext) { + UNUSED(partID); + UNUSED(vId); + UNUSED(tagContexts); + UNUSED(edgeContext); + LOG(FATAL) << "Unimplement!"; + return kvstore::ResultCode::SUCCESSED; + } + + void onProcessed(std::vector& tagContexts, + EdgeContext& edgeContext, int32_t retNum) { + UNUSED(tagContexts); + UNUSED(edgeContext); + UNUSED(retNum); + LOG(FATAL) << "Unimplement!"; + } + +}; + +} // namespace storage +} // namespace nebula +#endif // STORAGE_QUERYEDGEPROPSROCESSOR_H_ diff --git a/src/storage/QueryProcessor.h b/src/storage/QueryProcessor.h deleted file mode 100644 index 8269180c644..00000000000 --- a/src/storage/QueryProcessor.h +++ /dev/null @@ -1,86 +0,0 @@ -/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved - * - * This source code is licensed under Apache 2.0 License - * (found in the LICENSE.Apache file in the root directory) - */ - -#ifndef STORAGE_QUERYPROCESSOR_H_ -#define STORAGE_QUERYPROCESSOR_H_ - -#include "base/Base.h" -#include -#include -#include -#include "kvstore/include/KVStore.h" -#include "interface/gen-cpp2/storage_types.h" -#include "storage/BaseProcessor.h" - -namespace nebula { -namespace storage { - -class QueryBoundProcessor : public BaseProcessor { -public: - static QueryBoundProcessor* instance(kvstore::KVStore* kvstore) { - return new QueryBoundProcessor(kvstore); - } - - void process(const cpp2::GetNeighborsRequest& req) override { - VLOG(1) << req.get_space_id(); - } - -private: - QueryBoundProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} -}; - -class QueryStatsProcessor : public BaseProcessor { -public: - static QueryStatsProcessor* instance(kvstore::KVStore* kvstore) { - return new QueryStatsProcessor(kvstore); - } - - void process(const cpp2::NeighborsStatsRequest& req) override { - VLOG(1) << req.get_space_id(); - } - -private: - QueryStatsProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} -}; - -class QueryVertexPropsProcessor - : public BaseProcessor { -public: - static QueryVertexPropsProcessor* instance(kvstore::KVStore* kvstore) { - return new QueryVertexPropsProcessor(kvstore); - } - - void process(const cpp2::VertexPropRequest& req) override { - VLOG(1) << req.get_space_id(); - // TODO - } - -private: - QueryVertexPropsProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} -}; - -class QueryEdgePropsProcessor : public BaseProcessor { -public: - static QueryEdgePropsProcessor* instance(kvstore::KVStore* kvstore) { - return new QueryEdgePropsProcessor(kvstore); - } - - void process(const cpp2::EdgePropRequest& req) override { - VLOG(1) << req.get_space_id(); - // TODO - } - -private: - QueryEdgePropsProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} -}; - -} // namespace storage -} // namespace nebula -#endif // STORAGE_QUERYPROCESSOR_H_ diff --git a/src/storage/QueryStatsProcessor.cpp b/src/storage/QueryStatsProcessor.cpp new file mode 100644 index 00000000000..767960501d7 --- /dev/null +++ b/src/storage/QueryStatsProcessor.cpp @@ -0,0 +1,140 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "storage/QueryStatsProcessor.h" +#include +#include "time/Duration.h" +#include "storage/KeyUtils.h" +#include "dataman/RowReader.h" +#include "dataman/RowWriter.h" + + +namespace nebula { +namespace storage { + +kvstore::ResultCode +QueryStatsProcessor::collectVertexStats(PartitionID partId, VertexID vId, TagID tagId, + SchemaProviderIf* tagSchema, + std::vector& props) { + auto prefix = KeyUtils::prefix(partId, vId, tagId); + std::unique_ptr iter; + auto ret = kvstore_->prefix(spaceId_, partId, prefix, iter); + if (ret != kvstore::ResultCode::SUCCESSED) { + return ret; + } + // Only get the latest version. + if (iter && iter->valid()) { + auto key = iter->val(); + auto val = iter->val(); + collectProps(tagSchema, key, val, props, &collector_); + } + return ret; +} + +kvstore::ResultCode +QueryStatsProcessor::collectEdgesStats(PartitionID partId, VertexID vId, EdgeType edgeType, + SchemaProviderIf* edgeSchema, + std::vector& props) { + auto prefix = KeyUtils::prefix(partId, vId, edgeType); + std::unique_ptr iter; + auto ret = kvstore_->prefix(spaceId_, partId, prefix, iter); + if (ret != kvstore::ResultCode::SUCCESSED || !iter) { + return ret; + } + while(iter->valid()) { + auto key = iter->key(); + auto val = iter->val(); + collectProps(edgeSchema, key, val, props, &collector_); + iter->next(); + } + return ret; +} + +void QueryStatsProcessor::calcResult(std::vector&& props) { + RowWriter writer; + for (auto& prop : props) { + switch(prop.prop_.stat) { + case cpp2::StatType::SUM: { + switch (prop.sum_.which()) { + case 0: + writer << boost::get(prop.sum_); + resp_.schema.columns.emplace_back( + columnDef(std::move(prop.prop_.name), + cpp2::SupportedType::INT)); + break; + case 1: + writer << boost::get(prop.sum_); + resp_.schema.columns.emplace_back( + columnDef(std::move(prop.prop_.name), + cpp2::SupportedType::DOUBLE)); + break; + } + break; + } + case cpp2::StatType::COUNT: { + writer << prop.count_; + resp_.schema.columns.emplace_back( + columnDef(std::move(prop.prop_.name), + cpp2::SupportedType::INT)); + break; + } + case cpp2::StatType::AVG: { + switch (prop.sum_.which()) { + case 0: + writer << (double)boost::get(prop.sum_) / prop.count_; + break; + case 1: + writer << boost::get(prop.sum_) / prop.count_; + break; + } + resp_.schema.columns.emplace_back( + columnDef(std::move(prop.prop_.name), + cpp2::SupportedType::DOUBLE)); + break; + } + } + } + resp_.data = std::move(writer.encode()); +} + +kvstore::ResultCode QueryStatsProcessor::processVertex(PartitionID partId, VertexID vId, + std::vector& tagContexts, + EdgeContext& edgeContext) { + for (auto& tc : tagContexts) { + auto ret = collectVertexStats(partId, vId, tc.tagId_, tc.schema_, tc.props_); + if (ret != kvstore::ResultCode::SUCCESSED) { + return ret; + } + } + if (edgeContext.schema_ != nullptr) { + auto ret = collectEdgesStats(partId, vId, edgeContext.edgeType_, + edgeContext.schema_, edgeContext.props_); + if (ret != kvstore::ResultCode::SUCCESSED) { + return ret; + } + } + return kvstore::ResultCode::SUCCESSED; +} + +void QueryStatsProcessor::onProcessed(std::vector& tagContexts, + EdgeContext& edgeContext, int32_t retNum) { + std::vector props; + props.reserve(retNum); + for (auto& tc : tagContexts) { + for (auto& prop : tc.props_) { + props.emplace_back(std::move(prop)); + } + } + for (auto& prop : edgeContext.props_) { + props.emplace_back(std::move(prop)); + } + std::sort(props.begin(), props.end(), [](auto& l, auto& r){ + return l.retIndex_ < r.retIndex_; + }); + calcResult(std::move(props)); +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/QueryStatsProcessor.h b/src/storage/QueryStatsProcessor.h new file mode 100644 index 00000000000..d2e0b9079af --- /dev/null +++ b/src/storage/QueryStatsProcessor.h @@ -0,0 +1,55 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef STORAGE_QUERYSTATSPROCESSOR_H_ +#define STORAGE_QUERYSTATSPROCESSOR_H_ + +#include "base/Base.h" +#include "storage/QueryBaseProcessor.h" + +namespace nebula { +namespace storage { + +class QueryStatsProcessor + : public QueryBaseProcessor { +public: + static QueryStatsProcessor* instance(kvstore::KVStore* kvstore, + meta::SchemaManager* schemaMan) { + return new QueryStatsProcessor(kvstore, schemaMan); + } + +private: + QueryStatsProcessor(kvstore::KVStore* kvstore, meta::SchemaManager* schemaMan) + : QueryBaseProcessor(kvstore, schemaMan) {} + + kvstore::ResultCode processVertex(PartitionID partID, VertexID vId, + std::vector& tagContexts, + EdgeContext& edgeContext) override; + + void onProcessed(std::vector& tagContexts, + EdgeContext& edgeContext, + int32_t retNum) override; + + kvstore::ResultCode collectVertexStats(PartitionID partId, + VertexID vId, TagID tagId, + SchemaProviderIf* tagSchema, + std::vector& props); + + + kvstore::ResultCode collectEdgesStats(PartitionID partId, + VertexID vId, EdgeType edgeType, + SchemaProviderIf* edgeSchema, + std::vector& props); + + void calcResult(std::vector&& props); + +private: + StatsCollector collector_; +}; + +} // namespace storage +} // namespace nebula +#endif // STORAGE_QUERYSTATSPROCESSOR_H_ diff --git a/src/storage/QueryVertexPropsProcessor.cpp b/src/storage/QueryVertexPropsProcessor.cpp new file mode 100644 index 00000000000..dd3257f5681 --- /dev/null +++ b/src/storage/QueryVertexPropsProcessor.cpp @@ -0,0 +1,31 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "storage/QueryVertexPropsProcessor.h" +#include +#include "time/Duration.h" +#include "storage/KeyUtils.h" +#include "dataman/RowReader.h" +#include "dataman/RowWriter.h" + +namespace nebula { +namespace storage { + +void QueryVertexPropsProcessor::process(const cpp2::VertexPropRequest& vertexReq) { + cpp2::GetNeighborsRequest req; + req.set_space_id(vertexReq.get_space_id()); + req.set_ids(std::move(vertexReq.get_ids())); + decltype(req.return_columns) tmpColumns; + tmpColumns.reserve(vertexReq.get_return_columns().size()); + for (auto& col : vertexReq.get_return_columns()) { + tmpColumns.emplace_back(std::move(col)); + } + req.set_return_columns(std::move(tmpColumns)); + QueryBoundProcessor::process(req); +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/QueryVertexPropsProcessor.h b/src/storage/QueryVertexPropsProcessor.h new file mode 100644 index 00000000000..a05cd890fab --- /dev/null +++ b/src/storage/QueryVertexPropsProcessor.h @@ -0,0 +1,34 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef STORAGE_QUERYVERTEXPROPSPROCESSOR_H_ +#define STORAGE_QUERYVERTEXPROPSPROCESSOR_H_ + +#include "base/Base.h" +#include "storage/QueryBoundProcessor.h" + +namespace nebula { +namespace storage { + +class QueryVertexPropsProcessor : public QueryBoundProcessor { +public: + static QueryVertexPropsProcessor* instance( + kvstore::KVStore* kvstore, + meta::SchemaManager* schemaMan) { + return new QueryVertexPropsProcessor(kvstore, schemaMan); + } + + void process(const cpp2::VertexPropRequest& req); + +private: + QueryVertexPropsProcessor(kvstore::KVStore* kvstore, meta::SchemaManager* schemaMan) + : QueryBoundProcessor(kvstore, schemaMan) {} + +}; + +} // namespace storage +} // namespace nebula +#endif // STORAGE_QUERYVERTEXPROPSPROCESSOR_H_ diff --git a/src/storage/StorageServiceHandler.cpp b/src/storage/StorageServiceHandler.cpp index 4fda5baf436..64f4e75de5a 100644 --- a/src/storage/StorageServiceHandler.cpp +++ b/src/storage/StorageServiceHandler.cpp @@ -8,7 +8,10 @@ #include "base/Base.h" #include "storage/AddVerticesProcessor.h" #include "storage/AddEdgesProcessor.h" -#include "storage/QueryProcessor.h" +#include "storage/QueryBoundProcessor.h" +#include "storage/QueryVertexPropsProcessor.h" +#include "storage/QueryEdgePropsProcessor.h" +#include "storage/QueryStatsProcessor.h" #define RETURN_FUTURE(processor) \ auto f = processor->getFuture(); \ @@ -20,37 +23,37 @@ namespace storage { folly::Future StorageServiceHandler::future_getOutBound(const cpp2::GetNeighborsRequest& req) { - auto* processor = QueryBoundProcessor::instance(kvstore_); + auto* processor = QueryBoundProcessor::instance(kvstore_, schemaMan_); RETURN_FUTURE(processor); } folly::Future StorageServiceHandler::future_getInBound(const cpp2::GetNeighborsRequest& req) { - auto* processor = QueryBoundProcessor::instance(kvstore_); + auto* processor = QueryBoundProcessor::instance(kvstore_, schemaMan_); RETURN_FUTURE(processor); } -folly::Future -StorageServiceHandler::future_outBoundStats(const cpp2::NeighborsStatsRequest& req) { - auto* processor = QueryStatsProcessor::instance(kvstore_); +folly::Future +StorageServiceHandler::future_outBoundStats(const cpp2::GetNeighborsRequest& req) { + auto* processor = QueryStatsProcessor::instance(kvstore_, schemaMan_); RETURN_FUTURE(processor); } -folly::Future -StorageServiceHandler::future_inBoundStats(const cpp2::NeighborsStatsRequest& req) { - auto* processor = QueryStatsProcessor::instance(kvstore_); +folly::Future +StorageServiceHandler::future_inBoundStats(const cpp2::GetNeighborsRequest& req) { + auto* processor = QueryStatsProcessor::instance(kvstore_, schemaMan_); RETURN_FUTURE(processor); } folly::Future StorageServiceHandler::future_getProps(const cpp2::VertexPropRequest& req) { - auto* processor = QueryVertexPropsProcessor::instance(kvstore_); + auto* processor = QueryVertexPropsProcessor::instance(kvstore_, schemaMan_); RETURN_FUTURE(processor); } -folly::Future +folly::Future StorageServiceHandler::future_getEdgeProps(const cpp2::EdgePropRequest& req) { - auto* processor = QueryEdgePropsProcessor::instance(kvstore_); + auto* processor = QueryEdgePropsProcessor::instance(kvstore_, schemaMan_); RETURN_FUTURE(processor); } diff --git a/src/storage/StorageServiceHandler.h b/src/storage/StorageServiceHandler.h index 8ac274f0024..a4c2ec1ad9b 100644 --- a/src/storage/StorageServiceHandler.h +++ b/src/storage/StorageServiceHandler.h @@ -11,6 +11,7 @@ #include "base/Base.h" #include "interface/gen-cpp2/StorageService.h" #include "kvstore/include/KVStore.h" +#include "meta/SchemaManager.h" namespace nebula { namespace storage { @@ -18,22 +19,27 @@ namespace storage { class StorageServiceHandler final : public cpp2::StorageServiceSvIf { FRIEND_TEST(StorageServiceHandlerTest, FutureAddVerticesTest); public: + + StorageServiceHandler(kvstore::KVStore* kvstore, meta::SchemaManager* schemaMan) + : kvstore_(kvstore) + , schemaMan_(schemaMan) {} + folly::Future future_getOutBound(const cpp2::GetNeighborsRequest& req) override; folly::Future future_getInBound(const cpp2::GetNeighborsRequest& req) override; - folly::Future - future_outBoundStats(const cpp2::NeighborsStatsRequest& req) override; + folly::Future + future_outBoundStats(const cpp2::GetNeighborsRequest& req) override; - folly::Future - future_inBoundStats(const cpp2::NeighborsStatsRequest& req) override; + folly::Future + future_inBoundStats(const cpp2::GetNeighborsRequest& req) override; folly::Future future_getProps(const cpp2::VertexPropRequest& req) override; - folly::Future + folly::Future future_getEdgeProps(const cpp2::EdgePropRequest& req) override; folly::Future @@ -44,6 +50,7 @@ FRIEND_TEST(StorageServiceHandlerTest, FutureAddVerticesTest); private: kvstore::KVStore* kvstore_ = nullptr; + meta::SchemaManager* schemaMan_ = nullptr; }; } // namespace storage diff --git a/src/storage/test/AddEdgesTest.cpp b/src/storage/test/AddEdgesTest.cpp new file mode 100644 index 00000000000..e7be3c9ef3c --- /dev/null +++ b/src/storage/test/AddEdgesTest.cpp @@ -0,0 +1,78 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include +#include +#include "fs/TempDir.h" +#include "storage/test/TestUtils.h" +#include "storage/AddEdgesProcessor.h" +#include "storage/KeyUtils.h" + + +namespace nebula { +namespace storage { + +TEST(AddEdgesTest, SimpleTest) { + fs::TempDir rootPath("/tmp/AddEdgesTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + auto* processor = AddEdgesProcessor::instance(kv.get()); + LOG(INFO) << "Build AddEdgesRequest..."; + cpp2::AddEdgesRequest req ; + req.space_id = 0; + req.overwritable = true; + // partId => List + // Edge => {EdgeKey, props} + for (auto partId = 0; partId < 3; partId++) { + std::vector edges; + for (auto srcId = partId * 10; srcId < 10 * (partId + 1); srcId++) { + edges.emplace_back(apache::thrift::FragileConstructor::FRAGILE, + cpp2::EdgeKey(apache::thrift::FragileConstructor::FRAGILE, + srcId, srcId*100 + 1, srcId*100 + 2, srcId*100 + 3), + folly::stringPrintf("%d_%d", partId, srcId)); + } + req.edges.emplace(partId, std::move(edges)); + } + + LOG(INFO) << "Test AddEdgesProcessor..."; + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + EXPECT_EQ(3, resp.codes.size()); + for (auto i = 0; i < 3; i++) { + EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.codes[i].code); + } + + LOG(INFO) << "Check data in kv store..."; + for (auto partId = 0; partId < 3; partId++) { + for (auto srcId = 10 * partId; srcId < 10 * (partId + 1); srcId++) { + auto prefix = KeyUtils::prefix(partId, srcId, srcId*100 + 1); + std::unique_ptr iter; + EXPECT_EQ(kvstore::ResultCode::SUCCESSED, kv->prefix(0, partId, prefix, iter)); + int num = 0; + while (iter->valid()) { + EXPECT_EQ(folly::stringPrintf("%d_%d", partId, srcId), iter->val()); + num++; + iter->next(); + } + EXPECT_EQ(1, num); + } + } + +} + +} // namespace storage +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/storage/test/AddVerticesTest.cpp b/src/storage/test/AddVerticesTest.cpp index 5861f4e212a..75b1a5a8ded 100644 --- a/src/storage/test/AddVerticesTest.cpp +++ b/src/storage/test/AddVerticesTest.cpp @@ -8,18 +8,17 @@ #include #include #include "fs/TempDir.h" -#include "storage/test/StorageTestBase.h" +#include "storage/test/TestUtils.h" #include "storage/AddVerticesProcessor.h" #include "storage/KeyUtils.h" - namespace nebula { namespace storage { TEST(AddVerticesTest, SimpleTest) { fs::TempDir rootPath("/tmp/AddVerticesTest.XXXXXX"); - auto* kv = TestUtils::initKV(rootPath.path()); - auto* processor = AddVerticesProcessor::instance(kv); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + auto* processor = AddVerticesProcessor::instance(kv.get()); LOG(INFO) << "Build AddVerticesRequest..."; cpp2::AddVerticesRequest req ; req.space_id = 0; diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 4d20c8a98e9..d31807e9888 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -4,6 +4,8 @@ add_executable( $ $ $ + $ + $ $ $ $ @@ -36,10 +38,11 @@ add_test(NAME add_vertices_test COMMAND add_vertices_test) add_executable( storage_service_handler_test StorageServiceHandlerTest.cpp - StorageTestBase.cpp $ $ $ + $ + $ $ $ $ @@ -68,3 +71,188 @@ target_link_libraries( add_test(NAME storage_service_handler_test COMMAND storage_service_handler_test) +add_executable( + add_edges_test + AddEdgesTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +target_link_libraries( + add_edges_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + folly + boost_system + boost_context + ${OPENSSL_LIBRARIES} + ${KRB5_LIBRARIES} + gtest + glog + gflags + event + ${COMPRESSION_LIBRARIES} + resolv + double-conversion + dl + jemalloc + -pthread +) + +add_test(NAME add_edges_test COMMAND add_edges_test) + +add_executable( + query_bound_test + QueryBoundTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +target_link_libraries( + query_bound_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + folly + boost_system + boost_context + ${OPENSSL_LIBRARIES} + ${KRB5_LIBRARIES} + gtest + glog + gflags + event + ${COMPRESSION_LIBRARIES} + resolv + double-conversion + dl + jemalloc + -pthread +) + +add_test(NAME query_bound_test COMMAND query_bound_test) + +add_executable( + vertex_props_test + QueryVertexPropsTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +target_link_libraries( + vertex_props_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + folly + boost_system + boost_context + ${OPENSSL_LIBRARIES} + ${KRB5_LIBRARIES} + gtest + glog + gflags + event + ${COMPRESSION_LIBRARIES} + resolv + double-conversion + dl + jemalloc + -pthread +) + +add_test(NAME vertex_props_test COMMAND vertex_props_test) + +add_executable( + edge_props_test + QueryEdgePropsTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +target_link_libraries( + edge_props_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + folly + boost_system + boost_context + ${OPENSSL_LIBRARIES} + ${KRB5_LIBRARIES} + gtest + glog + gflags + event + ${COMPRESSION_LIBRARIES} + resolv + double-conversion + dl + jemalloc + -pthread +) + +add_test(NAME edge_props_test COMMAND edge_props_test) + +add_executable( + query_stats_test + QueryStatsTest.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +target_link_libraries( + query_stats_test + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + folly + boost_system + boost_context + ${OPENSSL_LIBRARIES} + ${KRB5_LIBRARIES} + gtest + glog + gflags + event + ${COMPRESSION_LIBRARIES} + resolv + double-conversion + dl + jemalloc + -pthread +) + +add_test(NAME query_stats_test COMMAND query_stats_test) + diff --git a/src/storage/test/QueryBoundTest.cpp b/src/storage/test/QueryBoundTest.cpp new file mode 100644 index 00000000000..1c0714ccc3d --- /dev/null +++ b/src/storage/test/QueryBoundTest.cpp @@ -0,0 +1,176 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include +#include +#include "fs/TempDir.h" +#include "storage/test/TestUtils.h" +#include "storage/QueryBoundProcessor.h" +#include "storage/KeyUtils.h" +#include "dataman/RowSetReader.h" +#include "dataman/RowReader.h" + +namespace nebula { +namespace storage { + +TEST(QueryBoundTest, OutBoundSimpleTest) { + fs::TempDir rootPath("/tmp/QueryBoundTest.XXXXXX"); + LOG(INFO) << "Prepare meta..."; + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + std::unique_ptr schemaMan(new meta::MemorySchemaManager()); + auto& edgeSchema = schemaMan->edgeSchema(); + edgeSchema[0][101] = TestUtils::genEdgeSchemaProvider(10, 10); + auto& tagSchema = schemaMan->tagSchema(); + for (auto tagId = 3001; tagId < 3010; tagId++) { + tagSchema[0][tagId] = TestUtils::genTagSchemaProvider(tagId, 3, 3); + } + CHECK_NOTNULL(edgeSchema[0][101]); + + LOG(INFO) << "Prepare data..."; + for (auto partId = 0; partId < 3; partId++) { + std::vector data; + for (auto vertexId = partId * 10; vertexId < (partId + 1) * 10; vertexId++) { + for (auto tagId = 3001; tagId < 3010; tagId++) { + auto key = KeyUtils::vertexKey(partId, vertexId, tagId, 0); + RowWriter writer; + for (uint64_t numInt = 0; numInt < 3; numInt++) { + writer << numInt; + } + for (auto numString = 3; numString < 6; numString++) { + writer << folly::stringPrintf("tag_string_col_%d", numString); + } + auto val = writer.encode(); + data.emplace_back(std::move(key), std::move(val)); + } + // Generate 7 edges for each edgeType. + for (auto dstId = 10001; dstId <= 10007; dstId++) { + VLOG(3) << "Write part " << partId << ", vertex " << vertexId << ", dst " << dstId; + auto key = KeyUtils::edgeKey(partId, vertexId, 101, dstId, dstId - 10001, 0); + RowWriter writer(nullptr); + for (uint64_t numInt = 0; numInt < 10; numInt++) { + writer << numInt; + } + for (auto numString = 10; numString < 20; numString++) { + writer << folly::stringPrintf("string_col_%d", numString); + } + auto val = writer.encode(); + data.emplace_back(std::move(key), std::move(val)); + } + } + kv->asyncMultiPut(0, partId, std::move(data), [&](kvstore::ResultCode code, HostAddr addr) { + EXPECT_EQ(code, kvstore::ResultCode::SUCCESSED); + UNUSED(addr); + }); + } + + LOG(INFO) << "Build QueryBoundRequest..."; + cpp2::GetNeighborsRequest req; + req.set_space_id(0); + decltype(req.ids) tmpIds; + for (auto partId = 0; partId < 3; partId++) { + for (auto vertexId = partId * 10, index = 0; + vertexId < (partId + 1) * 10; + vertexId++, index++) { + tmpIds[partId].push_back(vertexId); + } + } + req.set_ids(std::move(tmpIds)); + req.set_edge_type(101); + // Return tag props col_0, col_2, col_4 + decltype(req.return_columns) tmpColumns; + for (int i = 0; i < 3; i++) { + tmpColumns.emplace_back(TestUtils::propDef( + cpp2::PropOwner::SOURCE, + folly::stringPrintf("tag_%d_col_%d", 3001 + i*2, i*2), + 3001 + i*2)); + } + // Return edge props col_0, col_2, col_4 ... col_18 + for (int i = 0; i < 10; i++) { + tmpColumns.emplace_back(TestUtils::propDef(cpp2::PropOwner::EDGE, + folly::stringPrintf("col_%d", i*2))); + } + req.set_return_columns(std::move(tmpColumns)); + + LOG(INFO) << "Test QueryOutBoundRequest..."; + auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + LOG(INFO) << "Check the results..."; + EXPECT_EQ(3, resp.codes.size()); + for (auto i = 0; i < 3; i++) { + EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.codes[i].code); + } + + EXPECT_EQ(10, resp.edge_schema.columns.size()); + EXPECT_EQ(3, resp.vertex_schema.columns.size()); + auto provider = std::make_unique(resp.edge_schema); + auto tagProvider = std::make_unique(resp.vertex_schema); + EXPECT_EQ(30, resp.vertices.size()); + for (auto& vp : resp.vertices) { + LOG(INFO) << "Check vertex props..."; + RowReader tagReader(tagProvider.get(), vp.vertex_data); + EXPECT_EQ(3, tagReader.numFields()); + int64_t col1; + EXPECT_EQ(ResultType::SUCCEEDED, tagReader.getInt("tag_3001_col_0", col1)); + EXPECT_EQ(col1, 0); + + int64_t col2; + EXPECT_EQ(ResultType::SUCCEEDED, tagReader.getInt("tag_3003_col_2", col2)); + EXPECT_EQ(col2, 2); + + folly::StringPiece col3; + EXPECT_EQ(ResultType::SUCCEEDED, tagReader.getString("tag_3005_col_4", col3)); + EXPECT_EQ(folly::stringPrintf("tag_string_col_4"), col3); + + LOG(INFO) << "Check edge props..."; + RowSetReader rsReader(provider.get(), vp.edge_data); + auto it = rsReader.begin(); + int32_t rowNum = 0; + while (bool(it)) { + auto fieldIt = it->begin(); + int32_t i = 0; + std::stringstream ss; + while (bool(fieldIt)) { + if (i < 5) { + int64_t v; + EXPECT_EQ(ResultType::SUCCEEDED, fieldIt->getInt(v)); + CHECK_EQ(i*2, v); + ss << v << ","; + } else { + folly::StringPiece v; + EXPECT_EQ(ResultType::SUCCEEDED, fieldIt->getString(v)); + CHECK_EQ(folly::stringPrintf("string_col_%d", i*2), v); + ss << v << ","; + } + i++; + ++fieldIt; + } + VLOG(3) << ss.str(); + EXPECT_EQ(fieldIt, it->end()); + EXPECT_EQ(10, i); + ++it; + rowNum++; + } + EXPECT_EQ(it, rsReader.end()); + EXPECT_EQ(7, rowNum); + } +} + +} // namespace storage +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/storage/test/QueryEdgePropsTest.cpp b/src/storage/test/QueryEdgePropsTest.cpp new file mode 100644 index 00000000000..199c5010c6c --- /dev/null +++ b/src/storage/test/QueryEdgePropsTest.cpp @@ -0,0 +1,133 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include +#include +#include "fs/TempDir.h" +#include "storage/test/TestUtils.h" +#include "storage/QueryEdgePropsProcessor.h" +#include "storage/KeyUtils.h" +#include "dataman/RowSetReader.h" +#include "dataman/RowReader.h" + +namespace nebula { +namespace storage { + +TEST(QueryEdgePropsTest, SimpleTest) { + fs::TempDir rootPath("/tmp/QueryEdgePropsTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + LOG(INFO) << "Prepare meta..."; + std::unique_ptr schemaMan(new meta::MemorySchemaManager()); + auto& edgeSchema = schemaMan->edgeSchema(); + edgeSchema[0][101] = TestUtils::genEdgeSchemaProvider(10, 10); + CHECK_NOTNULL(edgeSchema[0][101]); + + LOG(INFO) << "Prepare data..."; + for (auto partId = 0; partId < 3; partId++) { + std::vector data; + for (auto vertexId = partId * 10; vertexId < (partId + 1) * 10; vertexId++) { + // Generate 7 edges for each edgeType. + for (auto dstId = 10001; dstId <= 10007; dstId++) { + VLOG(3) << "Write part " << partId << ", vertex " << vertexId << ", dst " << dstId; + auto key = KeyUtils::edgeKey(partId, vertexId, 101, dstId, dstId - 10001, 0); + RowWriter writer(nullptr); + for (int64_t numInt = 0; numInt < 10; numInt++) { + writer << numInt; + } + for (auto numString = 10; numString < 20; numString++) { + writer << folly::stringPrintf("string_col_%d", numString); + } + auto val = writer.encode(); + data.emplace_back(std::move(key), std::move(val)); + } + } + kv->asyncMultiPut(0, partId, std::move(data), [&](kvstore::ResultCode code, HostAddr addr) { + EXPECT_EQ(code, kvstore::ResultCode::SUCCESSED); + UNUSED(addr); + }); + } + + LOG(INFO) << "Build EdgePropRequest..."; + cpp2::EdgePropRequest req; + req.set_space_id(0); + decltype(req.edges) tmpEdges; + for (auto partId = 0; partId < 3; partId++) { + for (auto vertexId = partId * 10; vertexId < (partId + 1) * 10; vertexId++) { + for (auto dstId = 10001; dstId <= 10007; dstId++) { + tmpEdges[partId].emplace_back(apache::thrift::FragileConstructor::FRAGILE, + vertexId, 101, dstId, dstId - 10001); + } + } + } + req.set_edges(std::move(tmpEdges)); + req.set_edge_type(101); + // Return edge props col_0, col_2, col_4 ... col_18 + decltype(req.return_columns) tmpColumns; + for (int i = 0; i < 10; i++) { + tmpColumns.emplace_back(TestUtils::propDef(cpp2::PropOwner::EDGE, + folly::stringPrintf("col_%d", i*2))); + } + req.set_return_columns(std::move(tmpColumns)); + LOG(INFO) << "Test QueryEdgePropsRequest..."; + auto* processor = QueryEdgePropsProcessor::instance(kv.get(), schemaMan.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + LOG(INFO) << "Check the results..."; + EXPECT_EQ(3, resp.codes.size()); + for (auto i = 0; i < 3; i++) { + EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.codes[i].code); + } + + EXPECT_EQ(10, resp.schema.columns.size()); + auto provider = std::make_unique(resp.schema); + LOG(INFO) << "Check edge props..."; + RowSetReader rsReader(provider.get(), resp.data); + auto it = rsReader.begin(); + int32_t rowNum = 0; + while (bool(it)) { + auto fieldIt = it->begin(); + int32_t i = 0; + std::stringstream ss; + while (bool(fieldIt)) { + if (i < 5) { + int64_t v; + EXPECT_EQ(ResultType::SUCCEEDED, fieldIt->getInt(v)); + CHECK_EQ(i*2, v); + ss << v << ","; + } else { + folly::StringPiece v; + EXPECT_EQ(ResultType::SUCCEEDED, fieldIt->getString(v)); + CHECK_EQ(folly::stringPrintf("string_col_%d", i*2), v); + ss << v << ","; + } + i++; + ++fieldIt; + } + VLOG(3) << ss.str(); + EXPECT_EQ(fieldIt, it->end()); + EXPECT_EQ(10, i); + ++it; + rowNum++; + } + EXPECT_EQ(it, rsReader.end()); + EXPECT_EQ(210, rowNum); +} + +} // namespace storage +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/storage/test/QueryStatsTest.cpp b/src/storage/test/QueryStatsTest.cpp new file mode 100644 index 00000000000..c6e5c1c8d15 --- /dev/null +++ b/src/storage/test/QueryStatsTest.cpp @@ -0,0 +1,173 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include +#include +#include "fs/TempDir.h" +#include "storage/test/TestUtils.h" +#include "storage/QueryStatsProcessor.h" +#include "storage/KeyUtils.h" +#include "dataman/RowSetReader.h" +#include "dataman/RowReader.h" + +namespace nebula { +namespace storage { + +void mockData(kvstore::KVStore* kv) { + for (auto partId = 0; partId < 3; partId++) { + std::vector data; + for (auto vertexId = partId * 10; vertexId < (partId + 1) * 10; vertexId++) { + for (auto tagId = 3001; tagId < 3010; tagId++) { + auto key = KeyUtils::vertexKey(partId, vertexId, tagId, 0); + RowWriter writer; + for (int64_t numInt = 0; numInt < 3; numInt++) { + writer << numInt; + } + for (auto numString = 3; numString < 6; numString++) { + writer << folly::stringPrintf("tag_string_col_%d", numString); + } + auto val = writer.encode(); + data.emplace_back(std::move(key), std::move(val)); + } + // Generate 7 edges for each edgeType. + for (auto dstId = 10001; dstId <= 10007; dstId++) { + VLOG(3) << "Write part " << partId << ", vertex " << vertexId << ", dst " << dstId; + auto key = KeyUtils::edgeKey(partId, vertexId, 101, dstId, dstId - 10001, 0); + RowWriter writer(nullptr); + for (int64_t numInt = 0; numInt < 10; numInt++) { + writer << numInt; + } + for (auto numString = 10; numString < 20; numString++) { + writer << folly::stringPrintf("string_col_%d", numString); + } + auto val = writer.encode(); + data.emplace_back(std::move(key), std::move(val)); + } + } + kv->asyncMultiPut(0, partId, std::move(data), + [&](kvstore::ResultCode code, HostAddr addr) { + EXPECT_EQ(code, kvstore::ResultCode::SUCCESSED); + UNUSED(addr); + }); + } +} + +void buildRequest(cpp2::GetNeighborsRequest& req) { + req.set_space_id(0); + decltype(req.ids) tmpIds; + for (auto partId = 0; partId < 3; partId++) { + for (auto vertexId = partId * 10; vertexId < (partId + 1) * 10; vertexId++) { + tmpIds[partId].push_back(vertexId); + } + } + req.set_ids(std::move(tmpIds)); + req.set_edge_type(101); + // Return tag props col_0, col_2, col_4 + decltype(req.return_columns) tmpColumns; + for (int i = 0; i < 2; i++) { + tmpColumns.emplace_back(TestUtils::propDef(cpp2::PropOwner::SOURCE, + folly::stringPrintf("tag_%d_col_%d", 3001 + i*2, i*2), + cpp2::StatType::AVG, + 3001 + i*2)); + } + // Return edge props col_0, col_2, col_4 ... col_18 + for (int i = 0; i < 5; i++) { + tmpColumns.emplace_back(TestUtils::propDef(cpp2::PropOwner::EDGE, + folly::stringPrintf("col_%d", i*2), + cpp2::StatType::SUM)); + } + req.set_return_columns(std::move(tmpColumns)); +} + +void checkResponse(const cpp2::QueryStatsResponse& resp) { + EXPECT_EQ(3, resp.codes.size()); + for (auto i = 0; i < 3; i++) { + EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.codes[i].code); + } + + EXPECT_EQ(7, resp.schema.columns.size()); + CHECK_GT(resp.data.size(), 0); + auto provider = std::make_unique(resp.schema); + LOG(INFO) << "Check edge props..."; + + std::vector> expected; + expected.emplace_back("tag_3001_col_0", cpp2::SupportedType::DOUBLE, 0); + expected.emplace_back("tag_3003_col_2", cpp2::SupportedType::DOUBLE, 2); + expected.emplace_back("col_0", cpp2::SupportedType::INT, 0); + expected.emplace_back("col_2", cpp2::SupportedType::INT, 2); + expected.emplace_back("col_4", cpp2::SupportedType::INT, 4); + expected.emplace_back("col_6", cpp2::SupportedType::INT, 6); + expected.emplace_back("col_8", cpp2::SupportedType::INT, 8); + + RowReader reader(provider.get(), resp.data); + auto numFields = provider->getNumFields(0); + for (auto i = 0; i < numFields; i++) { + const auto* name = provider->getFieldName(i, 0); + const auto* ftype = provider->getFieldType(i, 0); + EXPECT_EQ(name, std::get<0>(expected[i])); + EXPECT_TRUE(ftype->type == std::get<1>(expected[i])); + switch(ftype->type) { + case cpp2::SupportedType::INT: { + int64_t v; + auto ret = reader.getInt(i, v); + EXPECT_EQ(ret, ResultType::SUCCEEDED); + EXPECT_EQ(std::get<2>(expected[i]) * 210 , v); + break; + } + case cpp2::SupportedType::DOUBLE: { + float v; + auto ret = reader.getFloat(i, v); + EXPECT_EQ(ret, ResultType::SUCCEEDED); + EXPECT_EQ(std::get<2>(expected[i]) , v); + break; + } + default: { + LOG(FATAL) << "Should not reach here!"; + break; + } + } + } + +} + +TEST(QueryStatsTest, StatsSimpleTest) { + fs::TempDir rootPath("/tmp/QueryStatsTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + LOG(INFO) << "Prepare meta..."; + std::unique_ptr schemaMan(new meta::MemorySchemaManager()); + auto& edgeSchema = schemaMan->edgeSchema(); + edgeSchema[0][101] = TestUtils::genEdgeSchemaProvider(10, 10); + auto& tagSchema = schemaMan->tagSchema(); + for (auto tagId = 3001; tagId < 3010; tagId++) { + tagSchema[0][tagId] = TestUtils::genTagSchemaProvider(tagId, 3, 3); + } + CHECK_NOTNULL(edgeSchema[0][101]); + mockData(kv.get()); + + cpp2::GetNeighborsRequest req; + buildRequest(req); + + auto* processor = QueryStatsProcessor::instance(kv.get(), schemaMan.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + checkResponse(resp); +} + +} // namespace storage +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/storage/test/QueryVertexPropsTest.cpp b/src/storage/test/QueryVertexPropsTest.cpp new file mode 100644 index 00000000000..a8a340653df --- /dev/null +++ b/src/storage/test/QueryVertexPropsTest.cpp @@ -0,0 +1,115 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include +#include +#include "fs/TempDir.h" +#include "storage/test/TestUtils.h" +#include "storage/QueryVertexPropsProcessor.h" +#include "storage/KeyUtils.h" +#include "dataman/RowSetReader.h" +#include "dataman/RowReader.h" + +namespace nebula { +namespace storage { + +TEST(QueryVertexPropsTest, SimpleTest) { + fs::TempDir rootPath("/tmp/QueryVertexPropsTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + LOG(INFO) << "Prepare meta..."; + std::unique_ptr schemaMan(new meta::MemorySchemaManager()); + auto& tagSchema = schemaMan->tagSchema(); + for (auto tagId = 3001; tagId < 3010; tagId++) { + tagSchema[0][tagId] = TestUtils::genTagSchemaProvider(tagId, 3, 3); + } + + LOG(INFO) << "Prepare data..."; + for (auto partId = 0; partId < 3; partId++) { + std::vector data; + for (auto vertexId = partId * 10; vertexId < (partId + 1) * 10; vertexId++) { + for (auto tagId = 3001; tagId < 3010; tagId++) { + auto key = KeyUtils::vertexKey(partId, vertexId, tagId, 0); + RowWriter writer; + for (int64_t numInt = 0; numInt < 3; numInt++) { + writer << numInt; + } + for (auto numString = 3; numString < 6; numString++) { + writer << folly::stringPrintf("tag_string_col_%d", numString); + } + auto val = writer.encode(); + data.emplace_back(std::move(key), std::move(val)); + } + } + kv->asyncMultiPut(0, partId, std::move(data), [&](kvstore::ResultCode code, HostAddr addr) { + EXPECT_EQ(code, kvstore::ResultCode::SUCCESSED); + UNUSED(addr); + }); + } + + LOG(INFO) << "Build VertexPropsRequest..."; + cpp2::VertexPropRequest req; + req.set_space_id(0); + decltype(req.ids) tmpIds; + for (auto partId = 0; partId < 3; partId++) { + for (auto vertexId = partId * 10; vertexId < (partId + 1) * 10; vertexId++) { + tmpIds[partId].push_back(vertexId); + } + } + req.set_ids(std::move(tmpIds)); + // Return tag props col_0, col_2, col_4 + decltype(req.return_columns) tmpColumns; + for (int i = 0; i < 3; i++) { + tmpColumns.emplace_back(TestUtils::propDef(cpp2::PropOwner::SOURCE, + folly::stringPrintf("tag_%d_col_%d", 3001 + i*2, i*2), + 3001 + i*2)); + } + req.set_return_columns(std::move(tmpColumns)); + + LOG(INFO) << "Test QueryVertexPropsRequest..."; + auto* processor = QueryVertexPropsProcessor::instance(kv.get(), schemaMan.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + LOG(INFO) << "Check the results..."; + EXPECT_EQ(3, resp.codes.size()); + for (auto i = 0; i < 3; i++) { + EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.codes[i].code); + } + + EXPECT_EQ(3, resp.vertex_schema.columns.size()); + auto tagProvider = std::make_unique(resp.vertex_schema); + EXPECT_EQ(30, resp.vertices.size()); + for (auto& vp : resp.vertices) { + RowReader tagReader(tagProvider.get(), vp.vertex_data); + EXPECT_EQ(3, tagReader.numFields()); + int64_t col1; + EXPECT_EQ(ResultType::SUCCEEDED, tagReader.getInt("tag_3001_col_0", col1)); + EXPECT_EQ(col1, 0); + + int64_t col2; + EXPECT_EQ(ResultType::SUCCEEDED, tagReader.getInt("tag_3003_col_2", col2)); + EXPECT_EQ(col2, 2); + + folly::StringPiece col3; + EXPECT_EQ(ResultType::SUCCEEDED, tagReader.getString("tag_3005_col_4", col3)); + EXPECT_EQ(folly::stringPrintf("tag_string_col_4"), col3); + } +} + +} // namespace storage +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + return RUN_ALL_TESTS(); +} + + diff --git a/src/storage/test/StorageServiceHandlerTest.cpp b/src/storage/test/StorageServiceHandlerTest.cpp index 3c57ec9e87d..3b28848cc4c 100644 --- a/src/storage/test/StorageServiceHandlerTest.cpp +++ b/src/storage/test/StorageServiceHandlerTest.cpp @@ -7,7 +7,7 @@ #include "base/Base.h" #include #include "fs/TempDir.h" -#include "storage/test/StorageTestBase.h" +#include "storage/test/TestUtils.h" #include "storage/AddVerticesProcessor.h" #include "storage/StorageServiceHandler.h" #include "storage/KeyUtils.h" @@ -19,7 +19,6 @@ TEST(StorageServiceHandlerTest, FutureAddVerticesTest) { fs::TempDir rootPath("/tmp/FutureAddVerticesTest.XXXXXX"); cpp2::AddVerticesRequest req; - std::unique_ptr storageServiceHandler; req.set_space_id(0); req.overwritable = true; @@ -28,9 +27,9 @@ TEST(StorageServiceHandlerTest, FutureAddVerticesTest) { req.vertices.emplace(1, TestUtils::setupVertices(1,20,30)); LOG(INFO) << "Test FutureAddVerticesTest..."; - storageServiceHandler = std::make_unique(); std::unique_ptr kvstore(TestUtils::initKV(rootPath.path())); - storageServiceHandler->kvstore_ = kvstore.get(); + auto storageServiceHandler = std::make_unique(kvstore.get(), nullptr); + auto resp = storageServiceHandler->future_addVertices(req).get(); EXPECT_EQ(typeid(cpp2::ExecResponse).name() , typeid(resp).name()); diff --git a/src/storage/test/StorageTestBase.cpp b/src/storage/test/StorageTestBase.cpp deleted file mode 100644 index 4780aff2177..00000000000 --- a/src/storage/test/StorageTestBase.cpp +++ /dev/null @@ -1,14 +0,0 @@ -/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved - * - * This source code is licensed under Apache 2.0 License - * (found in the LICENSE.Apache file in the root directory) - */ - -#include "storage/test/StorageTestBase.h" - -namespace nebula { -namespace storage { - -} // namespace storage -} // namespace nebula - diff --git a/src/storage/test/StorageTestBase.h b/src/storage/test/StorageTestBase.h deleted file mode 100644 index 572b5e849bb..00000000000 --- a/src/storage/test/StorageTestBase.h +++ /dev/null @@ -1,67 +0,0 @@ -/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved - * - * This source code is licensed under Apache 2.0 License - * (found in the LICENSE.Apache file in the root directory) - */ - -#include "base/Base.h" -#include "kvstore/include/KVStore.h" -#include "kvstore/PartManager.h" -#include "kvstore/KVStoreImpl.h" -#include "storage/AddVerticesProcessor.h" - -DECLARE_string(part_man_type); - -namespace nebula { -namespace storage { - -class TestUtils { -public: - static kvstore::KVStore* initKV(const char* rootPath) { - FLAGS_part_man_type = "memory"; // Use MemPartManager. - using namespace kvstore; - MemPartManager* partMan = reinterpret_cast(PartManager::instance()); - // GraphSpaceID => {PartitionIDs} - // 0 => {0, 1, 2, 3, 4, 5} - auto& partsMap = partMan->partsMap(); - for (auto partId = 0; partId < 6; partId++) { - partsMap[0][partId] = PartMeta(); - } - auto dataPath = folly::stringPrintf("%s/disk1, %s/disk2", rootPath, rootPath); - std::vector paths; - paths.push_back(folly::stringPrintf("%s/disk1", rootPath)); - paths.push_back(folly::stringPrintf("%s/disk2", rootPath)); - KVStoreImpl* kv = reinterpret_cast(KVStore::instance(HostAddr(0, 0), - std::move(paths))); - return kv; - } - - static std::vector setupVertices( - const PartitionID partitionID, - const int64_t verticesNum, - const int32_t tagsNum) { - - // partId => List - // Vertex => {Id, List} - // VertexProp => {tagId, tags} - std::vector vertices; - VertexID vertexID = 0; - while (vertexID < verticesNum){ - TagID tagID = 0; - std::vector tags; - while (tagID < tagsNum){ - tags.emplace_back(apache::thrift::FragileConstructor::FRAGILE, - tagID, - folly::stringPrintf("%d_%ld_%d", partitionID, vertexID, tagID++)); - } - vertices.emplace_back(apache::thrift::FragileConstructor::FRAGILE, - vertexID++, - std::move(tags)); - } - return vertices; - } -}; - -} // namespace storage -} // namespace nebula - diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h new file mode 100644 index 00000000000..f839eec9018 --- /dev/null +++ b/src/storage/test/TestUtils.h @@ -0,0 +1,125 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include "kvstore/include/KVStore.h" +#include "kvstore/PartManager.h" +#include "kvstore/KVStoreImpl.h" +#include "meta/SchemaManager.h" +#include "dataman/ResultSchemaProvider.h" + +DECLARE_string(part_man_type); + +namespace nebula { +namespace storage { + +class TestUtils { +public: + static kvstore::KVStore* initKV(const char* rootPath) { + FLAGS_part_man_type = "memory"; // Use MemPartManager. + using namespace kvstore; + MemPartManager* partMan = reinterpret_cast(PartManager::instance()); + // GraphSpaceID => {PartitionIDs} + // 0 => {0, 1, 2, 3, 4, 5} + auto& partsMap = partMan->partsMap(); + for (auto partId = 0; partId < 6; partId++) { + partsMap[0][partId] = PartMeta(); + } + auto dataPath = folly::stringPrintf("%s/disk1, %s/disk2", rootPath, rootPath); + std::vector paths; + paths.push_back(folly::stringPrintf("%s/disk1", rootPath)); + paths.push_back(folly::stringPrintf("%s/disk2", rootPath)); + KVStoreImpl* kv = reinterpret_cast(KVStore::instance(HostAddr(0, 0), + std::move(paths))); + return kv; + } + + static std::vector setupVertices( + const PartitionID partitionID, + const int64_t verticesNum, + const int32_t tagsNum) { + // partId => List + // Vertex => {Id, List} + // VertexProp => {tagId, tags} + std::vector vertices; + VertexID vertexID = 0; + while (vertexID < verticesNum){ + TagID tagID = 0; + std::vector tags; + while (tagID < tagsNum){ + tags.emplace_back(apache::thrift::FragileConstructor::FRAGILE, + tagID, + folly::stringPrintf("%d_%ld_%d", partitionID, vertexID, tagID++)); + } + vertices.emplace_back(apache::thrift::FragileConstructor::FRAGILE, + vertexID++, + std::move(tags)); + } + return vertices; + } + /** + * It will generate SchemaProvider with some int fields and string fields + * */ + static SchemaProviderIf* genEdgeSchemaProvider(int32_t intFieldsNum, + int32_t stringFieldsNum) { + cpp2::Schema schema; + for (auto i = 0; i < intFieldsNum; i++) { + cpp2::ColumnDef column; + column.name = folly::stringPrintf("col_%d", i); + column.type.type = cpp2::SupportedType::INT; + schema.columns.emplace_back(std::move(column)); + } + for (auto i = intFieldsNum; i < intFieldsNum + stringFieldsNum; i++) { + cpp2::ColumnDef column; + column.name = folly::stringPrintf("col_%d", i); + column.type.type = cpp2::SupportedType::STRING; + schema.columns.emplace_back(std::move(column)); + } + return new ResultSchemaProvider(std::move(schema)); + } + + /** + * It will generate tag SchemaProvider with some int fields and string fields + * */ + static SchemaProviderIf* genTagSchemaProvider(TagID tagId, + int32_t intFieldsNum, + int32_t stringFieldsNum) { + cpp2::Schema schema; + for (auto i = 0; i < intFieldsNum; i++) { + cpp2::ColumnDef column; + column.name = folly::stringPrintf("tag_%d_col_%d", tagId, i); + column.type.type = cpp2::SupportedType::INT; + schema.columns.emplace_back(std::move(column)); + } + for (auto i = intFieldsNum; i < intFieldsNum + stringFieldsNum; i++) { + cpp2::ColumnDef column; + column.name = folly::stringPrintf("tag_%d_col_%d", tagId, i); + column.type.type = cpp2::SupportedType::STRING; + schema.columns.emplace_back(std::move(column)); + } + return new ResultSchemaProvider(std::move(schema)); + } + + static cpp2::PropDef propDef(cpp2::PropOwner owner, std::string name, TagID tagId = -1) { + cpp2::PropDef prop; + prop.set_name(std::move(name)); + prop.set_owner(owner); + if (tagId != -1) { + prop.set_tag_id(tagId); + } + return prop; + } + + static cpp2::PropDef propDef(cpp2::PropOwner owner, std::string name, cpp2::StatType type, TagID tagId = -1) { + auto prop = TestUtils::propDef(owner, name, tagId); + prop.set_stat(type); + return prop; + } +}; + +} // namespace storage +} // namespace nebula + From 795218c2724b0c3441b6a2db15efbf26ce672266 Mon Sep 17 00:00:00 2001 From: Sherman The Tank <5414276+sherman-the-tank@users.noreply.github.com> Date: Sun, 13 Jan 2019 07:40:18 -0500 Subject: [PATCH 2/3] Added CAS support in raft (#49) * Added CAS support in raft --- src/raftex/Host.cpp | 8 + src/raftex/RaftPart.cpp | 270 +++++++++++++++----- src/raftex/RaftPart.h | 125 +++++++++- src/raftex/test/CMakeLists.txt | 38 +++ src/raftex/test/LeaderElectionTest.cpp | 2 +- src/raftex/test/LogAppendTest.cpp | 13 +- src/raftex/test/LogCASTest.cpp | 327 +++++++++++++++++++++++++ src/raftex/test/RaftexTestBase.h | 33 +++ src/raftex/test/TestShard.cpp | 10 + src/raftex/test/TestShard.h | 1 + 10 files changed, 743 insertions(+), 84 deletions(-) create mode 100644 src/raftex/test/LogCASTest.cpp diff --git a/src/raftex/Host.cpp b/src/raftex/Host.cpp index 24832444fa9..e24899f1596 100644 --- a/src/raftex/Host.cpp +++ b/src/raftex/Host.cpp @@ -79,6 +79,14 @@ folly::Future Host::appendLogs( LogID lastLogIdSent) { VLOG(3) << idStr_ << "Entering Host::appendLogs()"; + VLOG(2) << idStr_ + << "Append logs to the host [term = " << term + << ", logId = " << logId + << ", committedLogId = " << committedLogId + << ", lastLogTermSent = " << lastLogTermSent + << ", lastLogIdSent = " << lastLogIdSent + << "]"; + auto ret = folly::Future::makeEmpty(); std::shared_ptr req; { diff --git a/src/raftex/RaftPart.cpp b/src/raftex/RaftPart.cpp index 57b74fab670..a5f80a41d71 100644 --- a/src/raftex/RaftPart.cpp +++ b/src/raftex/RaftPart.cpp @@ -39,27 +39,81 @@ using namespace nebula::wal; class AppendLogsIterator final : public LogIterator { public: - AppendLogsIterator( - LogID firstLogId, - const std::vector< - std::tuple - >& logs) + AppendLogsIterator(LogID firstLogId, + RaftPart::LogCache logs, + std::function casCB) : firstLogId_(firstLogId) - , logs_(logs) { + , logId_(firstLogId) + , logs_(std::move(logs)) + , casCB_(std::move(casCB)) { + leadByCAS_ = processCAS(); + valid_ = idx_ < logs_.size(); + hasLogs_ = !leadByCAS_ && valid_; + } + + AppendLogsIterator(const AppendLogsIterator&) = delete; + AppendLogsIterator(AppendLogsIterator&&) = default; + + AppendLogsIterator& operator=(const AppendLogsIterator&) = delete; + AppendLogsIterator& operator=(AppendLogsIterator&&) = default; + + bool leadByCAS() const { + return leadByCAS_; + } + + bool hasLogs() const { + return hasLogs_; + } + + LogID firstLogId() const { + return firstLogId_; + } + + // Return true if the current log is a CAS, otherwise return false + bool processCAS() { + while (idx_ < logs_.size()) { + auto& tup = logs_.at(idx_); + bool isCAS = std::get<2>(tup); + if (!isCAS) { + // Not a CAS + return false; + } + + // Process CAS log + CHECK(!!casCB_); + casResult_ = casCB_(std::get<3>(tup)); + if (casResult_.size() > 0) { + // CAS Succeeded + return true; + } else { + // CAS failed, move to the next log, but do not increment the logId_ + ++idx_; + } + } + + // Reached the end + return false; } LogIterator& operator++() override { ++idx_; + ++logId_; + valid_ = (idx_ < logs_.size()) && !isCAS(); + if (valid_) { + hasLogs_ = true; + } return *this; } + // The iterator becomes invalid when exausting the logs + // **OR** running into a CAS log bool valid() const override { - return idx_ < logs_.size(); + return valid_; } LogID logId() const override { DCHECK(valid()); - return firstLogId_ + idx_; + return logId_; } TermID logTerm() const override { @@ -72,17 +126,45 @@ class AppendLogsIterator final : public LogIterator { return std::get<0>(logs_.at(idx_)); } - folly::StringPiece logMsg() const { + folly::StringPiece logMsg() const override { DCHECK(valid()); + if (isCAS()) { + return casResult_; + } else { + return std::get<3>(logs_.at(idx_)); + } + } + + // Return true when there is no more log left for processing + bool empty() const { + return idx_ >= logs_.size(); + } + + // Resume the iterator so that we can continue to process the remaining logs + void resume() { + CHECK(!valid_); + if (!empty()) { + leadByCAS_ = processCAS(); + valid_ = idx_ < logs_.size(); + hasLogs_ = !leadByCAS_ && valid_; + } + } + +private: + bool isCAS() const { return std::get<2>(logs_.at(idx_)); } private: size_t idx_{0}; + bool leadByCAS_{false}; + bool hasLogs_{false}; + bool valid_{true}; + std::string casResult_; LogID firstLogId_; - const std::vector< - std::tuple - >& logs_; + LogID logId_; + RaftPart::LogCache logs_; + std::function casCB_; }; @@ -221,15 +303,26 @@ typename RaftPart::AppendLogResult RaftPart::canAppendLogs( } -folly::Future -RaftPart::appendLogsAsync(ClusterID source, - std::vector&& logMsgs) { - CHECK_GT(logMsgs.size(), 0UL); +folly::Future RaftPart::appendAsync(ClusterID source, + std::string log) { + if (source < 0) { + source = clusterId_; + } + return appendLogAsync(source, false, std::move(log)); +} + + +folly::Future RaftPart::casAsync(std::string log) { + return appendLogAsync(clusterId_, true, std::move(log)); +} + - std::vector< - std::tuple> swappedOutLogs; - auto retFuture = - folly::Future::makeEmpty(); +folly::Future RaftPart::appendLogAsync(ClusterID source, + bool isCAS, + std::string log) { + LogCache swappedOutLogs; + LogID firstId; + auto retFuture = folly::Future::makeEmpty(); { std::lock_guard lck(raftLock_); @@ -238,15 +331,15 @@ RaftPart::appendLogsAsync(ClusterID source, if (res != AppendLogResult::SUCCEEDED) { LOG(ERROR) << idStr_ << "Cannot append logs, clean the buffer"; - cachingPromise_.setValue(res); - cachingPromise_ = folly::SharedPromise(); + cachingPromise_.setValue(std::move(res)); + cachingPromise_.reset(); logs_.clear(); return res; } VLOG(2) << idStr_ << "Checking whether buffer overflow"; - if (logs_.size() + logMsgs.size() > FLAGS_max_batch_size) { + if (logs_.size() >= FLAGS_max_batch_size) { // Buffer is full LOG(WARNING) << idStr_ << "The appendLog buffer is full." @@ -257,24 +350,28 @@ RaftPart::appendLogsAsync(ClusterID source, VLOG(2) << idStr_ << "Appending logs to the buffer"; // Append new logs to the buffer - for (auto& m : logMsgs) { - logs_.emplace_back(source, term_, std::move(m)); + DCHECK_GE(source, 0); + if (isCAS) { + logs_.emplace_back(source, term_, true, std::move(log)); + retFuture = cachingPromise_.getSingleFuture(); + } else { + logs_.emplace_back(source, term_, false, std::move(log)); + retFuture = cachingPromise_.getSharedFuture(); } if (replicatingLogs_) { - CHECK(!cachingPromise_.isFulfilled()); VLOG(2) << idStr_ << "Another AppendLogs request is ongoing," " just return"; - return cachingPromise_.getFuture(); + return retFuture; } else { // We need to send logs to all followers VLOG(2) << idStr_ << "Preparing to send AppendLog request"; replicatingLogs_ = true; sendingPromise_ = std::move(cachingPromise_); - retFuture = sendingPromise_.getFuture(); - cachingPromise_ = folly::SharedPromise(); + cachingPromise_.reset(); std::swap(swappedOutLogs, logs_); + firstId = lastLogId_ + 1; } } @@ -283,47 +380,58 @@ RaftPart::appendLogsAsync(ClusterID source, // until majority accept the logs, the leadership changes, or // the partition stops VLOG(2) << idStr_ << "Calling appendLogsInternal()"; - appendLogsInternal(std::move(swappedOutLogs)); + AppendLogsIterator it( + firstId, + std::move(swappedOutLogs), + [this] (const std::string& msg) -> std::string { + auto res = compareAndSet(msg); + if (res.empty()) { + // Failed + sendingPromise_.setOneSingleValue(AppendLogResult::E_CAS_FAILURE); + } + return res; + }); + appendLogsInternal(std::move(it)); return retFuture; } -void RaftPart::appendLogsInternal( - std::vector>&& logs) { - CHECK(!logs.empty()); - - LogID firstId = 0; +void RaftPart::appendLogsInternal(AppendLogsIterator iter) { TermID currTerm = 0; LogID prevLogId = 0; TermID prevLogTerm = 0; LogID committed = 0; { std::lock_guard g(raftLock_); - firstId = lastLogId_ + 1; currTerm = term_; prevLogId = lastLogId_; prevLogTerm = lastLogTerm_; committed = committedLogId_; } - LogID lastId = firstId + logs.size() - 1; - VLOG(2) << idStr_ << "Ready to append logs from id " - << firstId << " to " << lastId - << " (Current term is " << currTerm << ")"; + if (iter.valid()) { + VLOG(2) << idStr_ << "Ready to append logs from id " + << iter.logId() << " (Current term is " + << currTerm << ")"; + } else { + VLOG(2) << idStr_ << "Ready to send a heartbeat"; + } // Step 1: Write WAL - AppendLogsIterator it(firstId, std::move(logs)); - if (!wal_->appendLogs(it)) { + if (!wal_->appendLogs(iter)) { LOG(ERROR) << idStr_ << "Failed to write into WAL"; sendingPromise_.setValue(AppendLogResult::E_WAL_FAILURE); return; } - VLOG(2) << idStr_ << "Succeeded in writing logs to WAL"; + LogID lastId = wal_->lastLogId(); + VLOG(2) << idStr_ << "Succeeded writing logs [" + << iter.firstLogId() << ", " << lastId << "] to WAL"; // Step 2: Replicate to followers auto eb = ioThreadPool_->getEventBase(); replicateLogs(eb, + std::move(iter), currTerm, lastId, committed, @@ -337,6 +445,7 @@ void RaftPart::appendLogsInternal( folly::Future RaftPart::replicateLogs( folly::EventBase* eb, + AppendLogsIterator iter, TermID currTerm, LogID lastLogId, LogID committedId, @@ -407,16 +516,18 @@ RaftPart::replicateLogs( }) .then(eb, [self = shared_from_this(), eb, + it = std::move(iter), currTerm, lastLogId, committedId, prevLogId, - prevLogTerm] (folly::Try&& result) { + prevLogTerm] (folly::Try&& result) mutable { VLOG(2) << self->idStr_ << "Received enough response"; CHECK(!result.hasException()); self->processAppendLogResponses(*result, eb, + std::move(it), currTerm, lastLogId, committedId, @@ -431,6 +542,7 @@ RaftPart::replicateLogs( void RaftPart::processAppendLogResponses( const AppendLogResponses& resps, folly::EventBase* eb, + AppendLogsIterator iter, TermID currTerm, LogID lastLogId, LogID committedId, @@ -461,8 +573,6 @@ void RaftPart::processAppendLogResponses( walIt = wal_->iterator(committedId + 1, lastLogId); } - decltype(logs_) swappedOutLogs; - // Step 3: Commit the batch if (commitLogs(std::move(walIt))) { std::lock_guard g(raftLock_); @@ -470,33 +580,53 @@ void RaftPart::processAppendLogResponses( committedLogId_ = lastLogId; // Step 4: Fulfill the promise - AppendLogResult res = AppendLogResult::SUCCEEDED; - sendingPromise_.setValue(std::move(res)); + if (iter.hasLogs()) { + sendingPromise_.setOneSharedValue(AppendLogResult::SUCCEEDED); + } + if (iter.leadByCAS()) { + sendingPromise_.setOneSingleValue(AppendLogResult::SUCCEEDED); + } // Step 5: Check whether need to continue // the log replication CHECK(replicatingLogs_); - if (logs_.size() > 0) { - // continue to replicate the logs - sendingPromise_ = std::move(cachingPromise_); - cachingPromise_ = - folly::SharedPromise(); - std::swap(swappedOutLogs, logs_); - } else { - replicatingLogs_ = false; + // Continue to process the original AppendLogsIterator if necessary + iter.resume(); + if (iter.empty()) { + if (logs_.size() > 0) { + // continue to replicate the logs + sendingPromise_ = std::move(cachingPromise_); + cachingPromise_.reset(); + iter = AppendLogsIterator( + lastLogId_ + 1, + std::move(logs_), + [this] (const std::string& log) -> std::string { + auto res = compareAndSet(log); + if (res.empty()) { + // Failed + sendingPromise_.setOneSingleValue( + AppendLogResult::E_CAS_FAILURE); + } + return res; + }); + logs_.clear(); + } else { + replicatingLogs_ = false; + } } } else { LOG(FATAL) << idStr_ << "Failed to commit logs"; } - if (!swappedOutLogs.empty()) { - appendLogsInternal(std::move(swappedOutLogs)); + if (!iter.empty()) { + appendLogsInternal(std::move(iter)); } } else { // Not enough hosts accepted the log, re-try LOG(WARNING) << idStr_ << "Only " << numSucceeded << " hosts succeeded, Need to try again"; replicateLogs(eb, + std::move(iter), currTerm, lastLogId, committedId, @@ -889,7 +1019,7 @@ void RaftPart::processAppendLogRequest( // Check the last log CHECK_GE(req.get_last_log_id_sent(), committedLogId_); - if (req.get_last_log_term_sent() != lastLogTerm_) { + if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) { VLOG(2) << idStr_ << "The local last log term is " << lastLogTerm_ << ", which is different from the leader's" @@ -1101,22 +1231,34 @@ folly::Future RaftPart::sendHeartbeat() { CHECK(!result.hasException()); decltype(self->logs_) swappedOutLogs; + LogID firstId; { std::lock_guard g(self->raftLock_); CHECK(self->replicatingLogs_); if (self->logs_.size() > 0) { // continue to replicate the logs - self->sendingPromise_ = - std::move(self->cachingPromise_); - self->cachingPromise_ = - folly::SharedPromise(); + self->sendingPromise_= std::move(self->cachingPromise_); + self->cachingPromise_.reset(); std::swap(swappedOutLogs, self->logs_); + firstId = lastLogId_ + 1; } else { self->replicatingLogs_ = false; } } if (!swappedOutLogs.empty()) { - self->appendLogsInternal(std::move(swappedOutLogs)); + AppendLogsIterator it( + firstId, + std::move(swappedOutLogs), + [this] (const std::string& msg) -> std::string { + auto res = compareAndSet(msg); + if (res.empty()) { + // Failed + sendingPromise_.setOneSingleValue( + AppendLogResult::E_CAS_FAILURE); + } + return res; + }); + self->appendLogsInternal(std::move(it)); } return AppendLogResult::SUCCEEDED; }); diff --git a/src/raftex/RaftPart.h b/src/raftex/RaftPart.h index 43447f2bcd9..6d8d071dcaf 100644 --- a/src/raftex/RaftPart.h +++ b/src/raftex/RaftPart.h @@ -30,16 +30,19 @@ class BufferFlusher; namespace raftex { class Host; +class AppendLogsIterator; class RaftPart : public std::enable_shared_from_this { + friend class AppendLogsIterator; public: enum class AppendLogResult { SUCCEEDED = 0, - E_NOT_A_LEADER = -1, - E_STOPPED = -2, - E_NOT_READY = -3, - E_BUFFER_OVERFLOW = -4, - E_WAL_FAILURE = -5, + E_CAS_FAILURE = -1, + E_NOT_A_LEADER = -2, + E_STOPPED = -3, + E_NOT_READY = -4, + E_BUFFER_OVERFLOW = -5, + E_WAL_FAILURE = -6, }; @@ -113,9 +116,12 @@ class RaftPart : public std::enable_shared_from_this { * * If the source == -1, the current clusterId will be used ****************************************************************/ - folly::Future appendLogsAsync( - ClusterID source, - std::vector&& logMsgs); + folly::Future appendAsync(ClusterID source, std::string log); + + /**************************************************************** + * Asynchronously compare and set + ***************************************************************/ + folly::Future casAsync(std::string log); /***************************************************** * @@ -165,6 +171,18 @@ class RaftPart : public std::enable_shared_from_this { // a new leader virtual void onElected(TermID term) = 0; + // This method is invoked when handling a CAS log. The inherited + // class uses this method to do the comparison and decide whether + // a log should be inserted + // + // The method will be guaranteed to execute in a single-threaded + // manner, so no need for locks + // + // If CAS succeeded, the method should return the correct log content + // that will be applied to the storage. Otherwise it returns an empty + // string + virtual std::string compareAndSet(const std::string& log) = 0; + // The inherited classes need to implement this method to commit // a batch of log messages virtual bool commitLogs(std::unique_ptr iter) = 0; @@ -192,6 +210,13 @@ class RaftPart : public std::enable_shared_from_this { // resp -- AppendLogResponse using AppendLogResponses = std::vector; + // + using LogCache = std::vector< + std::tuple>; + /**************************************************** * @@ -238,11 +263,15 @@ class RaftPart : public std::enable_shared_from_this { // Pre-condition: The caller needs to hold the raftLock_ AppendLogResult canAppendLogs(std::lock_guard& lck); - void appendLogsInternal( - std::vector>&& logs); + folly::Future appendLogAsync(ClusterID source, + bool isCAS, + std::string log); + + void appendLogsInternal(AppendLogsIterator iter); folly::Future replicateLogs( folly::EventBase* eb, + AppendLogsIterator iter, TermID currTerm, LogID lastLogId, LogID committedId, @@ -252,6 +281,7 @@ class RaftPart : public std::enable_shared_from_this { void processAppendLogResponses( const AppendLogResponses& resps, folly::EventBase* eb, + AppendLogsIterator iter, TermID currTerm, LogID lastLogId, LogID committedId, @@ -260,6 +290,75 @@ class RaftPart : public std::enable_shared_from_this { private: + template + class PromiseSet final { + public: + PromiseSet() = default; + PromiseSet(const PromiseSet&) = delete; + PromiseSet(PromiseSet&&) = default; + + ~PromiseSet() = default; + + PromiseSet& operator=(const PromiseSet&) = delete; + PromiseSet& operator=(PromiseSet&& right) = default; + + void reset() { + sharedPromises_.clear(); + singlePromises_.clear(); + rollSharedPromise_ = true; + } + + folly::Future getSharedFuture() { + if (rollSharedPromise_) { + sharedPromises_.emplace_back(); + rollSharedPromise_ = false; + } + + return sharedPromises_.back().getFuture(); + } + + folly::Future getSingleFuture() { + singlePromises_.emplace_back(); + rollSharedPromise_ = true; + + return singlePromises_.back().getFuture(); + } + + template + void setOneSharedValue(VT&& val) { + CHECK(!sharedPromises_.empty()); + sharedPromises_.front().setValue(std::forward(val)); + sharedPromises_.pop_front(); + } + + template + void setOneSingleValue(VT&& val) { + CHECK(!singlePromises_.empty()); + singlePromises_.front().setValue(std::forward(val)); + singlePromises_.pop_front(); + } + + void setValue(ValueType val) { + for (auto& p : sharedPromises_) { + p.setValue(val); + } + for (auto& p : singlePromises_) { + p.setValue(val); + } + } + + + private: + // Whether the last future was returned from a shared promise + bool rollSharedPromise_{true}; + + // Promises shared by continuous non-CAS logs + std::list> sharedPromises_; + // A list of promises for CAS logs + std::list> singlePromises_; + }; + + const std::string idStr_; const ClusterID clusterId_; @@ -274,9 +373,9 @@ class RaftPart : public std::enable_shared_from_this { mutable std::mutex raftLock_; bool replicatingLogs_{false}; - folly::SharedPromise cachingPromise_; - folly::SharedPromise sendingPromise_; - std::vector> logs_; + PromiseSet cachingPromise_; + PromiseSet sendingPromise_; + LogCache logs_; Status status_; Role role_; diff --git a/src/raftex/test/CMakeLists.txt b/src/raftex/test/CMakeLists.txt index a4e141e93bb..fdac26b0916 100644 --- a/src/raftex/test/CMakeLists.txt +++ b/src/raftex/test/CMakeLists.txt @@ -73,3 +73,41 @@ target_link_libraries( ) add_test(NAME log_append_test COMMAND log_append_test) + +add_executable( + log_cas_test + LogCASTest.cpp + RaftexTestBase.cpp + TestShard.cpp + $ + $ + $ + $ + $ + $ + $ + $ + $ +) +target_link_libraries( + log_cas_test + ${THRIFT_LIBRARIES} + wangle + folly + gtest + glog + gflags + boost_context + boost_system + ${OPENSSL_LIBRARIES} + ${KRB5_LIBRARIES} + event + ${COMPRESSION_LIBRARIES} + resolv + double-conversion + dl + event + -pthread +) +add_test(NAME log_cas_test COMMAND log_cas_test) + diff --git a/src/raftex/test/LeaderElectionTest.cpp b/src/raftex/test/LeaderElectionTest.cpp index e8b1e258a62..88d96669b35 100644 --- a/src/raftex/test/LeaderElectionTest.cpp +++ b/src/raftex/test/LeaderElectionTest.cpp @@ -43,7 +43,7 @@ TEST(LeaderElection, ElectionAfterBoot) { TEST(LeaderElection, LeaderCrash) { LOG(INFO) << "=====> Start LeaderCrash test"; - fs::TempDir walRoot("/tmp/leader_election.XXXXXX"); + fs::TempDir walRoot("/tmp/leader_crash.XXXXXX"); std::shared_ptr workers; std::vector wals; std::vector allHosts; diff --git a/src/raftex/test/LogAppendTest.cpp b/src/raftex/test/LogAppendTest.cpp index 825a2076306..a74ce647a56 100644 --- a/src/raftex/test/LogAppendTest.cpp +++ b/src/raftex/test/LogAppendTest.cpp @@ -23,7 +23,7 @@ namespace nebula { namespace raftex { TEST(LogAppend, SimpleAppend) { - fs::TempDir walRoot("/tmp/election_after_boot.XXXXXX"); + fs::TempDir walRoot("/tmp/simple_append.XXXXXX"); std::shared_ptr workers; std::vector wals; std::vector allHosts; @@ -42,13 +42,13 @@ TEST(LogAppend, SimpleAppend) { for (int i = 1; i <= 100; ++i) { msgs.emplace_back( folly::stringPrintf("Test Log Message %03d", i)); - auto fut = leader->appendLogsAsync(0, {msgs.back()}); + auto fut = leader->appendAsync(0, msgs.back()); ASSERT_EQ(RaftPart::AppendLogResult::SUCCEEDED, std::move(fut).get()); } LOG(INFO) << "<===== Finish appending logs"; - // Sleep a while to make sure the lat log has been committed on + // Sleep a while to make sure the last log has been committed on // followers sleep(FLAGS_heartbeat_interval); @@ -71,7 +71,7 @@ TEST(LogAppend, SimpleAppend) { TEST(LogAppend, MultiThreadAppend) { - fs::TempDir walRoot("/tmp/election_after_boot.XXXXXX"); + fs::TempDir walRoot("/tmp/multi_thread_append.XXXXXX"); std::shared_ptr workers; std::vector wals; std::vector allHosts; @@ -93,8 +93,8 @@ TEST(LogAppend, MultiThreadAppend) { threads.emplace_back(std::thread([i, numLogs, leader] { for (int j = 1; j <= numLogs; ++j) { do { - auto fut = leader->appendLogsAsync( - 0, {folly::stringPrintf("Log %03d for t%d", j, i)}); + auto fut = leader->appendAsync( + 0, folly::stringPrintf("Log %03d for t%d", j, i)); if (fut.isReady() && fut.value() == RaftPart::AppendLogResult::E_BUFFER_OVERFLOW) { // Buffer overflow, while a little @@ -142,6 +142,7 @@ TEST(LogAppend, MultiThreadAppend) { finishRaft(services, copies, workers, leader); } + } // namespace raftex } // namespace nebula diff --git a/src/raftex/test/LogCASTest.cpp b/src/raftex/test/LogCASTest.cpp new file mode 100644 index 00000000000..4a7cd65e525 --- /dev/null +++ b/src/raftex/test/LogCASTest.cpp @@ -0,0 +1,327 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include +#include +#include "fs/TempDir.h" +#include "fs/FileUtils.h" +#include "thread/GenericThreadPool.h" +#include "network/NetworkUtils.h" +#include "wal/BufferFlusher.h" +#include "raftex/RaftexService.h" +#include "raftex/test/RaftexTestBase.h" +#include "raftex/test/TestShard.h" + +DECLARE_uint32(heartbeat_interval); + + +namespace nebula { +namespace raftex { + +class LogCASTest : public RaftexTestFixture { +public: + LogCASTest() : RaftexTestFixture("log_cas_test") {} +}; + + +TEST_F(LogCASTest, StartWithValidCAS) { + // Append logs + LOG(INFO) << "=====> Start appending logs"; + std::vector msgs; + leader_->casAsync("TCAS Log Message"); + msgs.emplace_back("CAS Log Message"); + for (int i = 2; i <= 10; ++i) { + msgs.emplace_back( + folly::stringPrintf("Test Log Message %03d", i)); + auto fut = leader_->appendAsync(0, msgs.back()); + if (i == 10) { + fut.wait(); + } + } + LOG(INFO) << "<===== Finish appending logs"; + + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies_) { + ASSERT_EQ(10, c->getNumLogs()); + } + + LogID id = 1; + for (int i = 0; i < 10; ++i, ++id) { + for (auto& c : copies_) { + folly::StringPiece msg; + ASSERT_TRUE(c->getLogMsg(id, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + } +} + + +TEST_F(LogCASTest, StartWithInvalidCAS) { + // Append logs + LOG(INFO) << "=====> Start appending logs"; + std::vector msgs; + leader_->casAsync("FCAS Log Message"); + for (int i = 1; i <= 10; ++i) { + msgs.emplace_back( + folly::stringPrintf("Test Log Message %03d", i)); + auto fut = leader_->appendAsync(0, msgs.back()); + if (i == 10) { + fut.wait(); + } + } + LOG(INFO) << "<===== Finish appending logs"; + + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies_) { + ASSERT_EQ(10, c->getNumLogs()); + } + + LogID id = 1; + for (int i = 0; i < 10; ++i, ++id) { + for (auto& c : copies_) { + folly::StringPiece msg; + ASSERT_TRUE(c->getLogMsg(id, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + } +} + + +TEST_F(LogCASTest, ValidCASInMiddle) { + // Append logs + LOG(INFO) << "=====> Start appending logs"; + std::vector msgs; + for (int i = 1; i <= 5; ++i) { + msgs.emplace_back( + folly::stringPrintf("Test Log Message %03d", i)); + auto fut = leader_->appendAsync(0, msgs.back()); + } + + leader_->casAsync("TCAS Log Message"); + msgs.emplace_back("CAS Log Message"); + + for (int i = 7; i <= 10; ++i) { + msgs.emplace_back( + folly::stringPrintf("Test Log Message %03d", i)); + auto fut = leader_->appendAsync(0, msgs.back()); + if (i == 10) { + fut.wait(); + } + } + LOG(INFO) << "<===== Finish appending logs"; + + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies_) { + ASSERT_EQ(10, c->getNumLogs()); + } + + LogID id = 1; + for (int i = 0; i < 10; ++i, ++id) { + for (auto& c : copies_) { + folly::StringPiece msg; + ASSERT_TRUE(c->getLogMsg(id, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + } +} + + +TEST_F(LogCASTest, InvalidCASInMiddle) { + // Append logs + LOG(INFO) << "=====> Start appending logs"; + std::vector msgs; + for (int i = 1; i <= 5; ++i) { + msgs.emplace_back( + folly::stringPrintf("Test Log Message %03d", i)); + auto fut = leader_->appendAsync(0, msgs.back()); + } + + leader_->casAsync("FCAS Log Message"); + + for (int i = 6; i <= 10; ++i) { + msgs.emplace_back( + folly::stringPrintf("Test Log Message %03d", i)); + auto fut = leader_->appendAsync(0, msgs.back()); + if (i == 10) { + fut.wait(); + } + } + LOG(INFO) << "<===== Finish appending logs"; + + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies_) { + ASSERT_EQ(10, c->getNumLogs()); + } + + LogID id = 1; + for (int i = 0; i < 10; ++i, ++id) { + for (auto& c : copies_) { + folly::StringPiece msg; + ASSERT_TRUE(c->getLogMsg(id, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + } +} + + +TEST_F(LogCASTest, EndWithValidCAS) { + // Append logs + LOG(INFO) << "=====> Start appending logs"; + std::vector msgs; + for (int i = 1; i <= 8; ++i) { + msgs.emplace_back( + folly::stringPrintf("Test Log Message %03d", i)); + leader_->appendAsync(0, msgs.back()); + } + leader_->casAsync("TCAS Log Message"); + msgs.emplace_back("CAS Log Message"); + auto fut = leader_->casAsync("TCAS Log Message"); + msgs.emplace_back("CAS Log Message"); + fut.wait(); + LOG(INFO) << "<===== Finish appending logs"; + + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies_) { + ASSERT_EQ(10, c->getNumLogs()); + } + + LogID id = 1; + for (int i = 0; i < 10; ++i, ++id) { + for (auto& c : copies_) { + folly::StringPiece msg; + ASSERT_TRUE(c->getLogMsg(id, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + } +} + + +TEST_F(LogCASTest, EndWithInvalidCAS) { + // Append logs + LOG(INFO) << "=====> Start appending logs"; + std::vector msgs; + for (int i = 1; i <= 8; ++i) { + msgs.emplace_back( + folly::stringPrintf("Test Log Message %03d", i)); + leader_->appendAsync(0, msgs.back()); + } + leader_->casAsync("FCAS Log Message"); + auto fut = leader_->casAsync("FCAS Log Message"); + fut.wait(); + LOG(INFO) << "<===== Finish appending logs"; + + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies_) { + ASSERT_EQ(8, c->getNumLogs()); + } + + LogID id = 1; + for (int i = 0; i < 8; ++i, ++id) { + for (auto& c : copies_) { + folly::StringPiece msg; + ASSERT_TRUE(c->getLogMsg(id, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + } +} + + +TEST_F(LogCASTest, AllValidCAS) { + // Append logs + LOG(INFO) << "=====> Start appending logs"; + std::vector msgs; + for (int i = 1; i <= 10; ++i) { + auto fut = leader_->casAsync("TTest CAS Log"); + msgs.emplace_back("Test CAS Log"); + if (i == 10) { + fut.wait(); + } + } + LOG(INFO) << "<===== Finish appending logs"; + + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies_) { + ASSERT_EQ(10, c->getNumLogs()); + } + + LogID id = 1; + for (int i = 0; i < 10; ++i, ++id) { + for (auto& c : copies_) { + folly::StringPiece msg; + ASSERT_TRUE(c->getLogMsg(id, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + } +} + + +TEST_F(LogCASTest, AllInvalidCAS) { + // Append logs + LOG(INFO) << "=====> Start appending logs"; + std::vector msgs; + for (int i = 1; i <= 10; ++i) { + auto fut = leader_->casAsync("FCAS Log"); + if (i == 10) { + fut.wait(); + } + } + LOG(INFO) << "<===== Finish appending logs"; + + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_heartbeat_interval); + + // Check every copy + for (auto& c : copies_) { + ASSERT_EQ(0, c->getNumLogs()); + } +} + +} // namespace raftex +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + + using namespace nebula::raftex; + flusher = std::make_unique(); + + return RUN_ALL_TESTS(); +} + + diff --git a/src/raftex/test/RaftexTestBase.h b/src/raftex/test/RaftexTestBase.h index 7c83255777c..adc1834997d 100644 --- a/src/raftex/test/RaftexTestBase.h +++ b/src/raftex/test/RaftexTestBase.h @@ -75,6 +75,39 @@ void finishRaft(std::vector>& services, void checkLeadership(std::vector>& copies, std::shared_ptr& leader); + +class RaftexTestFixture : public ::testing::Test { +public: + RaftexTestFixture(const std::string& testName) + : testName_(testName) {} + ~RaftexTestFixture() = default; + + void SetUp() { + walRoot_ = std::make_unique( + folly::stringPrintf("/tmp/%s.XXXXXX", testName_.c_str()).c_str()); + setupRaft(*walRoot_, workers_, wals_, allHosts_, services_, copies_, leader_); + + // Check all hosts agree on the same leader + checkLeadership(copies_, leader_); + } + + void TearDown() { + finishRaft(services_, copies_, workers_, leader_); + walRoot_.reset(); + } + +protected: + const std::string testName_; + std::unique_ptr walRoot_; + std::shared_ptr workers_; + std::vector wals_; + std::vector allHosts_; + std::vector> services_; + std::vector> copies_; + + std::shared_ptr leader_; +}; + } // namespace raftex } // namespace nebula #endif // RAFTEX_TEST_RAFTEXTESTBASE_H_ diff --git a/src/raftex/test/TestShard.cpp b/src/raftex/test/TestShard.cpp index 78d22802aed..6810e601b61 100644 --- a/src/raftex/test/TestShard.cpp +++ b/src/raftex/test/TestShard.cpp @@ -55,6 +55,16 @@ void TestShard::onElected(TermID term) { } +std::string TestShard::compareAndSet(const std::string& log) { + switch (log[0]) { + case 'T': + return log.substr(1); + default: + return std::string(); + } +} + + bool TestShard::commitLogs(std::unique_ptr iter) { VLOG(2) << "TestShard: Committing logs"; LogID firstId = -1; diff --git a/src/raftex/test/TestShard.h b/src/raftex/test/TestShard.h index 06a6fdcdff6..b5385f44697 100644 --- a/src/raftex/test/TestShard.h +++ b/src/raftex/test/TestShard.h @@ -44,6 +44,7 @@ class TestShard : public RaftPart { void onLostLeadership(TermID term) override; void onElected(TermID term) override; + std::string compareAndSet(const std::string& log) override; bool commitLogs(std::unique_ptr iter) override; size_t getNumLogs() const; From 015a731a05d8708abf36be739374a323543da8e3 Mon Sep 17 00:00:00 2001 From: dangleptr <37216992+dangleptr@users.noreply.github.com> Date: Mon, 14 Jan 2019 09:48:51 +0800 Subject: [PATCH 3/3] Define meta service interface (#78) --- src/daemons/CMakeLists.txt | 31 +++++++++++++ src/daemons/MetaDaemon.cpp | 32 +++++++++++++ src/interface/CMakeLists.txt | 24 ++++++++++ src/interface/meta.thrift | 79 +++++++++++++++++++++++++++++++++ src/meta/CMakeLists.txt | 8 +++- src/meta/MetaServiceHandler.cpp | 40 +++++++++++++++++ src/meta/MetaServiceHandler.h | 37 +++++++++++++++ 7 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 src/daemons/MetaDaemon.cpp create mode 100644 src/interface/meta.thrift create mode 100644 src/meta/MetaServiceHandler.cpp create mode 100644 src/meta/MetaServiceHandler.h diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index 20147da6f2d..d42de9d128b 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -72,3 +72,34 @@ target_link_libraries( ) +add_executable( + metad + MetaDaemon.cpp + $ + $ + $ + $ + $ + $ + $ +) +target_link_libraries( + metad + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + folly + boost_system + boost_context + ${OPENSSL_LIBRARIES} + ${KRB5_LIBRARIES} + glog + gflags + event + ${COMPRESSION_LIBRARIES} + resolv + double-conversion + dl + jemalloc + -pthread +) diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp new file mode 100644 index 00000000000..71ed9baf1e5 --- /dev/null +++ b/src/daemons/MetaDaemon.cpp @@ -0,0 +1,32 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#include "base/Base.h" +#include +#include "meta/MetaServiceHandler.h" + +DEFINE_int32(port, 45500, "Meta daemon listening port"); + + +int main(int argc, char *argv[]) { + folly::init(&argc, &argv, true); + + using namespace nebula::meta; + + LOG(INFO) << "Starting the meta Daemon on port " << FLAGS_port; + + auto handler = std::make_shared(); + auto server = std::make_shared(); + CHECK(!!server) << "Failed to create the thrift server"; + + server->setInterface(handler); + server->setPort(FLAGS_port); + + server->serve(); // Will wait until the server shuts down + + LOG(INFO) << "The storage Daemon on port " << FLAGS_port << " stopped"; +} + diff --git a/src/interface/CMakeLists.txt b/src/interface/CMakeLists.txt index 43b7967224a..58a77910066 100644 --- a/src/interface/CMakeLists.txt +++ b/src/interface/CMakeLists.txt @@ -39,6 +39,18 @@ add_custom_command( DEPENDS storage.thrift ) +add_custom_command( + OUTPUT + gen-cpp2/MetaService.cpp + gen-cpp2/MetaServiceAsyncClient.cpp + gen-cpp2/MetaService_processmap_binary.cpp + gen-cpp2/MetaService_processmap_compact.cpp + gen-cpp2/meta_constants.cpp + gen-cpp2/meta_data.cpp + gen-cpp2/meta_types.cpp + COMMAND "${CMAKE_HOME_DIRECTORY}/third-party/fbthrift/_install/bin/thrift1" "--allow-neg-enum-vals" "--templates" "${CMAKE_HOME_DIRECTORY}/third-party/fbthrift/_install/include/thrift/templates" "--gen" "mstch_cpp2:include_prefix=\"interface\",process_in_event_base,stack_arguments" "--gen" "java:hashcode" "--gen" "go" "-o" "." "./meta.thrift" + DEPENDS meta.thrift +) add_custom_target( clean-interface @@ -83,3 +95,15 @@ add_library( gen-cpp2/storage_types.cpp ) add_dependencies(storage_thrift_obj tgt_fbthrift) + +add_library( + meta_thrift_obj OBJECT + gen-cpp2/MetaService.cpp + gen-cpp2/MetaServiceAsyncClient.cpp + gen-cpp2/MetaService_processmap_binary.cpp + gen-cpp2/MetaService_processmap_compact.cpp + gen-cpp2/meta_constants.cpp + gen-cpp2/meta_data.cpp + gen-cpp2/meta_types.cpp +) +add_dependencies(meta_thrift_obj tgt_fbthrift) diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift new file mode 100644 index 00000000000..84e92831815 --- /dev/null +++ b/src/interface/meta.thrift @@ -0,0 +1,79 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +namespace cpp nebula.meta +namespace java nebula.meta +namespace go nebula.meta + +cpp_include "base/ThriftTypes.h" + +typedef i32 (cpp.type = "nebula::IPv4") IPv4 +typedef i32 (cpp.type = "nebula::Port") Port + +enum ErrorCode { + SUCCEEDED = 0, + + // RPC Failure + E_DISCONNECTED = -1, + E_FAIL_TO_CONNECT = -2, + E_RPC_FAILURE = -3, + + E_LEADER_CHANAGED = -11, + + // Operation Failure + E_NODE_HAS_EXISTED = -21, + E_NODE_NOT_EXISTED = -22, +} (cpp.enum_strict) + +struct HostAddr { + 1: IPv4 ip, + 2: Port port, +} + +struct ExecResponse { + 1: ErrorCode ret, + // Valid if ret equals E_LEADER_CHANAGED. + 2: HostAddr leader, +} + +struct CreateNodeRequest { + 1: string path, + 2: string value, +} + +struct SetNodeRequest { + 1: string path, + 2: string value, +} + +struct GetNodeRequest { + 1: string path, +} + +struct GetNodeResponse { + 1: ErrorCode ret, + 2: HostAddr leader, + 3: string value, + 4: i64 last_updated_time, +} + +struct ListChildrenRequest { + 1: string path, +} + +struct ListChildrenResponse { + 1: ErrorCode ret, + 2: HostAddr leader, + 3: list children, +} + +service MetaService { + ExecResponse createNode(1: CreateNodeRequest req); + ExecResponse setNode(1: SetNodeRequest req); + GetNodeResponse getNode(1: GetNodeRequest req); + ListChildrenResponse listChildren(1: ListChildrenRequest req); +} + diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 0ac1abcba13..c6e27b75e3d 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -6,5 +6,11 @@ add_library( ) add_dependencies(meta_obj common dataman_obj) -#add_subdirectory(test) +add_library( + meta_service_handler OBJECT + MetaServiceHandler.cpp +) +add_dependencies(meta_service_handler common meta_thrift_obj kvstore_obj) + +#add_subdirectory(test) diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp new file mode 100644 index 00000000000..00d4514fd1a --- /dev/null +++ b/src/meta/MetaServiceHandler.cpp @@ -0,0 +1,40 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "meta/MetaServiceHandler.h" + +namespace nebula { +namespace meta { + +folly::Future +MetaServiceHandler::future_createNode(const cpp2::CreateNodeRequest& req) { + UNUSED(req); + folly::Promise p; + return p.getFuture(); +} + +folly::Future +MetaServiceHandler::future_setNode(const cpp2::SetNodeRequest& req) { + UNUSED(req); + folly::Promise p; + return p.getFuture(); +} + +folly::Future +MetaServiceHandler::future_getNode(const cpp2::GetNodeRequest& req) { + UNUSED(req); + folly::Promise p; + return p.getFuture(); +} + +folly::Future +MetaServiceHandler::future_listChildren(const cpp2::ListChildrenRequest& req) { + UNUSED(req); + folly::Promise p; + return p.getFuture(); +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h new file mode 100644 index 00000000000..5ede1f00c49 --- /dev/null +++ b/src/meta/MetaServiceHandler.h @@ -0,0 +1,37 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ + +#ifndef META_METASERVICEHANDLER_H_ +#define META_METASERVICEHANDLER_H_ + +#include "base/Base.h" +#include "interface/gen-cpp2/MetaService.h" +#include "kvstore/include/KVStore.h" + +namespace nebula { +namespace meta { + +class MetaServiceHandler final : public cpp2::MetaServiceSvIf { +public: + folly::Future + future_createNode(const cpp2::CreateNodeRequest& req) override; + + folly::Future + future_setNode(const cpp2::SetNodeRequest& req) override; + + folly::Future + future_getNode(const cpp2::GetNodeRequest& req) override; + + folly::Future + future_listChildren(const cpp2::ListChildrenRequest& req) override; + +private: + kvstore::KVStore* kvstore_ = nullptr; +}; + +} // namespace meta +} // namespace nebula +#endif // META_METASERVICEHANDLER_H_