Skip to content

Commit

Permalink
graphd passes the sessionid and planid parameters to storaged
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs committed Aug 26, 2021
1 parent eb8b3d6 commit 04a15d7
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 47 deletions.
63 changes: 58 additions & 5 deletions src/clients/storage/GraphStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace storage {

folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> GraphStorageClient::getNeighbors(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<std::string> colNames,
const std::vector<Row>& vertices,
const std::vector<EdgeType>& edgeTypes,
Expand Down Expand Up @@ -40,14 +42,15 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> GraphStorageCl
}

auto& clusters = status.value();
auto common = makeRequestCommon(session, plan);
std::unordered_map<HostAddr, cpp2::GetNeighborsRequest> requests;
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(space);
req.set_column_names(colNames);
req.set_parts(std::move(c.second));

req.set_common(common);
cpp2::TraverseSpec spec;
spec.set_edge_types(edgeTypes);
spec.set_edge_direction(edgeDirection);
Expand Down Expand Up @@ -85,6 +88,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> GraphStorageCl

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::addVertices(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> propNames,
bool ifNotExists,
Expand All @@ -103,13 +108,15 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::AddVerticesRequest> requests;
auto common = makeRequestCommon(session, plan);
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(space);
req.set_if_not_exists(ifNotExists);
req.set_parts(std::move(c.second));
req.set_prop_names(propNames);
req.set_common(common);
}

VLOG(3) << "requests size " << requests.size();
Expand All @@ -123,6 +130,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::addEdges(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists,
Expand All @@ -142,13 +151,15 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::AddEdgesRequest> requests;
auto common = makeRequestCommon(session, plan);
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(space);
req.set_if_not_exists(ifNotExists);
req.set_parts(std::move(c.second));
req.set_prop_names(propNames);
req.set_common(common);
}
return collectResponse(
evb,
Expand All @@ -160,6 +171,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad

folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> GraphStorageClient::getProps(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
const DataSet& input,
const std::vector<cpp2::VertexProp>* vertexProps,
const std::vector<cpp2::EdgeProp>* edgeProps,
Expand All @@ -183,6 +196,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> GraphStorageClient:

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::GetPropRequest> requests;
auto common = makeRequestCommon(session, plan);
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
Expand All @@ -205,6 +219,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> GraphStorageClient:
if (filter.size() > 0) {
req.set_filter(filter);
}
req.set_common(common);
}

return collectResponse(evb,
Expand All @@ -214,7 +229,11 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> GraphStorageClient:
}

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::deleteEdges(
GraphSpaceID space, std::vector<cpp2::EdgeKey> edges, folly::EventBase* evb) {
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::EdgeKey> edges,
folly::EventBase* evb) {
auto cbStatus = getIdFromEdgeKey(space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
Expand All @@ -229,11 +248,13 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::DeleteEdgesRequest> requests;
auto common = makeRequestCommon(session, plan);
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(space);
req.set_parts(std::move(c.second));
req.set_common(common);
}

return collectResponse(
Expand All @@ -245,7 +266,11 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de
}

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::deleteVertices(
GraphSpaceID space, std::vector<Value> ids, folly::EventBase* evb) {
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<Value> ids,
folly::EventBase* evb) {
auto cbStatus = getIdFromValue(space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
Expand All @@ -260,11 +285,13 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::DeleteVerticesRequest> requests;
auto common = makeRequestCommon(session, plan);
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(space);
req.set_parts(std::move(c.second));
req.set_common(common);
}

return collectResponse(
Expand All @@ -276,7 +303,11 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de
}

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::deleteTags(
GraphSpaceID space, std::vector<cpp2::DelTags> delTags, folly::EventBase* evb) {
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::DelTags> delTags,
folly::EventBase* evb) {
auto cbStatus = getIdFromDelTags(space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
Expand All @@ -291,11 +322,13 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::DeleteTagsRequest> requests;
auto common = makeRequestCommon(session, plan);
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(space);
req.set_parts(std::move(c.second));
req.set_common(common);
}

return collectResponse(
Expand All @@ -308,6 +341,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de

folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updateVertex(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
Value vertexId,
TagID tagId,
std::vector<cpp2::UpdatedProp> updatedProps,
Expand Down Expand Up @@ -347,6 +382,7 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updat
req.set_updated_props(std::move(updatedProps));
req.set_return_props(std::move(returnProps));
req.set_insertable(insertable);
req.set_common(makeRequestCommon(session, plan));
if (condition.size() > 0) {
req.set_condition(std::move(condition));
}
Expand All @@ -362,6 +398,8 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updat

folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updateEdge(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
storage::cpp2::EdgeKey edgeKey,
std::vector<cpp2::UpdatedProp> updatedProps,
bool insertable,
Expand Down Expand Up @@ -399,6 +437,7 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updat
req.set_updated_props(std::move(updatedProps));
req.set_return_props(std::move(returnProps));
req.set_insertable(insertable);
req.set_common(makeRequestCommon(session, plan));
if (condition.size() > 0) {
req.set_condition(std::move(condition));
}
Expand Down Expand Up @@ -446,6 +485,8 @@ folly::Future<StatusOr<cpp2::GetUUIDResp>> GraphStorageClient::getUUID(GraphSpac

folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient::lookupIndex(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
const std::vector<storage::cpp2::IndexQueryContext>& contexts,
bool isEdge,
int32_t tagOrEdge,
Expand All @@ -459,6 +500,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::LookupIndexRequest> requests;
auto common = makeRequestCommon(session, plan);
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
Expand All @@ -470,8 +512,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:
spec.set_contexts(contexts);
spec.set_is_edge(isEdge);
spec.set_tag_or_edge_id(tagOrEdge);

req.set_indices(spec);
req.set_common(common);
}

return collectResponse(
Expand All @@ -484,6 +526,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:

folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>>
GraphStorageClient::lookupAndTraverse(GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
cpp2::IndexSpec indexSpec,
cpp2::TraverseSpec traverseSpec,
folly::EventBase* evb) {
Expand All @@ -495,13 +539,15 @@ GraphStorageClient::lookupAndTraverse(GraphSpaceID space,

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::LookupAndTraverseRequest> requests;
auto common = makeRequestCommon(session, plan);
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(space);
req.set_parts(std::move(c.second));
req.set_indices(indexSpec);
req.set_traverse_spec(traverseSpec);
req.set_common(common);
}

return collectResponse(
Expand Down Expand Up @@ -740,6 +786,13 @@ StatusOr<std::function<const VertexID&(const cpp2::DelTags&)>> GraphStorageClien
}
return cb;
}
cpp2::RequestCommon GraphStorageClient::makeRequestCommon(SessionID sessionId,
ExecutionPlanID planId) {
cpp2::RequestCommon common;
common.set_session_id(sessionId);
common.set_plan_id(planId);
return common;
}

} // namespace storage
} // namespace nebula
32 changes: 30 additions & 2 deletions src/clients/storage/GraphStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> getNeighbors(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<std::string> colNames,
// The first column has to be the VertexID
const std::vector<Row>& vertices,
Expand All @@ -52,6 +54,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> getProps(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
const DataSet& input,
const std::vector<cpp2::VertexProp>* vertexProps,
const std::vector<cpp2::EdgeProp>* edgeProps,
Expand All @@ -64,13 +68,17 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> addVertices(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> propNames,
bool ifNotExists,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> addEdges(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists,
Expand All @@ -79,17 +87,29 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteEdges(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<storage::cpp2::EdgeKey> edges,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteVertices(
GraphSpaceID space, std::vector<Value> ids, folly::EventBase* evb = nullptr);
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<Value> ids,
folly::EventBase* evb = nullptr);

folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteTags(
GraphSpaceID space, std::vector<cpp2::DelTags> delTags, folly::EventBase* evb = nullptr);
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
std::vector<cpp2::DelTags> delTags,
folly::EventBase* evb = nullptr);

folly::Future<StatusOr<storage::cpp2::UpdateResponse>> updateVertex(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
Value vertexId,
TagID tagId,
std::vector<cpp2::UpdatedProp> updatedProps,
Expand All @@ -100,6 +120,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

folly::Future<StatusOr<storage::cpp2::UpdateResponse>> updateEdge(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
storage::cpp2::EdgeKey edgeKey,
std::vector<cpp2::UpdatedProp> updatedProps,
bool insertable,
Expand All @@ -113,6 +135,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> lookupIndex(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
const std::vector<storage::cpp2::IndexQueryContext>& contexts,
bool isEdge,
int32_t tagOrEdge,
Expand All @@ -121,6 +145,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> lookupAndTraverse(
GraphSpaceID space,
SessionID session,
ExecutionPlanID plan,
cpp2::IndexSpec indexSpec,
cpp2::TraverseSpec traverseSpec,
folly::EventBase* evb = nullptr);
Expand Down Expand Up @@ -148,6 +174,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy

StatusOr<std::function<const VertexID&(const cpp2::DelTags&)>> getIdFromDelTags(
GraphSpaceID space) const;

cpp2::RequestCommon makeRequestCommon(SessionID sessionId, ExecutionPlanID planId);
};

} // namespace storage
Expand Down
Loading

0 comments on commit 04a15d7

Please sign in to comment.