diff --git a/src/clients/storage/GraphStorageClient.cpp b/src/clients/storage/GraphStorageClient.cpp index 84b255f1aaa..860682f44cf 100644 --- a/src/clients/storage/GraphStorageClient.cpp +++ b/src/clients/storage/GraphStorageClient.cpp @@ -8,14 +8,38 @@ #include "common/base/Base.h" +using nebula::storage::cpp2::ExecResponse; +using nebula::storage::cpp2::GetNeighborsResponse; +using nebula::storage::cpp2::GetPropResponse; + namespace nebula { namespace storage { -folly::SemiFuture> GraphStorageClient::getNeighbors( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - bool profile, +GraphStorageClient::CommonRequestParam::CommonRequestParam(GraphSpaceID space_, + SessionID sess, + ExecutionPlanID plan_, + bool profile_, + bool toss, + bool experimental, + folly::EventBase* evb_) + : space(space_), + session(sess), + plan(plan_), + profile(profile_), + useToss(toss), + useExperimentalFeature(experimental), + evb(evb_) {} + +cpp2::RequestCommon GraphStorageClient::CommonRequestParam::toReqCommon() const { + cpp2::RequestCommon common; + common.set_session_id(session); + common.set_plan_id(plan); + common.set_profile_detail(profile); + return common; +} + +StorageRpcRespFuture GraphStorageClient::getNeighbors( + const CommonRequestParam& param, std::vector colNames, const std::vector& vertices, const std::vector& edgeTypes, @@ -28,27 +52,26 @@ folly::SemiFuture> GraphStorageCl bool random, const std::vector& orderBy, int64_t limit, - const Expression* filter, - folly::EventBase* evb) { - auto cbStatus = getIdFromRow(space, false); + const Expression* filter) { + auto cbStatus = getIdFromRow(param.space, false); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, vertices, std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, vertices, std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); } auto& clusters = status.value(); - auto common = makeRequestCommon(session, plan, profile); + auto common = param.toReqCommon(); std::unordered_map requests; for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_column_names(colNames); req.set_parts(std::move(c.second)); req.set_common(common); @@ -80,28 +103,25 @@ folly::SemiFuture> GraphStorageCl } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::GetNeighborsRequest& r) { return client->future_getNeighbors(r); }); } -folly::SemiFuture> GraphStorageClient::addVertices( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, +StorageRpcRespFuture GraphStorageClient::addVertices( + const CommonRequestParam& param, std::vector vertices, std::unordered_map> propNames, - bool ifNotExists, - folly::EventBase* evb) { - auto cbStatus = getIdFromNewVertex(space); + bool ifNotExists) { + auto cbStatus = getIdFromNewVertex(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(vertices), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(vertices), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -109,42 +129,37 @@ folly::SemiFuture> GraphStorageClient::ad auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_if_not_exists(ifNotExists); req.set_parts(std::move(c.second)); req.set_prop_names(propNames); req.set_common(common); } - VLOG(3) << "requests size " << requests.size(); return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::AddVerticesRequest& r) { return client->future_addVertices(r); }); } -folly::SemiFuture> GraphStorageClient::addEdges( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, +StorageRpcRespFuture GraphStorageClient::addEdges( + const CommonRequestParam& param, std::vector edges, std::vector propNames, - bool ifNotExists, - folly::EventBase* evb, - bool useToss) { - auto cbStatus = getIdFromNewEdge(space); + bool ifNotExists) { + auto cbStatus = getIdFromNewEdge(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(edges), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(edges), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -152,28 +167,27 @@ folly::SemiFuture> GraphStorageClient::ad auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_if_not_exists(ifNotExists); req.set_parts(std::move(c.second)); req.set_prop_names(propNames); req.set_common(common); } - return collectResponse( - evb, - std::move(requests), - [=](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::AddEdgesRequest& r) { - return useToss ? client->future_chainAddEdges(r) : client->future_addEdges(r); - }); + return collectResponse(param.evb, + std::move(requests), + [useToss = param.useToss](cpp2::GraphStorageServiceAsyncClient* client, + const cpp2::AddEdgesRequest& r) { + return useToss ? client->future_chainAddEdges(r) + : client->future_addEdges(r); + }); } -folly::SemiFuture> GraphStorageClient::getProps( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, +StorageRpcRespFuture GraphStorageClient::getProps( + const CommonRequestParam& param, const DataSet& input, const std::vector* vertexProps, const std::vector* edgeProps, @@ -181,15 +195,14 @@ folly::SemiFuture> GraphStorageClient: bool dedup, const std::vector& orderBy, int64_t limit, - const Expression* filter, - folly::EventBase* evb) { - auto cbStatus = getIdFromRow(space, edgeProps != nullptr); + const Expression* filter) { + auto cbStatus = getIdFromRow(param.space, edgeProps != nullptr); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, input.rows, std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, input.rows, std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -197,11 +210,11 @@ folly::SemiFuture> GraphStorageClient: auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_parts(std::move(c.second)); req.set_dedup(dedup); if (vertexProps != nullptr) { @@ -223,25 +236,21 @@ folly::SemiFuture> GraphStorageClient: req.set_common(common); } - return collectResponse(evb, + return collectResponse(param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::GetPropRequest& r) { return client->future_getProps(r); }); } -folly::SemiFuture> GraphStorageClient::deleteEdges( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector edges, - folly::EventBase* evb) { - auto cbStatus = getIdFromEdgeKey(space); +StorageRpcRespFuture GraphStorageClient::deleteEdges( + const CommonRequestParam& param, std::vector edges) { + auto cbStatus = getIdFromEdgeKey(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(edges), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(edges), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -249,36 +258,32 @@ folly::SemiFuture> GraphStorageClient::de auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_parts(std::move(c.second)); req.set_common(common); } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::DeleteEdgesRequest& r) { return client->future_deleteEdges(r); }); } -folly::SemiFuture> GraphStorageClient::deleteVertices( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector ids, - folly::EventBase* evb) { - auto cbStatus = getIdFromValue(space); +StorageRpcRespFuture GraphStorageClient::deleteVertices( + const CommonRequestParam& param, std::vector ids) { + auto cbStatus = getIdFromValue(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(ids), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(ids), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -286,36 +291,32 @@ folly::SemiFuture> GraphStorageClient::de auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_parts(std::move(c.second)); req.set_common(common); } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::DeleteVerticesRequest& r) { return client->future_deleteVertices(r); }); } -folly::SemiFuture> GraphStorageClient::deleteTags( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector delTags, - folly::EventBase* evb) { - auto cbStatus = getIdFromDelTags(space); +StorageRpcRespFuture GraphStorageClient::deleteTags( + const CommonRequestParam& param, std::vector delTags) { + auto cbStatus = getIdFromDelTags(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(delTags), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(delTags), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -323,17 +324,17 @@ folly::SemiFuture> GraphStorageClient::de auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_parts(std::move(c.second)); req.set_common(common); } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::DeleteTagsRequest& r) { return client->future_deleteTags(r); @@ -341,17 +342,14 @@ folly::SemiFuture> GraphStorageClient::de } folly::Future> GraphStorageClient::updateVertex( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + const CommonRequestParam& param, Value vertexId, TagID tagId, std::vector updatedProps, bool insertable, std::vector returnProps, - std::string condition, - folly::EventBase* evb) { - auto cbStatus = getIdFromValue(space); + std::string condition) { + auto cbStatus = getIdFromValue(param.space); if (!cbStatus.ok()) { return folly::makeFuture>(cbStatus.status()); } @@ -359,9 +357,9 @@ folly::Future> GraphStorageClient::updat std::pair request; DCHECK(!!metaClient_); - auto status = metaClient_->partsNum(space); + auto status = metaClient_->partsNum(param.space); if (!status.ok()) { - return Status::Error("Space not found, spaceid: %d", space); + return Status::Error("Space not found, spaceid: %d", param.space); } auto numParts = status.value(); status = metaClient_->partId(numParts, std::move(cbStatus).value()(vertexId)); @@ -370,27 +368,27 @@ folly::Future> GraphStorageClient::updat } auto part = status.value(); - auto host = this->getLeader(space, part); + auto host = this->getLeader(param.space, part); if (!host.ok()) { return folly::makeFuture>(host.status()); } request.first = std::move(host).value(); cpp2::UpdateVertexRequest req; - req.set_space_id(space); + req.set_space_id(param.space); req.set_vertex_id(vertexId); req.set_tag_id(tagId); req.set_part_id(part); req.set_updated_props(std::move(updatedProps)); req.set_return_props(std::move(returnProps)); req.set_insertable(insertable); - req.set_common(makeRequestCommon(session, plan)); + req.set_common(param.toReqCommon()); if (condition.size() > 0) { req.set_condition(std::move(condition)); } request.second = std::move(req); return getResponse( - evb, + param.evb, std::move(request), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::UpdateVertexRequest& r) { return client->future_updateVertex(r); @@ -398,16 +396,13 @@ folly::Future> GraphStorageClient::updat } folly::Future> GraphStorageClient::updateEdge( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + const CommonRequestParam& param, storage::cpp2::EdgeKey edgeKey, std::vector updatedProps, bool insertable, std::vector returnProps, - std::string condition, - folly::EventBase* evb, - bool useExperimentalFeature) { + std::string condition) { + auto space = param.space; auto cbStatus = getIdFromEdgeKey(space); if (!cbStatus.ok()) { return folly::makeFuture>(cbStatus.status()); @@ -439,16 +434,17 @@ folly::Future> GraphStorageClient::updat req.set_updated_props(std::move(updatedProps)); req.set_return_props(std::move(returnProps)); req.set_insertable(insertable); - req.set_common(makeRequestCommon(session, plan)); + req.set_common(param.toReqCommon()); if (condition.size() > 0) { req.set_condition(std::move(condition)); } request.second = std::move(req); return getResponse( - evb, + param.evb, std::move(request), - [=](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::UpdateEdgeRequest& r) { + [useExperimentalFeature = param.useExperimentalFeature]( + cpp2::GraphStorageServiceAsyncClient* client, const cpp2::UpdateEdgeRequest& r) { return useExperimentalFeature ? client->future_chainUpdateEdge(r) : client->future_updateEdge(r); }); @@ -488,18 +484,15 @@ folly::Future> GraphStorageClient::getUUID(GraphSpac }); } -folly::SemiFuture> GraphStorageClient::lookupIndex( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - bool profile, +StorageRpcRespFuture GraphStorageClient::lookupIndex( + const CommonRequestParam& param, const std::vector& contexts, bool isEdge, int32_t tagOrEdge, const std::vector& returnCols, - int64_t limit, - folly::EventBase* evb) { + int64_t limit) { // TODO(sky) : instead of isEdge and tagOrEdge to nebula::cpp2::SchemaID for graph layer. + auto space = param.space; auto status = getHostParts(space); if (!status.ok()) { return folly::makeFuture>( @@ -514,7 +507,7 @@ folly::SemiFuture> GraphStorageClient: auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan, profile); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; @@ -531,20 +524,16 @@ folly::SemiFuture> GraphStorageClient: } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::LookupIndexRequest& r) { return client->future_lookupIndex(r); }); } -folly::SemiFuture> -GraphStorageClient::lookupAndTraverse(GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - cpp2::IndexSpec indexSpec, - cpp2::TraverseSpec traverseSpec, - folly::EventBase* evb) { +StorageRpcRespFuture GraphStorageClient::lookupAndTraverse( + const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec) { + auto space = param.space; auto status = getHostParts(space); if (!status.ok()) { return folly::makeFuture>( @@ -553,7 +542,7 @@ GraphStorageClient::lookupAndTraverse(GraphSpaceID space, auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; @@ -565,7 +554,7 @@ GraphStorageClient::lookupAndTraverse(GraphSpaceID space, } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::LookupAndTraverseRequest& r) { return client->future_lookupAndTraverse(r); @@ -800,15 +789,6 @@ StatusOr> GraphStorageClien } return cb; } -cpp2::RequestCommon GraphStorageClient::makeRequestCommon(SessionID sessionId, - ExecutionPlanID planId, - bool profile) { - cpp2::RequestCommon common; - common.set_session_id(sessionId); - common.set_plan_id(planId); - common.set_profile_detail(profile); - return common; -} } // namespace storage } // namespace nebula diff --git a/src/clients/storage/GraphStorageClient.h b/src/clients/storage/GraphStorageClient.h index 2135b49e64a..3234116f749 100644 --- a/src/clients/storage/GraphStorageClient.h +++ b/src/clients/storage/GraphStorageClient.h @@ -16,6 +16,9 @@ namespace nebula { namespace storage { +template +using StorageRpcRespFuture = folly::SemiFuture>; + /** * A wrapper class for GraphStorageServiceAsyncClient thrift API * @@ -24,19 +27,34 @@ namespace storage { class GraphStorageClient : public StorageClientBase { FRIEND_TEST(StorageClientTest, LeaderChangeTest); - using Parent = StorageClientBase; - public: + struct CommonRequestParam { + GraphSpaceID space; + SessionID session; + ExecutionPlanID plan; + bool profile{false}; + bool useToss{false}; + bool useExperimentalFeature{false}; + folly::EventBase* evb{nullptr}; + + CommonRequestParam(GraphSpaceID space_, + SessionID sess, + ExecutionPlanID plan_, + bool profile_ = false, + bool toss = false, + bool experimental = false, + folly::EventBase* evb_ = nullptr); + + cpp2::RequestCommon toReqCommon() const; + }; + GraphStorageClient(std::shared_ptr ioThreadPool, meta::MetaClient* metaClient) - : Parent(ioThreadPool, metaClient) {} + : StorageClientBase(ioThreadPool, metaClient) {} virtual ~GraphStorageClient() {} - folly::SemiFuture> getNeighbors( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - bool profile, + StorageRpcRespFuture getNeighbors( + const CommonRequestParam& param, std::vector colNames, // The first column has to be the VertexID const std::vector& vertices, @@ -50,13 +68,10 @@ class GraphStorageClient : public StorageClientBase& orderBy = std::vector(), int64_t limit = std::numeric_limits::max(), - const Expression* filter = nullptr, - folly::EventBase* evb = nullptr); + const Expression* filter = nullptr); - folly::SemiFuture> getProps( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + StorageRpcRespFuture getProps( + const CommonRequestParam& param, const DataSet& input, const std::vector* vertexProps, const std::vector* edgeProps, @@ -64,96 +79,59 @@ class GraphStorageClient : public StorageClientBase& orderBy = std::vector(), int64_t limit = std::numeric_limits::max(), - const Expression* filter = nullptr, - folly::EventBase* evb = nullptr); + const Expression* filter = nullptr); - folly::SemiFuture> addVertices( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + StorageRpcRespFuture addVertices( + const CommonRequestParam& param, std::vector vertices, std::unordered_map> propNames, - bool ifNotExists, - folly::EventBase* evb = nullptr); - - folly::SemiFuture> addEdges( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector edges, - std::vector propNames, - bool ifNotExists, - folly::EventBase* evb = nullptr, - bool useToss = false); - - folly::SemiFuture> deleteEdges( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector edges, - folly::EventBase* evb = nullptr); - - folly::SemiFuture> deleteVertices( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector ids, - folly::EventBase* evb = nullptr); - - folly::SemiFuture> deleteTags( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector delTags, - folly::EventBase* evb = nullptr); + bool ifNotExists); + + StorageRpcRespFuture addEdges(const CommonRequestParam& param, + std::vector edges, + std::vector propNames, + bool ifNotExists); + + StorageRpcRespFuture deleteEdges(const CommonRequestParam& param, + std::vector edges); + + StorageRpcRespFuture deleteVertices(const CommonRequestParam& param, + std::vector ids); + + StorageRpcRespFuture deleteTags(const CommonRequestParam& param, + std::vector delTags); folly::Future> updateVertex( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + const CommonRequestParam& param, Value vertexId, TagID tagId, std::vector updatedProps, bool insertable, std::vector returnProps, - std::string condition, - folly::EventBase* evb = nullptr); + std::string condition); folly::Future> updateEdge( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + const CommonRequestParam& param, storage::cpp2::EdgeKey edgeKey, std::vector updatedProps, bool insertable, std::vector returnProps, - std::string condition, - folly::EventBase* evb = nullptr, - bool useExperimentalFeature = false); + std::string condition); folly::Future> getUUID(GraphSpaceID space, const std::string& name, folly::EventBase* evb = nullptr); - folly::SemiFuture> lookupIndex( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - bool profile, + StorageRpcRespFuture lookupIndex( + const CommonRequestParam& param, const std::vector& contexts, bool isEdge, int32_t tagOrEdge, const std::vector& returnCols, - int64_t limit, - folly::EventBase* evb = nullptr); + int64_t limit); - folly::SemiFuture> lookupAndTraverse( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - cpp2::IndexSpec indexSpec, - cpp2::TraverseSpec traverseSpec, - folly::EventBase* evb = nullptr); + StorageRpcRespFuture lookupAndTraverse( + const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec); folly::Future> scanEdge(cpp2::ScanEdgeRequest req, folly::EventBase* evb = nullptr); @@ -178,10 +156,6 @@ class GraphStorageClient : public StorageClientBase> getIdFromDelTags( GraphSpaceID space) const; - - cpp2::RequestCommon makeRequestCommon(SessionID sessionId, - ExecutionPlanID planId, - bool profile = false); }; } // namespace storage diff --git a/src/graph/executor/mutate/DeleteExecutor.cpp b/src/graph/executor/mutate/DeleteExecutor.cpp index ee14c804ddc..8587e69d1b1 100644 --- a/src/graph/executor/mutate/DeleteExecutor.cpp +++ b/src/graph/executor/mutate/DeleteExecutor.cpp @@ -12,6 +12,8 @@ #include "graph/util/SchemaUtil.h" #include "graph/util/ScopedTimer.h" +using nebula::storage::GraphStorageClient; + namespace nebula { namespace graph { @@ -61,10 +63,12 @@ folly::Future DeleteVerticesExecutor::deleteVertices() { } auto spaceId = spaceInfo.id; time::Duration deleteVertTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->deleteVertices( - spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(vertices)) + ->deleteVertices(param, std::move(vertices)) .via(runner()) .ensure([deleteVertTime]() { VLOG(1) << "Delete vertices time: " << deleteVertTime.elapsedInUSec() << "us"; @@ -115,10 +119,12 @@ folly::Future DeleteTagsExecutor::deleteTags() { auto spaceId = spaceInfo.id; time::Duration deleteTagTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->deleteTags( - spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(delTags)) + ->deleteTags(param, std::move(delTags)) .via(runner()) .ensure([deleteTagTime]() { VLOG(1) << "Delete vertices time: " << deleteTagTime.elapsedInUSec() << "us"; @@ -198,10 +204,12 @@ folly::Future DeleteEdgesExecutor::deleteEdges() { auto spaceId = spaceInfo.id; time::Duration deleteEdgeTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->deleteEdges( - spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(edgeKeys)) + ->deleteEdges(param, std::move(edgeKeys)) .via(runner()) .ensure([deleteEdgeTime]() { VLOG(1) << "Delete edge time: " << deleteEdgeTime.elapsedInUSec() << "us"; diff --git a/src/graph/executor/mutate/InsertExecutor.cpp b/src/graph/executor/mutate/InsertExecutor.cpp index a625ca8581a..ef5b3cc25a1 100644 --- a/src/graph/executor/mutate/InsertExecutor.cpp +++ b/src/graph/executor/mutate/InsertExecutor.cpp @@ -11,6 +11,8 @@ #include "graph/service/GraphFlags.h" #include "graph/util/ScopedTimer.h" +using nebula::storage::GraphStorageClient; + namespace nebula { namespace graph { @@ -21,14 +23,12 @@ folly::Future InsertVerticesExecutor::insertVertices() { auto *ivNode = asNode(node()); time::Duration addVertTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + ivNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->addVertices(ivNode->getSpace(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - ivNode->getVertices(), - ivNode->getPropNames(), - ivNode->getIfNotExists()) + ->addVertices(param, ivNode->getVertices(), ivNode->getPropNames(), ivNode->getIfNotExists()) .via(runner()) .ensure([addVertTime]() { VLOG(1) << "Add vertices time: " << addVertTime.elapsedInUSec() << "us"; @@ -47,16 +47,13 @@ folly::Future InsertEdgesExecutor::insertEdges() { auto *ieNode = asNode(node()); time::Duration addEdgeTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + ieNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); + param.useExperimentalFeature = FLAGS_enable_experimental_feature; return qctx() ->getStorageClient() - ->addEdges(ieNode->getSpace(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - ieNode->getEdges(), - ieNode->getPropNames(), - ieNode->getIfNotExists(), - nullptr, - FLAGS_enable_experimental_feature) + ->addEdges(param, ieNode->getEdges(), ieNode->getPropNames(), ieNode->getIfNotExists()) .via(runner()) .ensure( [addEdgeTime]() { VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us"; }) diff --git a/src/graph/executor/mutate/UpdateExecutor.cpp b/src/graph/executor/mutate/UpdateExecutor.cpp index dfad03864a9..141f4b9e709 100644 --- a/src/graph/executor/mutate/UpdateExecutor.cpp +++ b/src/graph/executor/mutate/UpdateExecutor.cpp @@ -12,6 +12,8 @@ #include "graph/util/SchemaUtil.h" #include "graph/util/ScopedTimer.h" +using nebula::storage::GraphStorageClient; + namespace nebula { namespace graph { @@ -46,11 +48,13 @@ folly::Future UpdateVertexExecutor::execute() { auto *uvNode = asNode(node()); yieldNames_ = uvNode->getYieldNames(); time::Duration updateVertTime; + auto plan = qctx()->plan(); + auto sess = qctx()->rctx()->session(); + GraphStorageClient::CommonRequestParam param( + uvNode->getSpaceId(), sess->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->updateVertex(uvNode->getSpaceId(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), + ->updateVertex(param, uvNode->getVId(), uvNode->getTagId(), uvNode->getUpdatedProps(), @@ -96,18 +100,18 @@ folly::Future UpdateEdgeExecutor::execute() { yieldNames_ = ueNode->getYieldNames(); time::Duration updateEdgeTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + ueNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); + param.useExperimentalFeature = FLAGS_enable_experimental_feature; return qctx() ->getStorageClient() - ->updateEdge(ueNode->getSpaceId(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), + ->updateEdge(param, edgeKey, ueNode->getUpdatedProps(), ueNode->getInsertable(), ueNode->getReturnProps(), - ueNode->getCondition(), - nullptr, - FLAGS_enable_experimental_feature) + ueNode->getCondition()) .via(runner()) .ensure([updateEdgeTime]() { VLOG(1) << "Update edge time: " << updateEdgeTime.elapsedInUSec() << "us"; diff --git a/src/graph/executor/query/GetEdgesExecutor.cpp b/src/graph/executor/query/GetEdgesExecutor.cpp index 3bc5ee73c22..23717370b6e 100644 --- a/src/graph/executor/query/GetEdgesExecutor.cpp +++ b/src/graph/executor/query/GetEdgesExecutor.cpp @@ -66,10 +66,12 @@ folly::Future GetEdgesExecutor::getEdges() { } time::Duration getPropsTime; + GraphStorageClient::CommonRequestParam param(ge->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return DCHECK_NOTNULL(client) - ->getProps(ge->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), + ->getProps(param, std::move(edges), nullptr, ge->props(), diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp index 3cabd7101b9..e8eb09ca2c4 100644 --- a/src/graph/executor/query/GetNeighborsExecutor.cpp +++ b/src/graph/executor/query/GetNeighborsExecutor.cpp @@ -43,11 +43,12 @@ folly::Future GetNeighborsExecutor::execute() { time::Duration getNbrTime; GraphStorageClient* storageClient = qctx_->getStorageClient(); QueryExpressionContext qec(qctx()->ectx()); + GraphStorageClient::CommonRequestParam param(gn_->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return storageClient - ->getNeighbors(gn_->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled(), + ->getNeighbors(param, std::move(reqDs.colNames), std::move(reqDs.rows), gn_->edgeTypes(), diff --git a/src/graph/executor/query/GetVerticesExecutor.cpp b/src/graph/executor/query/GetVerticesExecutor.cpp index 0f7e5e8c7c9..0a23083cb2c 100644 --- a/src/graph/executor/query/GetVerticesExecutor.cpp +++ b/src/graph/executor/query/GetVerticesExecutor.cpp @@ -33,10 +33,12 @@ folly::Future GetVerticesExecutor::getVertices() { } time::Duration getPropsTime; + GraphStorageClient::CommonRequestParam param(gv->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return DCHECK_NOTNULL(storageClient) - ->getProps(gv->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), + ->getProps(param, std::move(vertices), gv->props(), nullptr, diff --git a/src/graph/executor/query/IndexScanExecutor.cpp b/src/graph/executor/query/IndexScanExecutor.cpp index c8791af6102..aefda16b84b 100644 --- a/src/graph/executor/query/IndexScanExecutor.cpp +++ b/src/graph/executor/query/IndexScanExecutor.cpp @@ -36,11 +36,12 @@ folly::Future IndexScanExecutor::indexScan() { return Status::Error("There is no index to use at runtime"); } + GraphStorageClient::CommonRequestParam param(lookup->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return storageClient - ->lookupIndex(lookup->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled(), + ->lookupIndex(param, ictxs, lookup->isEdge(), lookup->schemaId(), diff --git a/src/graph/service/QueryEngine.cpp b/src/graph/service/QueryEngine.cpp index 018d677344c..6ed6628a6d7 100644 --- a/src/graph/service/QueryEngine.cpp +++ b/src/graph/service/QueryEngine.cpp @@ -57,7 +57,7 @@ void QueryEngine::execute(RequestContextPtr rctx) { Status QueryEngine::setupMemoryMonitorThread() { memoryMonitorThread_ = std::make_unique(); - if (!memoryMonitorThread_ || !memoryMonitorThread_->start("query-engine-bg")) { + if (!memoryMonitorThread_ || !memoryMonitorThread_->start("graph-memory-monitor")) { return Status::Error("Fail to start query engine background thread."); } diff --git a/src/tools/storage-perf/StorageIntegrityTool.cpp b/src/tools/storage-perf/StorageIntegrityTool.cpp index 1a7438dbb08..a5f83002023 100644 --- a/src/tools/storage-perf/StorageIntegrityTool.cpp +++ b/src/tools/storage-perf/StorageIntegrityTool.cpp @@ -170,8 +170,8 @@ class IntegrityTest { void addVertex(std::vector& prev, std::vector& cur, VertexID startId) { std::unordered_map> propNames; propNames[tagId_].emplace_back(propName_); - auto future = - client_->addVertices(spaceId_, 0, 0, genVertices(prev, cur, startId), propNames, true); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + auto future = client_->addVertices(param, genVertices(prev, cur, startId), propNames, true); auto resp = std::move(future).get(); if (!resp.succeeded()) { for (auto& err : resp.failedParts()) { @@ -226,7 +226,8 @@ class IntegrityTest { tagProp.set_tag(tagId_); (*tagProp.props_ref()).emplace_back(propName_); DataSet dataset({kVid}); - auto future = client_->getProps(spaceId_, 0, 0, dataset, &props, nullptr, nullptr); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + auto future = client_->getProps(param, dataset, &props, nullptr, nullptr); auto resp = std::move(future).get(); if (!resp.succeeded()) { LOG(ERROR) << "Failed to fetch props of vertex " << nextId; diff --git a/src/tools/storage-perf/StoragePerfTool.cpp b/src/tools/storage-perf/StoragePerfTool.cpp index 4830d899842..6cc3b19cd45 100644 --- a/src/tools/storage-perf/StoragePerfTool.cpp +++ b/src/tools/storage-perf/StoragePerfTool.cpp @@ -298,11 +298,9 @@ class Perf { for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0, false); graphStorageClient_ - ->getNeighbors(spaceId_, - 0, - 0, - false, + ->getNeighbors(param, colNames, vertices, {edgeType_}, @@ -335,7 +333,8 @@ class Perf { auto tokens = tokenBucket_.consumeOrDrain(FLAGS_concurrency, FLAGS_qps, FLAGS_concurrency); for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); - graphStorageClient_->addVertices(spaceId_, 0, 0, genVertices(), tagProps_, true) + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + graphStorageClient_->addVertices(param, genVertices(), tagProps_, true) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { @@ -363,7 +362,8 @@ class Perf { auto tokens = tokenBucket_.consumeOrDrain(FLAGS_concurrency, FLAGS_qps, FLAGS_concurrency); for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); - graphStorageClient_->addEdges(spaceId_, 0, 0, genEdges(), edgeProps_, true) + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + graphStorageClient_->addEdges(param, genEdges(), edgeProps_, true) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { @@ -395,22 +395,21 @@ class Perf { input.emplace_back(std::move(row)); auto vProps = vertexProps(); auto start = time::WallClock::fastNowInMicroSec(); - auto f = - graphStorageClient_->getProps(spaceId_, 0, 0, std::move(input), &vProps, nullptr, nullptr) - .via(evb) - .thenValue([this, start](auto&& resps) { - if (!resps.succeeded()) { - LOG(ERROR) << "Request failed!"; - } else { - VLOG(3) << "request successed!"; - } - this->finishedRequests_++; - auto now = time::WallClock::fastNowInMicroSec(); - latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), - now - start); - qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1); - }) - .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; }); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + graphStorageClient_->getProps(param, std::move(input), &vProps, nullptr, nullptr) + .via(evb) + .thenValue([this, start](auto&& resps) { + if (!resps.succeeded()) { + LOG(ERROR) << "Request failed!"; + } else { + VLOG(3) << "request successed!"; + } + this->finishedRequests_++; + auto now = time::WallClock::fastNowInMicroSec(); + latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), now - start); + qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1); + }) + .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; }); } void getEdgesTask() { @@ -421,22 +420,21 @@ class Perf { input.emplace_back(std::move(row)); auto eProps = edgeProps(); auto start = time::WallClock::fastNowInMicroSec(); - auto f = - graphStorageClient_->getProps(spaceId_, 0, 0, std::move(input), nullptr, &eProps, nullptr) - .via(evb) - .thenValue([this, start](auto&& resps) { - if (!resps.succeeded()) { - LOG(ERROR) << "Request failed!"; - } else { - VLOG(3) << "request successed!"; - } - this->finishedRequests_++; - auto now = time::WallClock::fastNowInMicroSec(); - latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), - now - start); - qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1); - }) - .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; }); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + graphStorageClient_->getProps(param, std::move(input), nullptr, &eProps, nullptr) + .via(evb) + .thenValue([this, start](auto&& resps) { + if (!resps.succeeded()) { + LOG(ERROR) << "Request failed!"; + } else { + VLOG(3) << "request successed!"; + } + this->finishedRequests_++; + auto now = time::WallClock::fastNowInMicroSec(); + latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), now - start); + qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1); + }) + .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; }); } private: