Skip to content

Commit

Permalink
Feature/scan multiple parts (#3262)
Browse files Browse the repository at this point in the history
* Scan multiple parts.

* Add multiple parts test case.

* Add limit test.

* Remove unused include.

* Support multiple tags.

* Fix license header.

* Optimize the extra read operations.

* Fix compile error.

* Skip invalid tag in one loop.

* Avoid extra logical.
  • Loading branch information
Shylock-Hg authored Nov 11, 2021
1 parent 1d0c583 commit 8e0028f
Show file tree
Hide file tree
Showing 15 changed files with 1,089 additions and 239 deletions.
80 changes: 56 additions & 24 deletions src/clients/storage/GraphStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,36 +559,68 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> GraphStorageClient::lookupAndTr
});
}

folly::Future<StatusOr<cpp2::ScanEdgeResponse>> GraphStorageClient::scanEdge(
cpp2::ScanEdgeRequest req, folly::EventBase* evb) {
std::pair<HostAddr, cpp2::ScanEdgeRequest> request;
auto host = this->getLeader(req.get_space_id(), req.get_part_id());
if (!host.ok()) {
return folly::makeFuture<StatusOr<cpp2::ScanEdgeResponse>>(host.status());
StorageRpcRespFuture<cpp2::ScanEdgeResponse> GraphStorageClient::scanEdge(
const CommonRequestParam& param,
const cpp2::EdgeProp& edgeProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanEdgeRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanEdgeResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
for (const auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(param.space);
req.set_parts(std::move(c.second));
req.set_return_columns(edgeProp);
req.set_limit(limit);
if (filter != nullptr) {
req.set_filter(filter->encode());
}
req.set_common(param.toReqCommon());
}
request.first = std::move(host).value();
request.second = std::move(req);

return getResponse(evb,
std::move(request),
[](cpp2::GraphStorageServiceAsyncClient* client,
const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); });
return collectResponse(param.evb,
std::move(requests),
[](cpp2::GraphStorageServiceAsyncClient* client,
const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); });
}

folly::Future<StatusOr<cpp2::ScanVertexResponse>> GraphStorageClient::scanVertex(
cpp2::ScanVertexRequest req, folly::EventBase* evb) {
std::pair<HostAddr, cpp2::ScanVertexRequest> request;
auto host = this->getLeader(req.get_space_id(), req.get_part_id());
if (!host.ok()) {
return folly::makeFuture<StatusOr<cpp2::ScanVertexResponse>>(host.status());
StorageRpcRespFuture<cpp2::ScanVertexResponse> GraphStorageClient::scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanVertexRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanVertexResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
for (const auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(param.space);
req.set_parts(std::move(c.second));
req.set_return_columns(vertexProp);
req.set_limit(limit);
if (filter != nullptr) {
req.set_filter(filter->encode());
}
req.set_common(param.toReqCommon());
}
request.first = std::move(host).value();
request.second = std::move(req);

return getResponse(evb,
std::move(request),
[](cpp2::GraphStorageServiceAsyncClient* client,
const cpp2::ScanVertexRequest& r) { return client->future_scanVertex(r); });
return collectResponse(
param.evb,
std::move(requests),
[](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::ScanVertexRequest& r) {
return client->future_scanVertex(r);
});
}

StatusOr<std::function<const VertexID&(const Row&)>> GraphStorageClient::getIdFromRow(
Expand Down
13 changes: 9 additions & 4 deletions src/clients/storage/GraphStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,16 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec);

folly::Future<StatusOr<cpp2::ScanEdgeResponse>> scanEdge(cpp2::ScanEdgeRequest req,
folly::EventBase* evb = nullptr);
StorageRpcRespFuture<cpp2::ScanEdgeResponse> scanEdge(const CommonRequestParam& param,
const cpp2::EdgeProp& vertexProp,
int64_t limit,
const Expression* filter);

folly::Future<StatusOr<cpp2::ScanVertexResponse>> scanVertex(cpp2::ScanVertexRequest req,
folly::EventBase* evb = nullptr);
StorageRpcRespFuture<cpp2::ScanVertexResponse> scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
const Expression* filter);

