Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass graph profile param into storage and cleanup graph storage client interfaces #3026

Merged
merged 7 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 113 additions & 133 deletions src/clients/storage/GraphStorageClient.cpp

Large diffs are not rendered by default.

138 changes: 56 additions & 82 deletions src/clients/storage/GraphStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
namespace nebula {
namespace storage {

template <typename T>
using StorageRpcRespFuture = folly::SemiFuture<StorageRpcResponse<T>>;

/**
* A wrapper class for GraphStorageServiceAsyncClient thrift API
*
Expand All @@ -24,19 +27,34 @@ namespace storage {
class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsyncClient> {
FRIEND_TEST(StorageClientTest, LeaderChangeTest);

using Parent = StorageClientBase<cpp2::GraphStorageServiceAsyncClient>;

public:
struct CommonRequestParam {
GraphSpaceID space;
SessionID session;
ExecutionPlanID plan;
bool profile{false};
bool useToss{false};
bool useExperimentalFeature{false};
folly::EventBase* evb{nullptr};

CommonRequestParam(GraphSpaceID space_,
SessionID sess,
ExecutionPlanID plan_,
bool profile_ = false,
bool toss = false,
bool experimental = false,
folly::EventBase* evb_ = nullptr);

cpp2::RequestCommon toReqCommon() const;
};

GraphStorageClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
meta::MetaClient* metaClient)
: Parent(ioThreadPool, metaClient) {}
: StorageClientBase<cpp2::GraphStorageServiceAsyncClient>(ioThreadPool, metaClient) {}
virtual ~GraphStorageClient() {}

folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> getNeighbors(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
bool profile,
StorageRpcRespFuture<cpp2::GetNeighborsResponse> getNeighbors(
const CommonRequestParam& param,
std::vector<std::string> colNames,
// The first column has to be the VertexID
const std::vector<Row>& vertices,
Expand All @@ -50,110 +68,70 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
bool random = false,
const std::vector<cpp2::OrderBy>& orderBy = std::vector<cpp2::OrderBy>(),
int64_t limit = std::numeric_limits<int64_t>::max(),
const Expression* filter = nullptr,
folly::EventBase* evb = nullptr);
const Expression* filter = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> getProps(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
StorageRpcRespFuture<cpp2::GetPropResponse> getProps(
const CommonRequestParam& param,
const DataSet& input,
const std::vector<cpp2::VertexProp>* vertexProps,
const std::vector<cpp2::EdgeProp>* edgeProps,
const std::vector<cpp2::Expr>* expressions,
bool dedup = false,
const std::vector<cpp2::OrderBy>& orderBy = std::vector<cpp2::OrderBy>(),
int64_t limit = std::numeric_limits<int64_t>::max(),
const Expression* filter = nullptr,
folly::EventBase* evb = nullptr);
const Expression* filter = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> addVertices(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
StorageRpcRespFuture<cpp2::ExecResponse> addVertices(
const CommonRequestParam& param,
std::vector<cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> propNames,
bool ifNotExists,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> addEdges(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists,
folly::EventBase* evb = nullptr,
bool useToss = false);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteEdges(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<storage::cpp2::EdgeKey> edges,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteVertices(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<Value> ids,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteTags(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::DelTags> delTags,
folly::EventBase* evb = nullptr);
bool ifNotExists);

StorageRpcRespFuture<cpp2::ExecResponse> addEdges(const CommonRequestParam& param,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists);

StorageRpcRespFuture<cpp2::ExecResponse> deleteEdges(const CommonRequestParam& param,
std::vector<storage::cpp2::EdgeKey> edges);

StorageRpcRespFuture<cpp2::ExecResponse> deleteVertices(const CommonRequestParam& param,
std::vector<Value> ids);

StorageRpcRespFuture<cpp2::ExecResponse> deleteTags(const CommonRequestParam& param,
std::vector<cpp2::DelTags> delTags);

folly::Future<StatusOr<storage::cpp2::UpdateResponse>> updateVertex(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
const CommonRequestParam& param,
Value vertexId,
TagID tagId,
std::vector<cpp2::UpdatedProp> updatedProps,
bool insertable,
std::vector<std::string> returnProps,
std::string condition,
folly::EventBase* evb = nullptr);
std::string condition);

folly::Future<StatusOr<storage::cpp2::UpdateResponse>> updateEdge(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
const CommonRequestParam& param,
storage::cpp2::EdgeKey edgeKey,
std::vector<cpp2::UpdatedProp> updatedProps,
bool insertable,
std::vector<std::string> returnProps,
std::string condition,
folly::EventBase* evb = nullptr,
bool useExperimentalFeature = false);
std::string condition);

folly::Future<StatusOr<cpp2::GetUUIDResp>> getUUID(GraphSpaceID space,
const std::string& name,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> lookupIndex(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
bool profile,
StorageRpcRespFuture<cpp2::LookupIndexResp> lookupIndex(
const CommonRequestParam& param,
const std::vector<storage::cpp2::IndexQueryContext>& contexts,
bool isEdge,
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
int64_t limit,
folly::EventBase* evb = nullptr);
int64_t limit);

folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> lookupAndTraverse(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
cpp2::IndexSpec indexSpec,
cpp2::TraverseSpec traverseSpec,
folly::EventBase* evb = nullptr);
StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec);

folly::Future<StatusOr<cpp2::ScanEdgeResponse>> scanEdge(cpp2::ScanEdgeRequest req,
folly::EventBase* evb = nullptr);
Expand All @@ -178,10 +156,6 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

StatusOr<std::function<const VertexID&(const cpp2::DelTags&)>> getIdFromDelTags(
GraphSpaceID space) const;

cpp2::RequestCommon makeRequestCommon(SessionID sessionId,
ExecutionPlanID planId,
bool profile = false);
};

} // namespace storage
Expand Down
20 changes: 14 additions & 6 deletions src/graph/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "graph/util/SchemaUtil.h"
#include "graph/util/ScopedTimer.h"

using nebula::storage::GraphStorageClient;

namespace nebula {
namespace graph {

Expand Down Expand Up @@ -61,10 +63,12 @@ folly::Future<Status> DeleteVerticesExecutor::deleteVertices() {
}
auto spaceId = spaceInfo.id;
time::Duration deleteVertTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->deleteVertices(
spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(vertices))
->deleteVertices(param, std::move(vertices))
.via(runner())
.ensure([deleteVertTime]() {
VLOG(1) << "Delete vertices time: " << deleteVertTime.elapsedInUSec() << "us";
Expand Down Expand Up @@ -115,10 +119,12 @@ folly::Future<Status> DeleteTagsExecutor::deleteTags() {

auto spaceId = spaceInfo.id;
time::Duration deleteTagTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->deleteTags(
spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(delTags))
->deleteTags(param, std::move(delTags))
.via(runner())
.ensure([deleteTagTime]() {
VLOG(1) << "Delete vertices time: " << deleteTagTime.elapsedInUSec() << "us";
Expand Down Expand Up @@ -198,10 +204,12 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {

auto spaceId = spaceInfo.id;
time::Duration deleteEdgeTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->deleteEdges(
spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(edgeKeys))
->deleteEdges(param, std::move(edgeKeys))
.via(runner())
.ensure([deleteEdgeTime]() {
VLOG(1) << "Delete edge time: " << deleteEdgeTime.elapsedInUSec() << "us";
Expand Down
25 changes: 11 additions & 14 deletions src/graph/executor/mutate/InsertExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "graph/service/GraphFlags.h"
#include "graph/util/ScopedTimer.h"

using nebula::storage::GraphStorageClient;

namespace nebula {
namespace graph {

Expand All @@ -21,14 +23,12 @@ folly::Future<Status> InsertVerticesExecutor::insertVertices() {

auto *ivNode = asNode<InsertVertices>(node());
time::Duration addVertTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
ivNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->addVertices(ivNode->getSpace(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
ivNode->getVertices(),
ivNode->getPropNames(),
ivNode->getIfNotExists())
->addVertices(param, ivNode->getVertices(), ivNode->getPropNames(), ivNode->getIfNotExists())
.via(runner())
.ensure([addVertTime]() {
VLOG(1) << "Add vertices time: " << addVertTime.elapsedInUSec() << "us";
Expand All @@ -47,16 +47,13 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {

auto *ieNode = asNode<InsertEdges>(node());
time::Duration addEdgeTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
ieNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
return qctx()
->getStorageClient()
->addEdges(ieNode->getSpace(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
ieNode->getEdges(),
ieNode->getPropNames(),
ieNode->getIfNotExists(),
nullptr,
FLAGS_enable_experimental_feature)
->addEdges(param, ieNode->getEdges(), ieNode->getPropNames(), ieNode->getIfNotExists())
.via(runner())
.ensure(
[addEdgeTime]() { VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us"; })
Expand Down
22 changes: 13 additions & 9 deletions src/graph/executor/mutate/UpdateExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "graph/util/SchemaUtil.h"
#include "graph/util/ScopedTimer.h"

using nebula::storage::GraphStorageClient;

namespace nebula {
namespace graph {

Expand Down Expand Up @@ -46,11 +48,13 @@ folly::Future<Status> UpdateVertexExecutor::execute() {
auto *uvNode = asNode<UpdateVertex>(node());
yieldNames_ = uvNode->getYieldNames();
time::Duration updateVertTime;
auto plan = qctx()->plan();
auto sess = qctx()->rctx()->session();
GraphStorageClient::CommonRequestParam param(
uvNode->getSpaceId(), sess->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->updateVertex(uvNode->getSpaceId(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
->updateVertex(param,
uvNode->getVId(),
uvNode->getTagId(),
uvNode->getUpdatedProps(),
Expand Down Expand Up @@ -96,18 +100,18 @@ folly::Future<Status> UpdateEdgeExecutor::execute() {
yieldNames_ = ueNode->getYieldNames();

time::Duration updateEdgeTime;
auto plan = qctx()->plan();
GraphStorageClient::CommonRequestParam param(
ueNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
return qctx()
->getStorageClient()
->updateEdge(ueNode->getSpaceId(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
->updateEdge(param,
edgeKey,
ueNode->getUpdatedProps(),
ueNode->getInsertable(),
ueNode->getReturnProps(),
ueNode->getCondition(),
nullptr,
FLAGS_enable_experimental_feature)
ueNode->getCondition())
.via(runner())
.ensure([updateEdgeTime]() {
VLOG(1) << "Update edge time: " << updateEdgeTime.elapsedInUSec() << "us";
Expand Down
8 changes: 5 additions & 3 deletions src/graph/executor/query/GetEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ folly::Future<Status> GetEdgesExecutor::getEdges() {
}

time::Duration getPropsTime;
GraphStorageClient::CommonRequestParam param(ge->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
return DCHECK_NOTNULL(client)
->getProps(ge->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
->getProps(param,
std::move(edges),
nullptr,
ge->props(),
Expand Down
9 changes: 5 additions & 4 deletions src/graph/executor/query/GetNeighborsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
time::Duration getNbrTime;
GraphStorageClient* storageClient = qctx_->getStorageClient();
QueryExpressionContext qec(qctx()->ectx());
GraphStorageClient::CommonRequestParam param(gn_->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled());
return storageClient
->getNeighbors(gn_->space(),
qctx()->rctx()->session()->id(),
qctx()->plan()->id(),
qctx()->plan()->isProfileEnabled(),
->getNeighbors(param,
std::move(reqDs.colNames),
std::move(reqDs.rows),
gn_->edgeTypes(),
Expand Down
Loading