Skip to content

Commit

Permalink
Change input vid format (#4583)
Browse files Browse the repository at this point in the history
* support getProp only return vid & delete _src from edge

* fix prune properties

* enhance edge"

* fix pathBuildExpr

* fix union error

* fix unwind

* fix bicartesionproduct

* add optimize appendvertice flag

* fix test

* fix format

* change getNeighbor's input

* fix error

* fix test error and fix go yield id(3276157)

* fix error

* fix ctest error
  • Loading branch information
nevermore3 authored Aug 30, 2022
1 parent 474554f commit db8b909
Show file tree
Hide file tree
Showing 25 changed files with 206 additions and 261 deletions.
13 changes: 7 additions & 6 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ cpp2::RequestCommon StorageClient::CommonRequestParam::toReqCommon() const {
StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::getNeighbors(
const CommonRequestParam& param,
std::vector<std::string> colNames,
const std::vector<Row>& vertices,
const std::vector<Value>& vids,
const std::vector<EdgeType>& edgeTypes,
cpp2::EdgeDirection edgeDirection,
const std::vector<cpp2::StatProp>* statProps,
Expand All @@ -52,13 +52,12 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::getNeighbors(
const std::vector<cpp2::OrderBy>& orderBy,
int64_t limit,
const Expression* filter) {
auto cbStatus = getIdFromRow(param.space, false);
auto cbStatus = getIdFromValue(param.space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>>(
std::runtime_error(cbStatus.status().toString()));
}

auto status = clusterIdsToHosts(param.space, vertices, std::move(cbStatus).value());
auto status = clusterIdsToHosts(param.space, vids, std::move(cbStatus).value());
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>>(
std::runtime_error(status.status().toString()));
Expand Down Expand Up @@ -109,14 +108,16 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> StorageClient::getNeighbors(
}

StorageRpcRespFuture<cpp2::GetDstBySrcResponse> StorageClient::getDstBySrc(
const CommonRequestParam& param, const List& vertices, const std::vector<EdgeType>& edgeTypes) {
const CommonRequestParam& param,
const std::vector<Value>& vertices,
const std::vector<EdgeType>& edgeTypes) {
auto cbStatus = getIdFromValue(param.space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::GetDstBySrcResponse>>(
std::runtime_error(cbStatus.status().toString()));
}

auto status = clusterIdsToHosts(param.space, vertices.values, std::move(cbStatus).value());
auto status = clusterIdsToHosts(param.space, vertices, std::move(cbStatus).value());
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::GetDstBySrcResponse>>(
std::runtime_error(status.status().toString()));
Expand Down
4 changes: 2 additions & 2 deletions src/clients/storage/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class StorageClient
const CommonRequestParam& param,
std::vector<std::string> colNames,
// The first column has to be the VertexID
const std::vector<Row>& vertices,
const std::vector<Value>& vids,
const std::vector<EdgeType>& edgeTypes,
cpp2::EdgeDirection edgeDirection,
const std::vector<cpp2::StatProp>* statProps,
Expand All @@ -82,7 +82,7 @@ class StorageClient

StorageRpcRespFuture<cpp2::GetDstBySrcResponse> getDstBySrc(
const CommonRequestParam& param,
const List& vertices,
const std::vector<Value>& vertices,
const std::vector<EdgeType>& edgeTypes);

StorageRpcRespFuture<cpp2::GetPropResponse> getProps(
Expand Down
4 changes: 2 additions & 2 deletions src/common/expression/CaseExpression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ bool CaseExpression::operator==(const Expression& rhs) const {
}

const Value& CaseExpression::eval(ExpressionContext& ctx) {
auto cond = condition_ != nullptr ? condition_->eval(ctx) : Value();
const auto& cond = condition_ != nullptr ? condition_->eval(ctx) : Value();
for (const auto& whenThen : cases_) {
auto when = whenThen.when->eval(ctx);
const auto& when = whenThen.when->eval(ctx);
if (condition_ != nullptr) {
if (cond == when) {
result_ = whenThen.then->eval(ctx);
Expand Down
5 changes: 1 addition & 4 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,8 @@ Value GetNeighborsIter::getVertex(const std::string& name) const {
if (!valid()) {
return Value::kNullValue;
}

auto vidVal = getColumn(0);
if (UNLIKELY(!SchemaUtil::isValidVid(vidVal))) {
return Value::kNullBadType;
}

Vertex vertex;
vertex.vid = vidVal;
auto& tagPropMap = currentDs_->tagPropsMap;
Expand Down
2 changes: 1 addition & 1 deletion src/graph/context/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Result final {
return std::move(core_.iter);
}

Iterator* iterRef() {
Iterator* iterRef() const {
return core_.iter.get();
}

Expand Down
47 changes: 26 additions & 21 deletions src/graph/executor/StorageAccessExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,39 +77,43 @@ StatusOr<DataSet> buildRequestDataSet(const SpaceInfo &space,
}

template <typename VidType>
StatusOr<List> buildRequestList(const SpaceInfo &space,
QueryExpressionContext &exprCtx,
Iterator *iter,
Expression *expr,
bool dedup) {
StatusOr<std::vector<Value>> buildRequestList(const SpaceInfo &space,
QueryExpressionContext &exprCtx,
Iterator *iter,
Expression *expr,
bool dedup,
bool isCypher) {
DCHECK(iter && expr) << "iter=" << iter << ", expr=" << expr;
nebula::List vertices;
auto s = iter->size();
vertices.reserve(s);
std::vector<Value> vids;
auto iterSize = iter->size();
vids.reserve(iterSize);

std::unordered_set<VidType> uniqueSet;
uniqueSet.reserve(s);
uniqueSet.reserve(iterSize);

const auto &vidType = *(space.spaceDesc.vid_type_ref());
const auto &metaVidType = *(space.spaceDesc.vid_type_ref());
auto vidType = SchemaUtil::propTypeToValueType(metaVidType.get_type());

for (; iter->valid(); iter->next()) {
auto vid = expr->eval(exprCtx(iter));
if (vid.empty()) {
continue;
}
if (!SchemaUtil::isValidVid(vid, vidType)) {
if (vid.type() != vidType) {
if (isCypher) {
continue;
}
std::stringstream ss;
ss << "`" << vid.toString() << "', the srcs should be type of "
<< apache::thrift::util::enumNameSafe(vidType.get_type()) << ", but was`" << vid.type()
<< "'";
ss << "`" << vid.toString() << "', the srcs should be type of " << vidType << ", but was`"
<< vid.type() << "'";
return Status::Error(ss.str());
}
if (dedup && !uniqueSet.emplace(Vid<VidType>::value(vid)).second) {
continue;
}
vertices.emplace_back(std::move(vid));
vids.emplace_back(std::move(vid));
}
return vertices;
return vids;
}

} // namespace internal
Expand All @@ -131,16 +135,17 @@ StatusOr<DataSet> StorageAccessExecutor::buildRequestDataSetByVidType(Iterator *
return internal::buildRequestDataSet<std::string>(space, exprCtx, iter, expr, dedup, isCypher);
}

StatusOr<List> StorageAccessExecutor::buildRequestListByVidType(Iterator *iter,
Expression *expr,
bool dedup) {
StatusOr<std::vector<Value>> StorageAccessExecutor::buildRequestListByVidType(Iterator *iter,
Expression *expr,
bool dedup,
bool isCypher) {
const auto &space = qctx()->rctx()->session()->space();
QueryExpressionContext exprCtx(qctx()->ectx());

if (isIntVidType(space)) {
return internal::buildRequestList<int64_t>(space, exprCtx, iter, expr, dedup);
return internal::buildRequestList<int64_t>(space, exprCtx, iter, expr, dedup, isCypher);
}
return internal::buildRequestList<std::string>(space, exprCtx, iter, expr, dedup);
return internal::buildRequestList<std::string>(space, exprCtx, iter, expr, dedup, isCypher);
}

std::string StorageAccessExecutor::getStorageDetail(
Expand Down
5 changes: 4 additions & 1 deletion src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ class StorageAccessExecutor : public Executor {
bool dedup,
bool isCypher = false);

StatusOr<List> buildRequestListByVidType(Iterator *iter, Expression *expr, bool dedup);
StatusOr<std::vector<Value>> buildRequestListByVidType(Iterator *iter,
Expression *expr,
bool dedup,
bool isCypher = false);
};

} // namespace graph
Expand Down
33 changes: 14 additions & 19 deletions src/graph/executor/algo/BatchShortestPath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ size_t BatchShortestPath::init(const HashSet& startVids, const HashSet& endVids)
resultDs_.resize(rowSize);

for (auto& _startVids : batchStartVids_) {
DataSet startDs;
HashSet srcVids;
PathMap leftPathMap;
for (auto& startVid : _startVids) {
startDs.rows.emplace_back(Row({startVid}));
srcVids.emplace(startVid);
std::vector<CustomPath> dummy;
leftPathMap[startVid].emplace(startVid, std::move(dummy));
}
for (auto& _endVids : batchEndVids_) {
DataSet endDs;
HashSet dstVids;
PathMap preRightPathMap;
PathMap rightPathMap;
for (auto& endVid : _endVids) {
endDs.rows.emplace_back(Row({endVid}));
dstVids.emplace(endVid);
std::vector<CustomPath> dummy;
rightPathMap[endVid].emplace(endVid, dummy);
preRightPathMap[endVid].emplace(endVid, std::move(dummy));
Expand All @@ -75,8 +75,8 @@ size_t BatchShortestPath::init(const HashSet& startVids, const HashSet& endVids)
currentRightPathMaps_.emplace_back(std::move(rightPathMap));

// set vid for getNeightbor
leftVids_.emplace_back(startDs);
rightVids_.emplace_back(std::move(endDs));
leftVids_.emplace_back(srcVids);
rightVids_.emplace_back(dstVids);

// set terminateMap
std::unordered_multimap<StartVid, std::pair<EndVid, bool>> terminationMap;
Expand Down Expand Up @@ -116,11 +116,13 @@ folly::Future<Status> BatchShortestPath::getNeighbors(size_t rowNum, size_t step
qctx_->rctx()->session()->id(),
qctx_->plan()->id(),
qctx_->plan()->isProfileEnabled());
auto& inputRows = reverse ? rightVids_[rowNum].rows : leftVids_[rowNum].rows;
auto& inputVids = reverse ? rightVids_[rowNum] : leftVids_[rowNum];
std::vector<Value> vids(inputVids.begin(), inputVids.end());
inputVids.clear();
return storageClient
->getNeighbors(param,
{nebula::kVid},
std::move(inputRows),
std::move(vids),
{},
pathNode_->edgeDirection(),
nullptr,
Expand Down Expand Up @@ -271,8 +273,8 @@ folly::Future<Status> BatchShortestPath::handleResponse(size_t rowNum, size_t st
if (result || stepNum * 2 >= maxStep_) {
return folly::makeFuture<Status>(Status::OK());
}
auto& leftVids = leftVids_[rowNum].rows;
auto& rightVids = rightVids_[rowNum].rows;
auto& leftVids = leftVids_[rowNum];
auto& rightVids = rightVids_[rowNum];
if (leftVids.empty() || rightVids.empty()) {
return folly::makeFuture<Status>(Status::OK());
}
Expand Down Expand Up @@ -458,17 +460,10 @@ std::vector<Row> BatchShortestPath::createPaths(const std::vector<CustomPath>& p
}

void BatchShortestPath::setNextStepVid(const PathMap& paths, size_t rowNum, bool reverse) {
std::vector<Row> nextStepVids;
auto& nextStepVids = reverse ? rightVids_[rowNum] : leftVids_[rowNum];
nextStepVids.reserve(paths.size());
for (const auto& path : paths) {
Row row;
row.values.emplace_back(path.first);
nextStepVids.emplace_back(std::move(row));
}
if (reverse) {
rightVids_[rowNum].rows.swap(nextStepVids);
} else {
leftVids_[rowNum].rows.swap(nextStepVids);
nextStepVids.emplace(path.first);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/graph/executor/algo/ShortestPathBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ class ShortestPathBase {
folly::SpinLock statsLock_;

std::vector<DataSet> resultDs_;
std::vector<DataSet> leftVids_;
std::vector<DataSet> rightVids_;
std::vector<HashSet> leftVids_;
std::vector<HashSet> rightVids_;
};

} // namespace graph
Expand Down
8 changes: 4 additions & 4 deletions src/graph/executor/algo/ShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ folly::Future<Status> ShortestPathExecutor::execute() {

size_t ShortestPathExecutor::checkInput(HashSet& startVids, HashSet& endVids) {
auto iter = ectx_->getResult(pathNode_->inputVar()).iter();
const auto& vidType = *(qctx()->rctx()->session()->space().spaceDesc.vid_type_ref());
const auto& metaVidType = *(qctx()->rctx()->session()->space().spaceDesc.vid_type_ref());
auto vidType = SchemaUtil::propTypeToValueType(metaVidType.get_type());
for (; iter->valid(); iter->next()) {
auto start = iter->getColumn(0);
auto end = iter->getColumn(1);
if (!SchemaUtil::isValidVid(start, vidType) || !SchemaUtil::isValidVid(end, vidType)) {
if (start.type() != vidType || end.type() != vidType) {
LOG(ERROR) << "Mismatched shortestPath vid type. start type : " << start.type()
<< ", end type: " << end.type()
<< ", space vid type: " << SchemaUtil::typeToString(vidType);
<< ", end type: " << end.type() << ", space vid type: " << vidType;
continue;
}
if (start == end) {
Expand Down
37 changes: 14 additions & 23 deletions src/graph/executor/algo/SingleShortestPath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,17 @@ void SingleShortestPath::init(const HashSet& startVids, const HashSet& endVids,
allRightPaths_.reserve(rowSize);
resultDs_.resize(rowSize);
for (const auto& startVid : startVids) {
HashSet srcVid({startVid});
for (const auto& endVid : endVids) {
robin_hood::unordered_flat_map<Value, std::vector<Row>, std::hash<Value>> steps;
std::vector<Row> dummy;
steps.emplace(endVid, std::move(dummy));
HalfPath originRightPath({std::move(steps)});
allRightPaths_.emplace_back(std::move(originRightPath));

DataSet startDs, endDs;
startDs.rows.emplace_back(Row({startVid}));
endDs.rows.emplace_back(Row({endVid}));
leftVids_.emplace_back(std::move(startDs));
rightVids_.emplace_back(std::move(endDs));
HashSet dstVid({endVid});
leftVids_.emplace_back(srcVid);
rightVids_.emplace_back(std::move(dstVid));
}
}
}
Expand Down Expand Up @@ -88,11 +87,13 @@ folly::Future<Status> SingleShortestPath::getNeighbors(size_t rowNum,
qctx_->rctx()->session()->id(),
qctx_->plan()->id(),
qctx_->plan()->isProfileEnabled());
auto& inputRows = reverse ? rightVids_[rowNum].rows : leftVids_[rowNum].rows;
auto& inputVids = reverse ? rightVids_[rowNum] : leftVids_[rowNum];
std::vector<Value> vids(inputVids.begin(), inputVids.end());
inputVids.clear();
return storageClient
->getNeighbors(param,
{nebula::kVid},
std::move(inputRows),
std::move(vids),
{},
pathNode_->edgeDirection(),
nullptr,
Expand Down Expand Up @@ -137,9 +138,7 @@ Status SingleShortestPath::doBuildPath(size_t rowNum, GetNeighborsIter* iter, bo
allSteps.emplace_back();
auto& currentStep = allSteps.back();

HashSet uniqueDst;
uniqueDst.reserve(iterSize);
std::vector<Row> nextStepVids;
auto& nextStepVids = reverse ? rightVids_[rowNum] : leftVids_[rowNum];
nextStepVids.reserve(iterSize);

QueryExpressionContext ctx(qctx_->ectx());
Expand All @@ -149,14 +148,12 @@ Status SingleShortestPath::doBuildPath(size_t rowNum, GetNeighborsIter* iter, bo
continue;
}
auto& edge = edgeVal.getEdge();
auto dst = edge.dst;
auto& dst = edge.dst;
if (visitedVids.find(dst) != visitedVids.end()) {
continue;
}
visitedVids.emplace(edge.src);
if (uniqueDst.emplace(dst).second) {
nextStepVids.emplace_back(Row({dst}));
}
nextStepVids.emplace(dst);
auto vertex = iter->getVertex();
Row step;
step.emplace_back(std::move(vertex));
Expand All @@ -170,13 +167,7 @@ Status SingleShortestPath::doBuildPath(size_t rowNum, GetNeighborsIter* iter, bo
steps.emplace_back(std::move(step));
}
}
visitedVids.insert(std::make_move_iterator(uniqueDst.begin()),
std::make_move_iterator(uniqueDst.end()));
if (reverse) {
rightVids_[rowNum].rows.swap(nextStepVids);
} else {
leftVids_[rowNum].rows.swap(nextStepVids);
}
visitedVids.insert(nextStepVids.begin(), nextStepVids.end());
return Status::OK();
}

Expand All @@ -191,8 +182,8 @@ folly::Future<Status> SingleShortestPath::handleResponse(size_t rowNum, size_t s
if (result || stepNum * 2 >= maxStep_) {
return folly::makeFuture<Status>(Status::OK());
}
auto& leftVids = leftVids_[rowNum].rows;
auto& rightVids = rightVids_[rowNum].rows;
auto& leftVids = leftVids_[rowNum];
auto& rightVids = rightVids_[rowNum];
if (leftVids.empty() || rightVids.empty()) {
return folly::makeFuture<Status>(Status::OK());
}
Expand Down
Loading

0 comments on commit db8b909

Please sign in to comment.