private:
StatusOr<std::function<const VertexID&(const Row&)>> getIdFromRow(GraphSpaceID space,
Expand Down
23 changes: 23 additions & 0 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,5 +329,28 @@ StorageClientBase<ClientType>::getHostParts(GraphSpaceID spaceId) const {
return hostParts;
}

template <typename ClientType>
StatusOr<std::unordered_map<HostAddr, std::unordered_map<PartitionID, cpp2::ScanCursor>>>
StorageClientBase<ClientType>::getHostPartsWithCursor(GraphSpaceID spaceId) const {
std::unordered_map<HostAddr, std::unordered_map<PartitionID, cpp2::ScanCursor>> hostParts;
auto status = metaClient_->partsNum(spaceId);
if (!status.ok()) {
return Status::Error("Space not found, spaceid: %d", spaceId);
}

// TODO support cursor
cpp2::ScanCursor c;
c.set_has_next(false);
auto parts = status.value();
for (auto partId = 1; partId <= parts; partId++) {
auto leader = getLeader(spaceId, partId);
if (!leader.ok()) {
return leader.status();
}
hostParts[leader.value()].emplace(partId, c);
}
return hostParts;
}

} // namespace storage
} // namespace nebula
11 changes: 3 additions & 8 deletions src/clients/storage/StorageClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class StorageClientBase {
std::unordered_map<PartitionID, std::vector<typename Container::value_type>>>>
clusterIdsToHosts(GraphSpaceID spaceId, const Container& ids, GetIdFunc f) const;

StatusOr<std::unordered_map<HostAddr, std::unordered_map<PartitionID, cpp2::ScanCursor>>>
getHostPartsWithCursor(GraphSpaceID spaceId) const;

virtual StatusOr<meta::PartHosts> getPartHosts(GraphSpaceID spaceId, PartitionID partId) const {
CHECK(metaClient_ != nullptr);
return metaClient_->getPartHostsFromCache(spaceId, partId);
Expand Down Expand Up @@ -208,14 +211,6 @@ class StorageClientBase {
return {req.get_part_id()};
}

std::vector<PartitionID> getReqPartsId(const cpp2::ScanEdgeRequest& req) const {
return {req.get_part_id()};
}

std::vector<PartitionID> getReqPartsId(const cpp2::ScanVertexRequest& req) const {
return {req.get_part_id()};
}

bool isValidHostPtr(const HostAddr* addr) {
return addr != nullptr && !addr->host.empty() && addr->port != 0;
}
Expand Down
58 changes: 30 additions & 28 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -560,24 +560,29 @@ struct LookupAndTraverseRequest {
* End of Index section
*/

struct ScanCursor {
3: bool has_next,
// next start key of scan, only valid when has_next is true
4: optional binary next_cursor,
}

struct ScanVertexRequest {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
// start key of this block
3: optional binary cursor,
4: VertexProp return_columns,
2: map<common.PartitionID, ScanCursor> (cpp.template = "std::unordered_map")
parts,
3: list<VertexProp> return_columns,
// max row count of tag in this response
5: i64 limit,
4: i64 limit,
// only return data in time range [start_time, end_time)
6: optional i64 start_time,
7: optional i64 end_time,
8: optional binary filter,
5: optional i64 start_time,
6: optional i64 end_time,
7: optional binary filter,
// when storage enable multi versions and only_latest_version is true, only return latest version.
// when storage disable multi versions, just use the default value.
9: bool only_latest_version = false,
8: bool only_latest_version = false,
// if set to false, forbid follower read
10: bool enable_read_from_follower = true,
11: optional RequestCommon common,
9: bool enable_read_from_follower = true,
10: optional RequestCommon common,
}

struct ScanVertexResponse {
Expand All @@ -586,29 +591,27 @@ struct ScanVertexResponse {
// Each column represents one property. the column name is in the form of "tag_name.prop_alias"
// in the same order which specified in VertexProp in request.
2: common.DataSet vertex_data,
3: bool has_next,
// next start key of scan, only valid when has_next is true
4: optional binary next_cursor,
3: map<common.PartitionID, ScanCursor> (cpp.template = "std::unordered_map")
cursors;
}

struct ScanEdgeRequest {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
// start key of this block
3: optional binary cursor,
4: EdgeProp return_columns,
2: map<common.PartitionID, ScanCursor> (cpp.template = "std::unordered_map")
parts,
3: EdgeProp return_columns,
// max row count of edge in this response
5: i64 limit,
4: i64 limit,
// only return data in time range [start_time, end_time)
6: optional i64 start_time,
7: optional i64 end_time,
8: optional binary filter,
5: optional i64 start_time,
6: optional i64 end_time,
7: optional binary filter,
// when storage enable multi versions and only_latest_version is true, only return latest version.
// when storage disable multi versions, just use the default value.
9: bool only_latest_version = false,
8: bool only_latest_version = false,
// if set to false, forbid follower read
10: bool enable_read_from_follower = true,
11: optional RequestCommon common,
9: bool enable_read_from_follower = true,
10: optional RequestCommon common,
}

struct ScanEdgeResponse {
Expand All @@ -617,9 +620,8 @@ struct ScanEdgeResponse {
// Each column represents one property. the column name is in the form of "edge_name.prop_alias"
// in the same order which specified in EdgeProp in requesss.
2: common.DataSet edge_data,
3: bool has_next,
// next start key of scan, only valid when has_next is true
4: optional binary next_cursor,
3: map<common.PartitionID, ScanCursor> (cpp.template = "std::unordered_map")
cursors;
}

struct TaskPara {
Expand Down
21 changes: 18 additions & 3 deletions src/storage/exec/EdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class EdgeNode : public IterateNode<T> {
return valueHandler(this->key(), this->reader(), props_);
}

const std::string& getEdgeName() { return edgeName_; }
const std::string& getEdgeName() const { return edgeName_; }

EdgeType edgeType() const { return edgeType_; }

protected:
EdgeNode(RuntimeContext* context,
Expand Down Expand Up @@ -113,15 +115,28 @@ class FetchEdgeNode final : public EdgeNode<cpp2::EdgeKey> {
(*edgeKey.dst_ref()).getStr());
ret = context_->env()->kvstore_->get(context_->spaceId(), partId, key_, &val_);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
resetReader();
return nebula::cpp2::ErrorCode::SUCCEEDED;
return doExecute(key_, val_);
} else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
// regard key not found as succeed as well, upper node will handle it
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
return ret;
}

nebula::cpp2::ErrorCode doExecute(const std::string& key, const std::string& value) {
key_ = key;
val_ = value;
resetReader();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void clear() {
valid_ = false;
key_.clear();
val_.clear();
reader_.reset();
}

private:
void resetReader() {
reader_.reset(*schemas_, val_);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/exec/RelNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class RelNode {

explicit RelNode(const std::string& name) : name_(name) {}

const std::string& name() const { return name_; }

std::string name_ = "RelNode";
std::vector<RelNode<T>*> dependencies_;
bool hasDependents_ = false;
Expand Down
Loading

0 comments on commit 8e0028f

Please sign in to comment.