diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index af2c3ef870e..38810202016 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -129,12 +129,21 @@ bool MetaClient::waitForMetadReady(int count, int retryIntervalSecs) { LOG(ERROR) << "Connect to the MetaServer Failed"; return false; } + + // Verify the graph server version auto status = verifyVersion(); if (!status.ok()) { LOG(ERROR) << status; return false; } + // Save graph version to meta + status = saveVersionToMeta(); + if (!status.ok()) { + LOG(ERROR) << status; + return false; + } + CHECK(bgThread_->start()); LOG(INFO) << "Register time task for heartbeat!"; size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900); @@ -161,10 +170,14 @@ void MetaClient::heartBeatThreadFunc() { bgThread_->addDelayTask( FLAGS_heartbeat_interval_secs * 1000, &MetaClient::heartBeatThreadFunc, this); }; - auto ret = heartbeat().get(); - if (!ret.ok()) { - LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); - return; + // UNKNOWN is reserved for tools such as upgrader, in that case the ip/port is not set. We do + // not send heartbeat to meta to avoid writing error host info (e.g. Host("", 0)) + if (options_.role_ != cpp2::HostRole::UNKNOWN) { + auto ret = heartbeat().get(); + if (!ret.ok()) { + LOG(ERROR) << "Heartbeat failed, status:" << ret.status(); + return; + } } // if MetaServer has some changes, refresh the localCache_ @@ -227,7 +240,9 @@ bool MetaClient::loadUsersAndRoles() { } bool MetaClient::loadData() { - if (localDataLastUpdateTime_ == metadLastUpdateTime_) { + // UNKNOWN role will skip heartbeat + if (options_.role_ != cpp2::HostRole::UNKNOWN && + localDataLastUpdateTime_ == metadLastUpdateTime_) { return true; } @@ -2949,7 +2964,9 @@ StatusOr> MetaClient::getListenerHostTypeBySpace } bool MetaClient::loadCfg() { - if (options_.skipConfig_ || localCfgLastUpdateTime_ == metadLastUpdateTime_) { + // UNKNOWN role will skip heartbeat + if (options_.skipConfig_ || (options_.role_ != cpp2::HostRole::UNKNOWN && + localCfgLastUpdateTime_ == metadLastUpdateTime_)) { return true; } if (!configReady_ && !registerCfg()) { @@ -3611,5 +3628,29 @@ Status MetaClient::verifyVersion() { return Status::OK(); } +Status MetaClient::saveVersionToMeta() { + auto req = cpp2::SaveGraphVersionReq(); + req.build_version_ref() = getOriginVersion(); + req.host_ref() = options_.localHost_; + folly::Promise> promise; + auto future = promise.getFuture(); + getResponse( + std::move(req), + [](auto client, auto request) { return client->future_saveGraphVersion(request); }, + [](cpp2::SaveGraphVersionResp&& resp) { return std::move(resp); }, + std::move(promise)); + + auto respStatus = std::move(future).get(); + if (!respStatus.ok()) { + return respStatus.status(); + } + auto resp = std::move(respStatus).value(); + if (resp.get_code() != nebula::cpp2::ErrorCode::SUCCEEDED) { + return Status::Error("Failed to save graph version into meta, error code: %s", + apache::thrift::util::enumNameSafe(resp.get_code()).c_str()); + } + return Status::OK(); +} + } // namespace meta } // namespace nebula diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 7de27e61f7a..15c89620eee 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -733,8 +733,14 @@ class MetaClient : public BaseMetaClient { ListenersMap doGetListenersMap(const HostAddr& host, const LocalCache& localCache); + // Checks if the the client version is compatible with the server version by checking the + // whilelist in meta. Status verifyVersion(); + // Save the version of the graph service into meta so that it could be looked up. + // This method should be only called in the internal client. + Status saveVersionToMeta(); + private: std::shared_ptr ioThreadPool_; std::shared_ptr> clientsMan_; diff --git a/src/common/utils/NebulaKeyUtils.cpp b/src/common/utils/NebulaKeyUtils.cpp index d026f3efed0..479ad2c939c 100644 --- a/src/common/utils/NebulaKeyUtils.cpp +++ b/src/common/utils/NebulaKeyUtils.cpp @@ -253,6 +253,7 @@ std::vector NebulaKeyUtils::snapshotPrefix(PartitionID partId) { if (partId == 0) { result.emplace_back(""); } else { + result.emplace_back(vertexPrefix(partId)); result.emplace_back(tagPrefix(partId)); result.emplace_back(edgePrefix(partId)); result.emplace_back(IndexKeyUtils::indexPrefix(partId)); diff --git a/src/graph/executor/algo/BFSShortestPathExecutor.cpp b/src/graph/executor/algo/BFSShortestPathExecutor.cpp index 87730bf31f5..c01352a6e18 100644 --- a/src/graph/executor/algo/BFSShortestPathExecutor.cpp +++ b/src/graph/executor/algo/BFSShortestPathExecutor.cpp @@ -135,13 +135,12 @@ folly::Future BFSShortestPathExecutor::conjunctPath() { std::vector> futures; for (auto& vid : meetVids) { batchVids.push_back(vid); - if (i == totalSize - 1 || batchVids.size() == batchSize) { + if (++i == totalSize || batchVids.size() == batchSize) { auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() { return doConjunct(vids, oddStep); }); futures.emplace_back(std::move(future)); } - i++; } return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) { diff --git a/src/graph/executor/algo/MultiShortestPathExecutor.cpp b/src/graph/executor/algo/MultiShortestPathExecutor.cpp index 5ab58fa4847..66ba6b82afe 100644 --- a/src/graph/executor/algo/MultiShortestPathExecutor.cpp +++ b/src/graph/executor/algo/MultiShortestPathExecutor.cpp @@ -39,10 +39,19 @@ folly::Future MultiShortestPathExecutor::execute() { }) .thenValue([this](auto&& resp) { UNUSED(resp); - preLeftPaths_.swap(leftPaths_); - preRightPaths_.swap(rightPaths_); + preRightPaths_ = rightPaths_; + // update history + for (auto& iter : leftPaths_) { + historyLeftPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()), + std::make_move_iterator(iter.second.end())); + } + for (auto& iter : rightPaths_) { + historyRightPaths_[iter.first].insert(std::make_move_iterator(iter.second.begin()), + std::make_move_iterator(iter.second.end())); + } leftPaths_.clear(); rightPaths_.clear(); + step_++; DataSet ds; ds.colNames = pathNode_->colNames(); @@ -58,24 +67,40 @@ void MultiShortestPathExecutor::init() { for (; rIter->valid(); rIter->next()) { auto& vid = rIter->getColumn(0); if (rightVids.emplace(vid).second) { - preRightPaths_[vid].push_back({Path(Vertex(vid, {}), {})}); + std::vector tmp({Path(Vertex(vid, {}), {})}); + historyRightPaths_[vid].emplace(vid, tmp); + preRightPaths_[vid].emplace(vid, std::move(tmp)); } } std::set leftVids; for (; lIter->valid(); lIter->next()) { auto& vid = lIter->getColumn(0); + std::vector tmp({Path(Vertex(vid, {}), {})}); + historyLeftPaths_[vid].emplace(vid, std::move(tmp)); leftVids.emplace(vid); } for (const auto& leftVid : leftVids) { for (const auto& rightVid : rightVids) { if (leftVid != rightVid) { - terminationMap_.insert({leftVid, {rightVid, true}}); + terminationMap_.emplace(leftVid, std::make_pair(rightVid, true)); } } } } +std::vector MultiShortestPathExecutor::createPaths(const std::vector& paths, + const Edge& edge) { + std::vector newPaths; + newPaths.reserve(paths.size()); + for (const auto& p : paths) { + Path path = p; + path.steps.emplace_back(Step(Vertex(edge.dst, {}), edge.type, edge.name, edge.ranking, {})); + newPaths.emplace_back(std::move(path)); + } + return newPaths; +} + Status MultiShortestPathExecutor::buildPath(bool reverse) { auto iter = reverse ? ectx_->getResult(pathNode_->rightInputVar()).iter() : ectx_->getResult(pathNode_->leftInputVar()).iter(); @@ -96,10 +121,23 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) { Path path; path.src = Vertex(src, {}); path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {})); - currentPaths[dst].emplace_back(std::move(path)); + auto foundDst = currentPaths.find(dst); + if (foundDst != currentPaths.end()) { + auto foundSrc = foundDst->second.find(src); + if (foundSrc != foundDst->second.end()) { + // same , different edge type or rank + foundSrc->second.emplace_back(std::move(path)); + } else { + std::vector tmp({std::move(path)}); + foundDst->second.emplace(src, std::move(tmp)); + } + } else { + std::vector tmp({std::move(path)}); + currentPaths[dst].emplace(src, std::move(tmp)); + } } } else { - auto& historyPaths = reverse ? preRightPaths_ : preLeftPaths_; + auto& historyPaths = reverse ? historyRightPaths_ : historyLeftPaths_; for (; iter->valid(); iter->next()) { auto edgeVal = iter->getEdge(); if (UNLIKELY(!edgeVal.isEdge())) { @@ -108,50 +146,93 @@ Status MultiShortestPathExecutor::buildPath(bool reverse) { auto& edge = edgeVal.getEdge(); auto& src = edge.src; auto& dst = edge.dst; - for (const auto& histPath : historyPaths[src]) { - Path path = histPath; - path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {})); - if (path.hasDuplicateVertices()) { - continue; + auto& prePaths = historyPaths[src]; + + auto foundHistDst = historyPaths.find(dst); + if (foundHistDst == historyPaths.end()) { + // dst not in history + auto foundDst = currentPaths.find(dst); + if (foundDst == currentPaths.end()) { + // dst not in current, new edge + for (const auto& prePath : prePaths) { + currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge)); + } + } else { + // dst in current + for (const auto& prePath : prePaths) { + auto newPaths = createPaths(prePath.second, edge); + auto foundSrc = foundDst->second.find(prePath.first); + if (foundSrc == foundDst->second.end()) { + foundDst->second.emplace(prePath.first, std::move(newPaths)); + } else { + foundSrc->second.insert(foundSrc->second.begin(), + std::make_move_iterator(newPaths.begin()), + std::make_move_iterator(newPaths.end())); + } + } + } + } else { + // dst in history + auto& historyDstPaths = foundHistDst->second; + for (const auto& prePath : prePaths) { + if (historyDstPaths.find(prePath.first) != historyDstPaths.end()) { + // loop: a->b->c->a or a->b->c->b, + // filter out path that with duplicate vertex or have already been found before + continue; + } + auto foundDst = currentPaths.find(dst); + if (foundDst == currentPaths.end()) { + currentPaths[dst].emplace(prePath.first, createPaths(prePath.second, edge)); + } else { + auto newPaths = createPaths(prePath.second, edge); + auto foundSrc = foundDst->second.find(prePath.first); + if (foundSrc == foundDst->second.end()) { + foundDst->second.emplace(prePath.first, std::move(newPaths)); + } else { + foundSrc->second.insert(foundSrc->second.begin(), + std::make_move_iterator(newPaths.begin()), + std::make_move_iterator(newPaths.end())); + } + } } - currentPaths[dst].emplace_back(std::move(path)); } } } + // set nextVid const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar(); setNextStepVid(currentPaths, nextVidVar); return Status::OK(); } -DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter, - Interims::iterator endIter, - bool oddStep) { - auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_; - DataSet ds; - for (; startIter != endIter; ++startIter) { - auto found = rightPaths.find(startIter->first); - if (found == rightPaths.end()) { - continue; - } - for (const auto& lPath : startIter->second) { - const auto& srcVid = lPath.src.vid; - auto range = terminationMap_.equal_range(srcVid); - for (const auto& rPath : found->second) { - const auto& dstVid = rPath.src.vid; - for (auto iter = range.first; iter != range.second; ++iter) { - if (iter->second.first == dstVid) { - auto forwardPath = lPath; - auto backwardPath = rPath; +DataSet MultiShortestPathExecutor::doConjunct( + const std::vector>& iters) { + auto buildPaths = + [](const std::vector& leftPaths, const std::vector& rightPaths, DataSet& ds) { + for (const auto& leftPath : leftPaths) { + for (const auto& rightPath : rightPaths) { + auto forwardPath = leftPath; + auto backwardPath = rightPath; backwardPath.reverse(); forwardPath.append(std::move(backwardPath)); - if (forwardPath.hasDuplicateVertices()) { - continue; - } Row row; row.values.emplace_back(std::move(forwardPath)); ds.rows.emplace_back(std::move(row)); - iter->second.second = false; + } + } + }; + + DataSet ds; + for (const auto& iter : iters) { + const auto& leftPaths = iter.first->second; + const auto& rightPaths = iter.second->second; + for (const auto& leftPath : leftPaths) { + auto range = terminationMap_.equal_range(leftPath.first); + for (const auto& rightPath : rightPaths) { + for (auto found = range.first; found != range.second; ++found) { + if (found->second.first == rightPath.first) { + buildPaths(leftPath.second, rightPath.second, ds); + found->second.second = false; } } } @@ -161,28 +242,51 @@ DataSet MultiShortestPathExecutor::doConjunct(Interims::iterator startIter, } folly::Future MultiShortestPathExecutor::conjunctPath(bool oddStep) { - size_t batchSize = leftPaths_.size() / static_cast(FLAGS_num_operator_threads); + auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_; + size_t leftPathSize = leftPaths_.size(); + size_t rightPathSize = rightPaths.size(); std::vector> futures; - size_t i = 0; + std::vector> pathIters; - auto startIter = leftPaths_.begin(); - for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) { - if (i++ == batchSize) { - auto endIter = leftIter; - endIter++; - auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() { - return doConjunct(startIter, endIter, oddStep); - }); - futures.emplace_back(std::move(future)); - i = 0; - startIter = endIter; + size_t i = 0; + if (leftPathSize > rightPathSize) { + size_t batchSize = leftPathSize / static_cast(FLAGS_num_operator_threads); + pathIters.reserve(batchSize); + for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) { + auto rightIter = rightPaths.find(leftIter->first); + if (rightIter == rightPaths.end()) { + continue; + } + pathIters.emplace_back(leftIter, rightIter); + if (++i == batchSize) { + auto future = folly::via( + runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); }); + futures.emplace_back(std::move(future)); + pathIters.reserve(batchSize); + i = 0; + } + } + } else { + size_t batchSize = rightPathSize / static_cast(FLAGS_num_operator_threads); + pathIters.reserve(batchSize); + for (auto rightIter = rightPaths.begin(); rightIter != rightPaths.end(); ++rightIter) { + auto leftIter = leftPaths_.find(rightIter->first); + if (leftIter == leftPaths_.end()) { + continue; + } + pathIters.emplace_back(leftIter, rightIter); + if (++i == batchSize) { + auto future = folly::via( + runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); }); + futures.emplace_back(std::move(future)); + pathIters.reserve(batchSize); + i = 0; + } } } if (i != 0) { - auto endIter = leftPaths_.end(); - auto future = folly::via(runner(), [this, startIter, endIter, oddStep]() { - return doConjunct(startIter, endIter, oddStep); - }); + auto future = + folly::via(runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); }); futures.emplace_back(std::move(future)); } diff --git a/src/graph/executor/algo/MultiShortestPathExecutor.h b/src/graph/executor/algo/MultiShortestPathExecutor.h index 2ea20286b1c..9b6d754aed9 100644 --- a/src/graph/executor/algo/MultiShortestPathExecutor.h +++ b/src/graph/executor/algo/MultiShortestPathExecutor.h @@ -57,24 +57,27 @@ class MultiShortestPathExecutor final : public Executor { folly::Future execute() override; private: - // k: dst, v: paths to dst - using Interims = std::unordered_map>; + // key: dst, value: {key : src, value: paths} + using Interims = std::unordered_map>>; void init(); + std::vector createPaths(const std::vector& paths, const Edge& edge); Status buildPath(bool reverse); folly::Future conjunctPath(bool oddStep); - DataSet doConjunct(Interims::iterator startIter, Interims::iterator endIter, bool oddStep); + DataSet doConjunct(const std::vector>& iters); void setNextStepVid(const Interims& paths, const string& var); private: const MultiShortestPath* pathNode_{nullptr}; size_t step_{1}; std::string terminationVar_; + // {src, } std::unordered_multimap> terminationMap_; Interims leftPaths_; - Interims preLeftPaths_; - Interims preRightPaths_; Interims rightPaths_; + Interims preRightPaths_; + Interims historyLeftPaths_; + Interims historyRightPaths_; DataSet currentDs_; }; diff --git a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp index e851e32caf1..3ae535a5dad 100644 --- a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp +++ b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp @@ -132,7 +132,7 @@ folly::Future ProduceAllPathsExecutor::conjunctPath() { auto startIter = leftPaths_.begin(); for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) { - if (i++ == batchSize) { + if (++i == batchSize) { auto endIter = leftIter; endIter++; auto oddStepFuture = folly::via( diff --git a/src/graph/executor/test/FindPathTest.cpp b/src/graph/executor/test/FindPathTest.cpp index ec227e40f47..ffe5a8f62b9 100644 --- a/src/graph/executor/test/FindPathTest.cpp +++ b/src/graph/executor/test/FindPathTest.cpp @@ -710,7 +710,7 @@ TEST_F(FindPathTest, multiSourceShortestPath) { { DataSet expectLeftVid; expectLeftVid.colNames = {nebula::kVid}; - for (const auto& vid : {"a", "b", "c", "f", "g"}) { + for (const auto& vid : {"b", "f", "g"}) { Row row; row.values.emplace_back(vid); expectLeftVid.rows.emplace_back(std::move(row)); diff --git a/src/graph/planner/match/MatchPlanner.cpp b/src/graph/planner/match/MatchPlanner.cpp index e88e6733683..9cac84295e1 100644 --- a/src/graph/planner/match/MatchPlanner.cpp +++ b/src/graph/planner/match/MatchPlanner.cpp @@ -83,11 +83,19 @@ Status MatchPlanner::connectMatchPlan(SubPlan& queryPlan, MatchClauseContext* ma } if (!intersectedAliases.empty()) { if (matchCtx->isOptional) { + // connect LeftJoin match filter + if (matchCtx->where != nullptr) { + auto wherePlanStatus = + std::make_unique()->transform(matchCtx->where.get()); + NG_RETURN_IF_ERROR(wherePlanStatus); + auto wherePlan = std::move(wherePlanStatus).value(); + matchPlan = SegmentsConnector::addInput(wherePlan, matchPlan, true); + } queryPlan = - SegmentsConnector::leftJoin(matchCtx->qctx, matchPlan, queryPlan, intersectedAliases); + SegmentsConnector::leftJoin(matchCtx->qctx, queryPlan, matchPlan, intersectedAliases); } else { queryPlan = - SegmentsConnector::innerJoin(matchCtx->qctx, matchPlan, queryPlan, intersectedAliases); + SegmentsConnector::innerJoin(matchCtx->qctx, queryPlan, matchPlan, intersectedAliases); } } else { queryPlan.root = BiCartesianProduct::make(matchCtx->qctx, queryPlan.root, matchPlan.root); @@ -103,7 +111,7 @@ Status MatchPlanner::genQueryPartPlan(QueryContext* qctx, for (auto& match : queryPart.matchs) { connectMatchPlan(queryPlan, match.get()); // connect match filter - if (match->where != nullptr) { + if (match->where != nullptr && !match->isOptional) { auto wherePlanStatus = std::make_unique()->transform(match->where.get()); NG_RETURN_IF_ERROR(wherePlanStatus); auto wherePlan = std::move(wherePlanStatus).value(); diff --git a/src/graph/service/GraphFlags.cpp b/src/graph/service/GraphFlags.cpp index e4a3a71ea36..074dbd0bc7c 100644 --- a/src/graph/service/GraphFlags.cpp +++ b/src/graph/service/GraphFlags.cpp @@ -18,7 +18,7 @@ DEFINE_int32(num_netio_threads, "The number of networking threads, 0 for number of physical CPU cores"); DEFINE_int32(num_accept_threads, 1, "Number of threads to accept incoming connections"); DEFINE_int32(num_worker_threads, 0, "Number of threads to execute user queries"); -DEFINE_int32(num_operator_threads, 5, "Number of threads to execute a single operator"); +DEFINE_int32(num_operator_threads, 2, "Number of threads to execute a single operator"); DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option"); DEFINE_int32(listen_backlog, 1024, "Backlog of the listen socket"); DEFINE_string(listen_netdev, "any", "The network device to listen on"); diff --git a/src/graph/service/GraphService.cpp b/src/graph/service/GraphService.cpp index 810fbd20bb5..8628ed46be3 100644 --- a/src/graph/service/GraphService.cpp +++ b/src/graph/service/GraphService.cpp @@ -125,7 +125,6 @@ folly::Future GraphService::future_authenticate(const std::string& void GraphService::signout(int64_t sessionId) { VLOG(2) << "Sign out session " << sessionId; sessionManager_->removeSession(sessionId); - stats::StatsManager::decValue(kNumActiveSessions); } folly::Future GraphService::future_executeWithParameter( diff --git a/src/graph/session/GraphSessionManager.cpp b/src/graph/session/GraphSessionManager.cpp index 8ae85840c27..e7ccaf502c7 100644 --- a/src/graph/session/GraphSessionManager.cpp +++ b/src/graph/session/GraphSessionManager.cpp @@ -180,6 +180,7 @@ void GraphSessionManager::removeSession(SessionID id) { auto sessionCopy = iter->second->getSession(); std::string key = sessionCopy.get_user_name() + sessionCopy.get_client_ip(); activeSessions_.erase(iter); + stats::StatsManager::decValue(kNumActiveSessions); // delete session count from cache subSessionCount(key); } diff --git a/src/graph/validator/MatchValidator.cpp b/src/graph/validator/MatchValidator.cpp index 93cc0fc6be9..1e42f0849cf 100644 --- a/src/graph/validator/MatchValidator.cpp +++ b/src/graph/validator/MatchValidator.cpp @@ -803,6 +803,7 @@ Status MatchValidator::validateGroup(YieldClauseContext &yieldCtx, DCHECK(!cols.empty()); for (auto *col : cols) { auto *colExpr = col->expr(); + NG_RETURN_IF_ERROR(validateMatchPathExpr(colExpr, yieldCtx.aliasesAvailable, matchs)); auto colOldName = col->name(); if (colExpr->kind() != Expression::Kind::kAggregate) { auto collectAggCol = colExpr->clone(); @@ -834,8 +835,6 @@ Status MatchValidator::validateGroup(YieldClauseContext &yieldCtx, yieldCtx.groupKeys_.emplace_back(colExpr); } - NG_RETURN_IF_ERROR(validateMatchPathExpr(colExpr, yieldCtx.aliasesAvailable, matchs)); - yieldCtx.groupItems_.emplace_back(colExpr); yieldCtx.projCols_->addColumn( diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 08f5f6c40c8..988e560bd33 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -1148,16 +1148,29 @@ struct GetMetaDirInfoReq { struct VerifyClientVersionResp { 1: common.ErrorCode code, 2: common.HostAddr leader, - 3: optional binary error_msg; + 3: optional binary error_msg; } - struct VerifyClientVersionReq { 1: required binary client_version = common.version; 2: common.HostAddr host; 3: binary build_version; } +struct SaveGraphVersionResp { + 1: common.ErrorCode code, + 2: common.HostAddr leader, + 3: optional binary error_msg; +} + +// SaveGraphVersionReq is used to save the graph version of a graph service. +// This is for internal using only. +struct SaveGraphVersionReq { + 1: required binary client_version = common.version; + 2: common.HostAddr host; + 3: binary build_version; +} + service MetaService { ExecResp createSpace(1: CreateSpaceReq req); ExecResp dropSpace(1: DropSpaceReq req); @@ -1263,6 +1276,7 @@ service MetaService { GetMetaDirInfoResp getMetaDirInfo(1: GetMetaDirInfoReq req); VerifyClientVersionResp verifyClientVersion(1: VerifyClientVersionReq req) + SaveGraphVersionResp saveGraphVersion(1: SaveGraphVersionReq req) GetSegmentIdResp getSegmentId(1: GetSegmentIdReq req); } diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index d805bdbd075..c6f62387815 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -52,6 +52,7 @@ nebula_add_library( processors/admin/ListClusterInfoProcessor.cpp processors/admin/GetMetaDirInfoProcessor.cpp processors/admin/VerifyClientVersionProcessor.cpp + processors/admin/SaveGraphVersionProcessor.cpp processors/config/RegConfigProcessor.cpp processors/config/GetConfigProcessor.cpp processors/config/ListConfigsProcessor.cpp diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index 3be96660b16..59118d8810f 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -16,6 +16,7 @@ #include "meta/processors/admin/ListClusterInfoProcessor.h" #include "meta/processors/admin/ListSnapshotsProcessor.h" #include "meta/processors/admin/RestoreProcessor.h" +#include "meta/processors/admin/SaveGraphVersionProcessor.h" #include "meta/processors/admin/VerifyClientVersionProcessor.h" #include "meta/processors/config/GetConfigProcessor.h" #include "meta/processors/config/ListConfigsProcessor.h" @@ -539,6 +540,12 @@ folly::Future MetaServiceHandler::future_verifyCl RETURN_FUTURE(processor); } +folly::Future MetaServiceHandler::future_saveGraphVersion( + const cpp2::SaveGraphVersionReq& req) { + auto* processor = SaveGraphVersionProcessor::instance(kvstore_); + RETURN_FUTURE(processor); +} + folly::Future MetaServiceHandler::future_getWorkerId( const cpp2::GetWorkerIdReq& req) { auto* processor = GetWorkerIdProcessor::instance(kvstore_); diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 8638481f8d9..9a2b4365b64 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -226,6 +226,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_verifyClientVersion( const cpp2::VerifyClientVersionReq& req) override; + folly::Future future_saveGraphVersion( + const cpp2::SaveGraphVersionReq& req) override; + folly::Future future_getWorkerId(const cpp2::GetWorkerIdReq& req) override; folly::Future future_getSegmentId( diff --git a/src/meta/processors/admin/SaveGraphVersionProcessor.cpp b/src/meta/processors/admin/SaveGraphVersionProcessor.cpp new file mode 100644 index 00000000000..02df0e41033 --- /dev/null +++ b/src/meta/processors/admin/SaveGraphVersionProcessor.cpp @@ -0,0 +1,36 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "meta/processors/admin/SaveGraphVersionProcessor.h" + +#include "common/graph/Response.h" +#include "version/Version.h" + +namespace nebula { +namespace meta { +void SaveGraphVersionProcessor::process(const cpp2::SaveGraphVersionReq& req) { + const auto& host = req.get_host(); + + // Build a map of graph service host and its version + auto versionKey = MetaKeyUtils::versionKey(host); + auto versionVal = MetaKeyUtils::versionVal(req.get_build_version().c_str()); + std::vector versionData; + versionData.emplace_back(std::move(versionKey), std::move(versionVal)); + + // Save the version of the graph service + auto errCode = doSyncPut(versionData); + if (errCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Failed to save graph version, errorCode: " + << apache::thrift::util::enumNameSafe(errCode); + handleErrorCode(errCode); + onFinished(); + return; + } + handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); + + onFinished(); +} +} // namespace meta +} // namespace nebula diff --git a/src/meta/processors/admin/SaveGraphVersionProcessor.h b/src/meta/processors/admin/SaveGraphVersionProcessor.h new file mode 100644 index 00000000000..5c6bac9a6e2 --- /dev/null +++ b/src/meta/processors/admin/SaveGraphVersionProcessor.h @@ -0,0 +1,27 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef META_SAVEGRAPHVERSIONPROCESSOR_H_ +#define META_SAVEGRAPHVERSIONPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" + +namespace nebula { +namespace meta { +class SaveGraphVersionProcessor final : public BaseProcessor { + public: + static SaveGraphVersionProcessor* instance(kvstore::KVStore* kvstore) { + return new SaveGraphVersionProcessor(kvstore); + } + + void process(const cpp2::SaveGraphVersionReq& req); + + private: + explicit SaveGraphVersionProcessor(kvstore::KVStore* kvstore) + : BaseProcessor(kvstore) {} +}; +} // namespace meta +} // namespace nebula +#endif // META_SAVEGRAPHVERSIONPROCESSOR_H_ diff --git a/src/meta/processors/admin/VerifyClientVersionProcessor.cpp b/src/meta/processors/admin/VerifyClientVersionProcessor.cpp index ab400bc4cca..b95f2f2f648 100644 --- a/src/meta/processors/admin/VerifyClientVersionProcessor.cpp +++ b/src/meta/processors/admin/VerifyClientVersionProcessor.cpp @@ -25,14 +25,8 @@ void VerifyClientVersionProcessor::process(const cpp2::VerifyClientVersionReq& r "Meta client version(%s) is not accepted, current meta client white list: %s.", req.get_client_version().c_str(), FLAGS_client_white_list.c_str()); - } else { - const auto& host = req.get_host(); - auto versionKey = MetaKeyUtils::versionKey(host); - auto versionVal = MetaKeyUtils::versionVal(req.get_build_version().c_str()); - std::vector versionData; - versionData.emplace_back(std::move(versionKey), std::move(versionVal)); - handleErrorCode(doSyncPut(versionData)); } + onFinished(); } } // namespace meta diff --git a/src/meta/processors/zone/DropHostsProcessor.cpp b/src/meta/processors/zone/DropHostsProcessor.cpp index c4a26e07112..51323d5ca14 100644 --- a/src/meta/processors/zone/DropHostsProcessor.cpp +++ b/src/meta/processors/zone/DropHostsProcessor.cpp @@ -90,21 +90,13 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { return std::find(hosts.begin(), hosts.end(), h) != hosts.end(); })) { LOG(INFO) << "Drop zone " << zoneName; - code = checkRelatedSpaceAndCollect(zoneName, holder.get()); + code = checkRelatedSpaceAndCollect(zoneName, &spaceMap); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "Check related space failed"; break; } holder->remove(MetaKeyUtils::zoneKey(zoneName)); - - for (auto& [spaceId, properties] : spaceMap) { - const std::vector& curZones = properties.get_zone_names(); - std::set zm(curZones.begin(), curZones.end()); - zm.erase(zoneName); - std::vector newZones(zm.begin(), zm.end()); - properties.zone_names_ref() = std::move(newZones); - } } else { // Delete some hosts in the zone for (auto& h : hosts) { @@ -162,38 +154,22 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) { } nebula::cpp2::ErrorCode DropHostsProcessor::checkRelatedSpaceAndCollect( - const std::string& zoneName, kvstore::BatchHolder* holder) { - const auto& prefix = MetaKeyUtils::spacePrefix(); - auto ret = doPrefix(prefix); - if (!nebula::ok(ret)) { - auto retCode = nebula::error(ret); - LOG(INFO) << "List spaces failed, error " << apache::thrift::util::enumNameSafe(retCode); - return nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND; - } - - auto iter = nebula::value(ret).get(); - while (iter->valid()) { - auto properties = MetaKeyUtils::parseSpace(iter->val()); - size_t replicaFactor = properties.get_replica_factor(); - auto zones = properties.get_zone_names(); - LOG(INFO) << "replica_factor " << replicaFactor << " zone size " << zones.size(); - auto it = std::find(zones.begin(), zones.end(), zoneName); - if (it != zones.end()) { - if (zones.size() == replicaFactor) { + const std::string& zoneName, std::map* spaceMap) { + for (auto& [spaceId, properties] : *spaceMap) { + const std::vector& curZones = properties.get_zone_names(); + std::set zm(curZones.begin(), curZones.end()); + if (zm.count(zoneName)) { + int32_t zoneSize = zm.size(); + if (zoneSize == properties.get_replica_factor()) { LOG(INFO) << "Zone size is same with replica factor"; return nebula::cpp2::ErrorCode::E_CONFLICT; } else { - zones.erase(it); - properties.zone_names_ref() = zones; - - auto spaceKey = iter->key().data(); - auto spaceVal = MetaKeyUtils::spaceVal(properties); - holder->put(std::move(spaceKey), std::move(spaceVal)); + zm.erase(zoneName); + std::vector newZones(zm.begin(), zm.end()); + properties.zone_names_ref() = std::move(newZones); } } - iter->next(); } - return nebula::cpp2::ErrorCode::SUCCEEDED; } diff --git a/src/meta/processors/zone/DropHostsProcessor.h b/src/meta/processors/zone/DropHostsProcessor.h index fbb189d0c5f..b746d42ce20 100644 --- a/src/meta/processors/zone/DropHostsProcessor.h +++ b/src/meta/processors/zone/DropHostsProcessor.h @@ -36,8 +36,8 @@ class DropHostsProcessor : public BaseProcessor { * @param holder * @return */ - nebula::cpp2::ErrorCode checkRelatedSpaceAndCollect(const std::string& zoneName, - kvstore::BatchHolder* holder); + nebula::cpp2::ErrorCode checkRelatedSpaceAndCollect( + const std::string& zoneName, std::map* spaceMap); }; } // namespace meta diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index ae328d406b5..1c63165595d 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -101,8 +101,14 @@ StoragePlan ScanEdgeProcessor::buildPlan( edges.emplace_back( std::make_unique(context, &edgeContext_, ec.first, &ec.second)); } - auto output = std::make_unique( - context, std::move(edges), enableReadFollower_, limit_, cursors, result, expCtx, filter_); + auto output = std::make_unique(context, + std::move(edges), + enableReadFollower_, + limit_, + cursors, + result, + expCtx, + filter_ == nullptr ? nullptr : filter_->clone()); plan.addNode(std::move(output)); return plan; diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 6dafcda714d..7806daebfdd 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -104,8 +104,15 @@ StoragePlan ScanVertexProcessor::buildPlan( for (const auto& tc : tagContext_.propContexts_) { tags.emplace_back(std::make_unique(context, &tagContext_, tc.first, &tc.second)); } - auto output = std::make_unique( - context, std::move(tags), enableReadFollower_, limit_, cursors, result, expCtx, filter_); + auto output = + std::make_unique(context, + std::move(tags), + enableReadFollower_, + limit_, + cursors, + result, + expCtx, + filter_ == nullptr ? nullptr : filter_->clone()); plan.addNode(std::move(output)); return plan; diff --git a/tests/tck/features/bugfix/AggPatternExpression.feature b/tests/tck/features/bugfix/AggPatternExpression.feature new file mode 100644 index 00000000000..31b83f860d9 --- /dev/null +++ b/tests/tck/features/bugfix/AggPatternExpression.feature @@ -0,0 +1,18 @@ +# Copyright (c) 2022 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License. +# #4175 +Feature: Test crash when aggregate with pattern expression + + Background: + Given a graph with space named "nba" + + Scenario: Crash when aggregate with pattern expression + # TODO aggregate should bypass all input, like `(v)--(:team)` here + When executing query: + """ + MATCH (v:player) WHERE id(v) == 'Tim Duncan' return v.player.name AS name, size((v)--(:team)) + count(v.player.name) * 2 AS count + """ + Then the result should be, in any order, with relax comparison: + | name | count | + | 'Tim Duncan' | NULL | diff --git a/tests/tck/features/match/MultiLineMultiQueryParts.feature b/tests/tck/features/match/MultiLineMultiQueryParts.feature index 3972a9a7cbd..3bcb3e8fe9f 100644 --- a/tests/tck/features/match/MultiLineMultiQueryParts.feature +++ b/tests/tck/features/match/MultiLineMultiQueryParts.feature @@ -29,7 +29,6 @@ Feature: Multi Line Multi Query Parts | "Tim Duncan" | "Boris Diaw" | "Tim Duncan" | When executing query: """ - USE nba; MATCH (m)-[]-(n), (n)-[]-(l) WHERE id(n)=="Tim Duncan" RETURN m.player.name AS n1, n.player.name AS n2, l.player.name AS n3 ORDER BY n1, n2, n3 LIMIT 10 """ @@ -47,7 +46,6 @@ Feature: Multi Line Multi Query Parts | "Aron Baynes" | "Tim Duncan" | "Manu Ginobili" | When executing query: """ - USE nba; MATCH (m)-[]-(n), (n)-[]-(l), (l)-[]-(h) WHERE id(m)=="Tim Duncan" RETURN m.player.name AS n1, n.player.name AS n2, l.team.name AS n3, h.player.name AS n4 ORDER BY n1, n2, n3, n4 LIMIT 10 @@ -68,7 +66,6 @@ Feature: Multi Line Multi Query Parts Scenario: Multi Line Multi Match When executing query: """ - USE nba; MATCH (m)-[]-(n) WHERE id(m)=="Tim Duncan" MATCH (n)-[]-(l) RETURN m.player.name AS n1, n.player.name AS n2, @@ -89,7 +86,6 @@ Feature: Multi Line Multi Query Parts | "Tim Duncan" | "Boris Diaw" | "Tim Duncan" | When executing query: """ - USE nba; MATCH (m)-[]-(n),(n) WHERE id(m)=="Tim Duncan" and id(n)=="Tony Parker" MATCH (n)-[]-(l) where n.player.age43 MATCH (n:player) WHERE v.player.age>40 and n.player.age>v.player.age RETURN count(*) AS count @@ -109,7 +104,6 @@ Feature: Multi Line Multi Query Parts | 5 | When executing query: """ - USE nba; MATCH (m)-[]-(n) WHERE id(m)=="Tim Duncan" MATCH (n)-[]-(l), (l)-[]-(h) RETURN m.player.name AS n1, n.player.name AS n2, l.team.name AS n3, h.player.name AS n4 @@ -129,7 +123,6 @@ Feature: Multi Line Multi Query Parts | "Tim Duncan" | "Aron Baynes" | "Spurs" | "Boris Diaw" | When executing query: """ - USE nba; MATCH (m)-[]-(n) WHERE id(m)=="Tim Duncan" MATCH (n)-[]-(l) MATCH (l)-[]-(h) @@ -150,7 +143,6 @@ Feature: Multi Line Multi Query Parts | "Tim Duncan" | "Aron Baynes" | "Spurs" | "Boris Diaw" | When executing query: """ - USE nba; MATCH (v:player{name:"Tony Parker"}) WITH v AS a MATCH p=(o:player{name:"Tim Duncan"})-[]->(a) @@ -164,7 +156,6 @@ Feature: Multi Line Multi Query Parts Scenario: Multi Line Optional Match When executing query: """ - USE nba; MATCH (m)-[]-(n) WHERE id(m)=="Tim Duncan" OPTIONAL MATCH (n)<-[:serve]-(l) RETURN m.player.name AS n1, n.player.name AS n2, l AS n3 ORDER BY n1, n2, n3 LIMIT 10 @@ -183,7 +174,6 @@ Feature: Multi Line Multi Query Parts | "Tim Duncan" | "Manu Ginobili" | NULL | When executing query: """ - USE nba; MATCH (m)-[]-(n),(n) WHERE id(m)=="Tim Duncan" and id(n)=="Tony Parker" OPTIONAL MATCH (n)-[]-(l) where n.player.age < m.player.age RETURN count(*) AS count @@ -193,7 +183,6 @@ Feature: Multi Line Multi Query Parts | 64 | When executing query: """ - USE nba; OPTIONAL match (v:player) WHERE v.player.age > 41 MATCH (v:player) WHERE v.player.age>40 RETURN count(*) AS count @@ -203,7 +192,6 @@ Feature: Multi Line Multi Query Parts | 7 | When executing query: """ - USE nba; OPTIONAL match (v:player) WHERE v.player.age>43 MATCH (n:player) WHERE n.player.age>40 RETURN count(*) AS count @@ -211,11 +199,64 @@ Feature: Multi Line Multi Query Parts Then the result should be, in order: | count | | 32 | + When executing query: + """ + OPTIONAL MATCH (v:player) WHERE v.player.age > 40 and v.player.age<46 + MATCH (v:player) WHERE v.player.age>43 + RETURN count(*) AS count + """ + Then the result should be, in any order: + | count | + | 2 | + When executing query: + """ + MATCH (v:player) WHERE v.player.age > 40 and v.player.age<46 + OPTIONAL MATCH (v:player) WHERE v.player.age>43 + RETURN count(*) AS count + """ + Then the result should be, in any order: + | count | + | 6 | + When executing query: + """ + OPTIONAL MATCH (v:player) WHERE v.player.age > 40 and v.player.age<46 + OPTIONAL MATCH (v:player) WHERE v.player.age>43 + RETURN count(*) AS count + """ + Then the result should be, in any order: + | count | + | 6 | + When executing query: + """ + OPTIONAL MATCH (v:player) WHERE v.player.age>43 + MATCH (v:player) WHERE v.player.age > 40 and v.player.age<46 + RETURN count(*) AS count + """ + Then the result should be, in any order: + | count | + | 2 | + When executing query: + """ + MATCH (v:player) WHERE v.player.age>43 + OPTIONAL MATCH (v:player) WHERE v.player.age > 40 and v.player.age<46 + RETURN count(*) AS count + """ + Then the result should be, in any order: + | count | + | 4 | + When executing query: + """ + OPTIONAL MATCH (v:player) WHERE v.player.age>43 + OPTIONAL MATCH (v:player) WHERE v.player.age > 40 and v.player.age<46 + RETURN count(*) AS count + """ + Then the result should be, in any order: + | count | + | 4 | Scenario: Multi Line Multi Query Parts When executing query: """ - USE nba; MATCH (m)-[]-(n) WHERE id(m)=="Tim Duncan" WITH n, n.player.name AS n1 ORDER BY n1 LIMIT 10 MATCH (n)-[]-(l) @@ -237,7 +278,6 @@ Feature: Multi Line Multi Query Parts | "Boris Diaw" | "Tim Duncan" | When executing query: """ - USE nba; MATCH (m:player{name:"Tim Duncan"})-[:like]-(n)--() WITH m,count(*) AS lcount MATCH (m)--(n) @@ -248,7 +288,6 @@ Feature: Multi Line Multi Query Parts | 19 | 110 | When executing query: """ - USE nba; MATCH (m:player{name:"Tim Duncan"})-[:like]-(n)--() WITH m,n MATCH (m)--(n) @@ -259,7 +298,6 @@ Feature: Multi Line Multi Query Parts | 270 | When executing query: """ - USE nba; MATCH (m)-[]-(n) WHERE id(m)=="Tim Duncan" OPTIONAL MATCH (n)-->(v) WHERE v.player.age < m.player.age RETURN count(*) AS count @@ -269,7 +307,6 @@ Feature: Multi Line Multi Query Parts | 45 | When executing query: """ - USE nba; MATCH (a:player{age:42}) WITH a MATCH (b:player{age:40}) WHERE b.player.age43 WITH v MATCH (v:player) WHERE v.player.age>40 WITH v RETURN count(*) AS count @@ -301,7 +336,6 @@ Feature: Multi Line Multi Query Parts | 4 | When executing query: """ - USE nba; OPTIONAL match (v:player) WHERE v.player.age>43 WITH v MATCH (n:player) WHERE n.player.age>40 WITH v, n RETURN count(*) AS count @@ -311,7 +345,6 @@ Feature: Multi Line Multi Query Parts | 32 | When executing query: """ - USE nba; MATCH (a:player{age:42}) WITH a MATCH (b:player{age:40}) WHERE b.player.age