Skip to content

Commit

Permalink
Pass graph profile param into storage and cleanup graph storage clien…
Browse files Browse the repository at this point in the history
…t interfaces (#3026)

* Cleanup graph storage client interfaces

* Fix compile

* Format

* Fix shadow compile error
  • Loading branch information
yixinglu authored Oct 18, 2021
1 parent fb3be74 commit 93eed9b
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 300 deletions.
246 changes: 113 additions & 133 deletions src/clients/storage/GraphStorageClient.cpp

Large diffs are not rendered by default.

138 changes: 56 additions & 82 deletions src/clients/storage/GraphStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
namespace nebula {
namespace storage {

template <typename T>
using StorageRpcRespFuture = folly::SemiFuture<StorageRpcResponse<T>>;

/**
* A wrapper class for GraphStorageServiceAsyncClient thrift API
*
Expand All @@ -24,19 +27,34 @@ namespace storage {
class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsyncClient> {
FRIEND_TEST(StorageClientTest, LeaderChangeTest);

using Parent = StorageClientBase<cpp2::GraphStorageServiceAsyncClient>;

public:
struct CommonRequestParam {
GraphSpaceID space;
SessionID session;
ExecutionPlanID plan;
bool profile{false};
bool useToss{false};
bool useExperimentalFeature{false};
folly::EventBase* evb{nullptr};

CommonRequestParam(GraphSpaceID space_,
SessionID sess,
ExecutionPlanID plan_,
bool profile_ = false,
bool toss = false,
bool experimental = false,
folly::EventBase* evb_ = nullptr);

cpp2::RequestCommon toReqCommon() const;
};

GraphStorageClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
meta::MetaClient* metaClient)
: Parent(ioThreadPool, metaClient) {}
: StorageClientBase<cpp2::GraphStorageServiceAsyncClient>(ioThreadPool, metaClient) {}
virtual ~GraphStorageClient() {}

folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> getNeighbors(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
bool profile,
StorageRpcRespFuture<cpp2::GetNeighborsResponse> getNeighbors(
const CommonRequestParam& param,
std::vector<std::string> colNames,
// The first column has to be the VertexID
const std::vector<Row>& vertices,
Expand All @@ -50,110 +68,70 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
bool random = false,
const std::vector<cpp2::OrderBy>& orderBy = std::vector<cpp2::OrderBy>(),
int64_t limit = std::numeric_limits<int64_t>::max(),
const Expression* filter = nullptr,
folly::EventBase* evb = nullptr);
const Expression* filter = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> getProps(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
StorageRpcRespFuture<cpp2::GetPropResponse> getProps(
const CommonRequestParam& param,
const DataSet& input,
const std::vector<cpp2::VertexProp>* vertexProps,
const std::vector<cpp2::EdgeProp>* edgeProps,
const std::vector<cpp2::Expr>* expressions,
bool dedup = false,
const std::vector<cpp2::OrderBy>& orderBy = std::vector<cpp2::OrderBy>(),
int64_t limit = std::numeric_limits<int64_t>::max(),
const Expression* filter = nullptr,
folly::EventBase* evb = nullptr);
const Expression* filter = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> addVertices(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
StorageRpcRespFuture<cpp2::ExecResponse> addVertices(
const CommonRequestParam& param,
std::vector<cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> propNames,
bool ifNotExists,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> addEdges(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists,
folly::EventBase* evb = nullptr,
bool useToss = false);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteEdges(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<storage::cpp2::EdgeKey> edges,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteVertices(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<Value> ids,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteTags(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::DelTags> delTags,
folly::EventBase* evb = nullptr);
bool ifNotExists);

StorageRpcRespFuture<cpp2::ExecResponse> addEdges(const CommonRequestParam& param,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists);

StorageRpcRespFuture<cpp2::ExecResponse> deleteEdges(const CommonRequestParam& param,
std::vector<storage::cpp2::EdgeKey> edges);

StorageRpcRespFuture<cpp2::ExecResponse> deleteVertices(const CommonRequestParam& param,
std::vector<Value> ids);

StorageRpcRespFuture<cpp2::ExecResponse> deleteTags(const CommonRequestParam& param,
std::vector<cpp2::DelTags> delTags);

folly::Future<StatusOr<storage::cpp2::UpdateResponse>> updateVertex(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
const CommonRequestParam& param,
Value vertexId,
TagID tagId,
std::vector<cpp2::UpdatedProp> updatedProps,
bool insertable,
std::vector<std::string> returnProps,
std::string condition,
folly::EventBase* evb = nullptr);
std::string condition);

folly::Future<StatusOr<storage::cpp2::UpdateResponse>> updateEdge(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
const CommonRequestParam& param,
storage::cpp2::EdgeKey edgeKey,
std::vector<cpp2::UpdatedProp> updatedProps,
bool insertable,
std::vector<std::string> returnProps,
std::string condition,
folly::EventBase* evb = nullptr,
bool useExperimentalFeature = false);
std::string condition);

folly::Future<StatusOr<cpp2::GetUUIDResp>> getUUID(GraphSpaceID space,
const std::string& name,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> lookupIndex(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
bool profile,
StorageRpcRespFuture<cpp2::LookupIndexResp> lookupIndex(
const CommonRequestParam& param,
const std::vector<storage::cpp2::IndexQueryContext>& contexts,
bool isEdge,
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
int64_t limit,
folly::EventBase* evb = nullptr);
int64_t limit);

folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> lookupAndTraverse(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
cpp2::IndexSpec indexSpec,
cpp2::TraverseSpec traverseSpec,
folly::EventBase* evb = nullptr);
StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec);

folly::Future<StatusOr<cpp2::ScanEdgeResponse>> scanEdge(cpp2::ScanEdgeRequest req,
folly::EventBase* evb = nullptr);
Expand All @@ -178,10 +156,6 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

StatusOr<std::function<const VertexID&(const cpp2::DelTags&)>> getIdFromDelTags(
GraphSpaceID space) const;

cpp2::RequestCommon makeRequestCommon(SessionID sessionId,
ExecutionPlanID planId,
bool profile = false);
};

} // namespace storage
Expand Down
20 changes: 14 additions & 6 deletions src/graph/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "graph/util/SchemaUtil.h"
#include "graph/util/ScopedTimer.h"

using nebula::storage::GraphStorageClient;

namespace nebula {
namespace graph {

Expand Down Expand Up @@ -61,10 +63,12 @@ folly::Future<Status> DeleteVerticesExecutor::deleteVertices() {
}
auto spaceId = spaceInfo.id;
time::Duration deleteVertTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->deleteVertices(
spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(vertices))
->deleteVertices(param, std::move(vertices))
.via(runner())
.ensure([deleteVertTime]() {
VLOG(1) << "Delete vertices time: " << deleteVertTime.elapsedInUSec() << "us";
Expand Down Expand Up @@ -115,10 +119,12 @@ folly::Future<Status> DeleteTagsExecutor::deleteTags() {

auto spaceId = spaceInfo.id;
time::Duration deleteTagTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->deleteTags(
spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(delTags))
->deleteTags(param, std::move(delTags))
.via(runner())
.ensure([deleteTagTime]() {
VLOG(1) << "Delete vertices time: " << deleteTagTime.elapsedInUSec() << "us";
Expand Down Expand Up @@ -198,10 +204,12 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {

auto spaceId = spaceInfo.id;
time::Duration deleteEdgeTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->deleteEdges(
spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(edgeKeys))
->deleteEdges(param, std::move(edgeKeys))
.via(runner())
.ensure([deleteEdgeTime]() {
VLOG(1) << "Delete edge time: " << deleteEdgeTime.elapsedInUSec() << "us";
Expand Down
25 changes: 11 additions & 14 deletions src/graph/executor/mutate/InsertExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "graph/service/GraphFlags.h"
#include "graph/util/ScopedTimer.h"

using nebula::storage::GraphStorageClient;

namespace nebula {
namespace graph {

Expand All @@ -21,14 +23,12 @@ folly::Future<Status> InsertVerticesExecutor::insertVertices() {

auto *ivNode = asNode<InsertVertices>(node());
time::Duration addVertTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
ivNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->addVertices(ivNode->getSpace(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
ivNode->getVertices(),
ivNode->getPropNames(),
ivNode->getIfNotExists())
->addVertices(param, ivNode->getVertices(), ivNode->getPropNames(), ivNode->getIfNotExists())
.via(runner())
.ensure([addVertTime]() {
VLOG(1) << "Add vertices time: " << addVertTime.elapsedInUSec() << "us";
Expand All @@ -47,16 +47,13 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {

auto *ieNode = asNode<InsertEdges>(node());
time::Duration addEdgeTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
ieNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
return qctx()
->getStorageClient()
->addEdges(ieNode->getSpace(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
ieNode->getEdges(),
ieNode->getPropNames(),
ieNode->getIfNotExists(),
nullptr,
FLAGS_enable_experimental_feature)
->addEdges(param, ieNode->getEdges(), ieNode->getPropNames(), ieNode->getIfNotExists())
.via(runner())
.ensure(
[addEdgeTime]() { VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us"; })
Expand Down
22 changes: 13 additions & 9 deletions src/graph/executor/mutate/UpdateExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "graph/util/SchemaUtil.h"
#include "graph/util/ScopedTimer.h"

using nebula::storage::GraphStorageClient;

namespace nebula {
namespace graph {

Expand Down Expand Up @@ -46,11 +48,13 @@ folly::Future<Status> UpdateVertexExecutor::execute() {
auto *uvNode = asNode<UpdateVertex>(node());
yieldNames_ = uvNode->getYieldNames();
time::Duration updateVertTime;
auto plan = qctx()->plan();
auto sess = qctx()->rctx()->session();
GraphStorageClient::CommonRequestParam param(
uvNode->getSpaceId(), sess->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->updateVertex(uvNode->getSpaceId(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
->updateVertex(param,
uvNode->getVId(),
uvNode->getTagId(),
uvNode->getUpdatedProps(),
Expand Down Expand Up @@ -96,18 +100,18 @@ folly::Future<Status> UpdateEdgeExecutor::execute() {
yieldNames_ = ueNode->getYieldNames();

time::Duration updateEdgeTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
ueNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
return qctx()
->getStorageClient()
->updateEdge(ueNode->getSpaceId(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
->updateEdge(param,
edgeKey,
ueNode->getUpdatedProps(),
ueNode->getInsertable(),
ueNode->getReturnProps(),
ueNode->getCondition(),
nullptr,
FLAGS_enable_experimental_feature)
ueNode->getCondition())
.via(runner())
.ensure([updateEdgeTime]() {
VLOG(1) << "Update edge time: " << updateEdgeTime.elapsedInUSec() << "us";
Expand Down
8 changes: 5 additions & 3 deletions src/graph/executor/query/GetEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ folly::Future<Status> GetEdgesExecutor::getEdges() {
}

time::Duration getPropsTime;
GraphStorageClient::CommonRequestParam param(ge->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
return DCHECK_NOTNULL(client)
->getProps(ge->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
->getProps(param,
std::move(edges),
nullptr,
ge->props(),
Expand Down
9 changes: 5 additions & 4 deletions src/graph/executor/query/GetNeighborsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
time::Duration getNbrTime;
GraphStorageClient* storageClient = qctx_->getStorageClient();
QueryExpressionContext qec(qctx()->ectx());
GraphStorageClient::CommonRequestParam param(gn_->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
return storageClient
->getNeighbors(gn_->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled(),
->getNeighbors(param,
std::move(reqDs.colNames),
std::move(reqDs.rows),
gn_->edgeTypes(),
Expand Down
Loading

0 comments on commit 93eed9b

Please sign in to comment.