diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 7769d901f17..775b04c20a7 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -62,6 +62,14 @@ jobs: docker: name: build docker image runs-on: [self-hosted, nebula] + strategy: + fail-fast: false + matrix: + service: + - graphd + - metad + - storaged + - tools steps: - uses: webiny/action-post-run@2.0.1 with: @@ -76,38 +84,10 @@ jobs: - uses: docker/build-push-action@v2 with: context: . - file: ./docker/Dockerfile - platforms: linux/amd64,linux/arm64 - tags: | - vesoft/nebula-graphd:nightly - target: graphd - push: true - - uses: docker/build-push-action@v2 - with: - context: . - file: ./docker/Dockerfile - platforms: linux/amd64,linux/arm64 - tags: | - vesoft/nebula-storaged:nightly - target: storaged - push: true - - uses: docker/build-push-action@v2 - with: - context: . - file: ./docker/Dockerfile - platforms: linux/amd64,linux/arm64 - tags: | - vesoft/nebula-metad:nightly - target: metad - push: true - - uses: docker/build-push-action@v2 - with: - context: . - file: ./docker/Dockerfile + file: ./docker/Dockerfile.${{ matrix.service }} platforms: linux/amd64,linux/arm64 tags: | - vesoft/nebula-tools:nightly - target: tools + vesoft/nebula-${{ matrix.service }}:nightly push: true coverage: diff --git a/.github/workflows/rc.yml b/.github/workflows/rc.yml index c6fe00cdca8..070dabdc7e9 100644 --- a/.github/workflows/rc.yml +++ b/.github/workflows/rc.yml @@ -68,6 +68,14 @@ jobs: docker_build: name: docker-build runs-on: [self-hosted, nebula] + strategy: + fail-fast: false + matrix: + service: + - graphd + - metad + - storaged + - tools steps: - uses: webiny/action-post-run@2.0.1 with: @@ -93,62 +101,16 @@ jobs: - uses: docker/build-push-action@v2 with: context: . - file: ./docker/Dockerfile - platforms: linux/amd64,linux/arm64 - tags: | - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-graphd:${{ steps.tagname.outputs.tag }} - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-graphd:${{ steps.tagname.outputs.majorver }} - ${{ steps.docker.outputs.tag }} - push: true - target: graphd - build-args: | - BRANCH=${{ steps.tagname.outputs.tag }} - VERSION=${{ steps.tagname.outputs.tagnum }} - - - uses: docker/build-push-action@v2 - with: - context: . - file: ./docker/Dockerfile - platforms: linux/amd64,linux/arm64 - tags: | - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-storaged:${{ steps.tagname.outputs.tag }} - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-storaged:${{ steps.tagname.outputs.majorver }} - ${{ steps.docker.outputs.tag }} - push: true - target: storaged - build-args: | - BRANCH=${{ steps.tagname.outputs.tag }} - VERSION=${{ steps.tagname.outputs.tagnum }} - - - uses: docker/build-push-action@v2 - with: - context: . - file: ./docker/Dockerfile - platforms: linux/amd64,linux/arm64 - tags: | - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-metad:${{ steps.tagname.outputs.tag }} - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-metad:${{ steps.tagname.outputs.majorver }} - ${{ steps.docker.outputs.tag }} - push: true - target: metad - build-args: | - BRANCH=${{ steps.tagname.outputs.tag }} - VERSION=${{ steps.tagname.outputs.tagnum }} - - - uses: docker/build-push-action@v2 - with: - context: . 
- file: ./docker/Dockerfile + file: ./docker/Dockerfile.${{ matrix.service }} platforms: linux/amd64,linux/arm64 tags: | - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-tools:${{ steps.tagname.outputs.tag }} - ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-tools:${{ steps.tagname.outputs.majorver }} + ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-${{ matrix.service }}:${{ steps.tagname.outputs.tag }} + ${{ secrets.HARBOR_REGISTRY }}/vesoft/nebula-${{ matrix.service }}:${{ steps.tagname.outputs.majorver }} ${{ steps.docker.outputs.tag }} push: true - target: tools build-args: | BRANCH=${{ steps.tagname.outputs.tag }} - VERSION=${{ steps.tagname.outputs.tagnum }} + VERSION=${{ steps.tagname.outputs.tagnum }} test: name: test diff --git a/docker/Dockerfile b/docker/Dockerfile index e640e27166b..0a7a8bfcda4 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -2,6 +2,7 @@ FROM vesoft/nebula-dev:centos7 as builder ARG BRANCH=master ARG VERSION= + COPY . /home/nebula/BUILD RUN cd /home/nebula/BUILD/package \ diff --git a/docker/Dockerfile.graphd b/docker/Dockerfile.graphd new file mode 100644 index 00000000000..75bf3e3984e --- /dev/null +++ b/docker/Dockerfile.graphd @@ -0,0 +1,26 @@ +FROM vesoft/nebula-dev:centos7 as builder + +ARG BRANCH=master +ARG VERSION= + +COPY . /home/nebula/BUILD + +RUN cd /home/nebula/BUILD/package \ + && [[ -z "${VERSION}" ]] \ + && ./package.sh -n OFF -b ${BRANCH} -t RelWithDebInfo -s TRUE -c OFF -k ON \ + || ./package.sh -n OFF -v ${VERSION} -b ${BRANCH} -t RelWithDebInfo -s TRUE -c OFF -k ON + +FROM centos:7 + +COPY --from=builder /home/nebula/BUILD/pkg-build/cpack_output/nebula-*-common.rpm /usr/local/nebula/nebula-common.rpm +COPY --from=builder /home/nebula/BUILD/pkg-build/cpack_output/nebula-*-graph.rpm /usr/local/nebula/nebula-graphd.rpm + +WORKDIR /usr/local/nebula + +RUN rpm -ivh *.rpm \ + && mkdir -p ./{logs,data,pids} \ + && rm -rf *.rpm + +EXPOSE 9669 19669 19670 + +ENTRYPOINT ["/usr/local/nebula/bin/nebula-graphd", "--flagfile=/usr/local/nebula/etc/nebula-graphd.conf", "--daemonize=false", "--containerized=true"] diff --git a/docker/Dockerfile.metad b/docker/Dockerfile.metad new file mode 100644 index 00000000000..b4ab34f8409 --- /dev/null +++ b/docker/Dockerfile.metad @@ -0,0 +1,26 @@ +FROM vesoft/nebula-dev:centos7 as builder + +ARG BRANCH=master +ARG VERSION= + +COPY . /home/nebula/BUILD + +RUN cd /home/nebula/BUILD/package \ + && [[ -z "${VERSION}" ]] \ + && ./package.sh -n OFF -b ${BRANCH} -t RelWithDebInfo -s TRUE -c OFF -k ON \ + || ./package.sh -n OFF -v ${VERSION} -b ${BRANCH} -t RelWithDebInfo -s TRUE -c OFF -k ON + +FROM centos:7 + +COPY --from=builder /home/nebula/BUILD/pkg-build/cpack_output/nebula-*-common.rpm /usr/local/nebula/nebula-common.rpm +COPY --from=builder /home/nebula/BUILD/pkg-build/cpack_output/nebula-*-meta.rpm /usr/local/nebula/nebula-metad.rpm + +WORKDIR /usr/local/nebula + +RUN rpm -ivh *.rpm \ + && mkdir -p ./{logs,data,pids} \ + && rm -rf *.rpm + +EXPOSE 9559 9560 19559 19560 + +ENTRYPOINT ["/usr/local/nebula/bin/nebula-metad", "--flagfile=/usr/local/nebula/etc/nebula-metad.conf", "--daemonize=false", "--containerized=true"] diff --git a/docker/Dockerfile.storaged b/docker/Dockerfile.storaged new file mode 100644 index 00000000000..6ca29687e37 --- /dev/null +++ b/docker/Dockerfile.storaged @@ -0,0 +1,26 @@ +FROM vesoft/nebula-dev:centos7 as builder + +ARG BRANCH=master +ARG VERSION= + +COPY . 
/home/nebula/BUILD + +RUN cd /home/nebula/BUILD/package \ + && [[ -z "${VERSION}" ]] \ + && ./package.sh -n OFF -b ${BRANCH} -t RelWithDebInfo -s TRUE -c OFF -k ON \ + || ./package.sh -n OFF -v ${VERSION} -b ${BRANCH} -t RelWithDebInfo -s TRUE -c OFF -k ON + +FROM centos:7 + +COPY --from=builder /home/nebula/BUILD/pkg-build/cpack_output/nebula-*-common.rpm /usr/local/nebula/nebula-common.rpm +COPY --from=builder /home/nebula/BUILD/pkg-build/cpack_output/nebula-*-storage.rpm /usr/local/nebula/nebula-storaged.rpm + +WORKDIR /usr/local/nebula + +RUN rpm -ivh *.rpm \ + && mkdir -p ./{logs,data,pids} \ + && rm -rf *.rpm + +EXPOSE 9777 9778 9779 9780 19779 19780 + +ENTRYPOINT ["/usr/local/nebula/bin/nebula-storaged", "--flagfile=/usr/local/nebula/etc/nebula-storaged.conf", "--daemonize=false", "--containerized=true"] diff --git a/docker/Dockerfile.tools b/docker/Dockerfile.tools new file mode 100644 index 00000000000..fd696809109 --- /dev/null +++ b/docker/Dockerfile.tools @@ -0,0 +1,23 @@ +FROM vesoft/nebula-dev:centos7 as builder + +ARG BRANCH=master +ARG VERSION= + +COPY . /home/nebula/BUILD + +RUN cd /home/nebula/BUILD/package \ + && [[ -z "${VERSION}" ]] \ + && ./package.sh -n OFF -b ${BRANCH} -t RelWithDebInfo -s TRUE -c OFF -k ON \ + || ./package.sh -n OFF -v ${VERSION} -b ${BRANCH} -t RelWithDebInfo -s TRUE -c OFF -k ON + +FROM centos:7 + +COPY --from=builder /home/nebula/BUILD/pkg-build/cpack_output/nebula-*-tool.rpm /usr/local/nebula/nebula-tool.rpm + +WORKDIR /usr/local/nebula + +RUN rpm -ivh *.rpm \ + && rm -rf *.rpm + +# default entrypoint +ENTRYPOINT ["/usr/local/nebula/bin/db_dump"] diff --git a/docker/README.md b/docker/README.md index 5f40c1fd160..1e626b9b725 100644 --- a/docker/README.md +++ b/docker/README.md @@ -2,7 +2,7 @@ Following docker images will be ready in production. -- [vesoft/nebula-graphd](https://hub.docker.com/r/vesoft/nebula-graphd): nebula-graphd service built with `Dockerfile` and target `graphd` -- [vesoft/nebula-metad](https://hub.docker.com/r/vesoft/nebula-metad): nebula-metad service built with `Dockerfile` and target `metad` -- [vesoft/nebula-storaged](https://hub.docker.com/r/vesoft/nebula-storaged): nebula-storaged service built with `Dockerfile` and target `storaged` -- [vesoft/nebula-tools](https://hub.docker.com/r/vesoft/nebula-tools): nebula tools built with `Dockerfile` and target `tools`, including db_dump, meta_dump and db_upgrader +- [vesoft/nebula-graphd](https://hub.docker.com/r/vesoft/nebula-graphd): nebula-graphd service built with `Dockerfile.graphd` +- [vesoft/nebula-metad](https://hub.docker.com/r/vesoft/nebula-metad): nebula-metad service built with `Dockerfile.metad` +- [vesoft/nebula-storaged](https://hub.docker.com/r/vesoft/nebula-storaged): nebula-storaged service built with `Dockerfile.storaged` +- [vesoft/nebula-tools](https://hub.docker.com/r/vesoft/nebula-tools): nebula tools built with `Dockerfile.tools`, including db_dump, meta_dump and db_upgrader diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 94d4e008867..eb0806989a0 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -2484,11 +2484,6 @@ folly::Future> MetaClient::heartbeat() { } } - // TTL for clientAddrMap - // If multiple connections are created but do not authenticate, the clientAddrMap_ will keep - // growing. This is to clear the clientAddrMap_ regularly. 
- clearClientAddrMap(); - // info used in the agent, only set once // TOOD(spw): if we could add data path(disk) dynamicly in the future, it should be // reported every time it changes @@ -3658,20 +3653,5 @@ Status MetaClient::verifyVersion() { return Status::OK(); } -void MetaClient::clearClientAddrMap() { - if (clientAddrMap_.size() == 0) { - return; - } - - auto curTimestamp = time::WallClock::fastNowInSec(); - for (auto it = clientAddrMap_.cbegin(); it != clientAddrMap_.cend();) { - // The clientAddr is expired - if (it->second < curTimestamp) { - it = clientAddrMap_.erase(it); - } else { - ++it; - } - } -} } // namespace meta } // namespace nebula diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index eb089a7af6d..43c281b3629 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -155,7 +155,6 @@ using FTIndexMap = std::unordered_map; using SessionMap = std::unordered_map; -using clientAddrMap = folly::ConcurrentHashMap; class MetaChangedListener { public: virtual ~MetaChangedListener() = default; @@ -657,10 +656,6 @@ class MetaClient : public BaseMetaClient { return options_.localHost_.toString(); } - clientAddrMap& getClientAddrMap() { - return clientAddrMap_; - } - protected: // Return true if load succeeded. bool loadData(); @@ -747,9 +742,6 @@ class MetaClient : public BaseMetaClient { Status verifyVersion(); - // Removes expired keys in the clientAddrMap_ - void clearClientAddrMap(); - private: std::shared_ptr ioThreadPool_; std::shared_ptr> clientsMan_; @@ -830,18 +822,6 @@ class MetaClient : public BaseMetaClient { NameIndexMap tagNameIndexMap_; NameIndexMap edgeNameIndexMap_; - // TODO(Aiee) This is a walkaround to address the problem that using a lower version(< v2.6.0) - // client to connect with higher version(>= v3.0.0) Nebula service will cause a crash. - // - // The key here is the host of the client that sends the request, and the value indicates the - // expiration of the key because we don't want to keep the key forever. - // - // The assumption here is that there is ONLY ONE VERSION of the client in the host. - // - // This map will be updated when verifyVersion() is called. 
Only the clients since v2.6.0 will - call verifyVersion(), thus we could determine whether the client version is lower than v2.6.0 - clientAddrMap clientAddrMap_; - // Global service client ServiceClientsList serviceClientList_; FTIndexMap fulltextIndexMap_; diff --git a/src/common/thread/test/GenericWorkerTest.cpp b/src/common/thread/test/GenericWorkerTest.cpp index 03991545a04..0080f39fdb2 100644 --- a/src/common/thread/test/GenericWorkerTest.cpp +++ b/src/common/thread/test/GenericWorkerTest.cpp @@ -120,7 +120,7 @@ TEST(GenericWorker, addRepeatTask) { } } -TEST(GenericWorker, purgeRepeatTask) { +TEST(GenericWorker, DISABLED_purgeRepeatTask) { GenericWorker worker; ASSERT_TRUE(worker.start()); { diff --git a/src/graph/service/GraphService.cpp b/src/graph/service/GraphService.cpp index 6b6dbe2eff4..810fbd20bb5 100644 --- a/src/graph/service/GraphService.cpp +++ b/src/graph/service/GraphService.cpp @@ -24,9 +24,6 @@ namespace nebula { namespace graph { -// The default value is 28800 seconds -const int64_t clientAddrTimeout = FLAGS_client_idle_timeout_secs; - Status GraphService::init(std::shared_ptr ioExecutor, const HostAddr& hostAddr) { auto addrs = network::NetworkUtils::toHosts(FLAGS_meta_server_addrs); @@ -72,10 +69,9 @@ folly::Future GraphService::future_authenticate(const std::string& auto ctx = std::make_unique>(); auto future = ctx->future(); - // Check username and password failed - // Check whether the client has called verifyClientVersion() - auto clientAddr = HostAddr(peer->getAddressStr(), peer->getPort()); - auto authResult = auth(username, password, clientAddr); + + // Check whether the username and password are correct + auto authResult = auth(username, password); if (!authResult.ok()) { ctx->resp().errorCode = ErrorCode::E_BAD_USERNAME_PASSWORD; ctx->resp().errorMsg.reset(new std::string(authResult.toString())); @@ -207,24 +203,9 @@ folly::Future GraphService::future_executeJsonWithParameter( }); } -Status GraphService::auth(const std::string& username, - const std::string& password, - const HostAddr& clientIp) { +Status GraphService::auth(const std::string& username, const std::string& password) { auto metaClient = queryEngine_->metaClient(); - // TODO(Aiee) This is a walkaround to address the problem that using a lower version(< v2.6.0) - // client to connect with higher version(>= v3.0.0) Nebula service will cause a crash. 
- // - // Only the clients since v2.6.0 will call verifyVersion(), thus we could determine whether the - // client version is lower than v2.6.0 - auto clientAddrIt = metaClient->getClientAddrMap().find(clientIp); - if (clientAddrIt == metaClient->getClientAddrMap().end()) { - return Status::Error( - folly::sformat("The version of the client sending request from {} is too old, " - "please update the client.", - clientIp.toString())); - } - // Skip authentication if FLAGS_enable_authorize is false if (!FLAGS_enable_authorize) { return Status::OK(); @@ -270,13 +251,6 @@ folly::Future GraphService::future_verifyClientVe resp.error_code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; } - // The client sent request has a version >= v2.6.0, mark the address as valid - auto* peer = getRequestContext()->getPeerAddress(); - auto clientAddr = HostAddr(peer->getAddressStr(), peer->getPort()); - - auto ttlTimestamp = time::WallClock::fastNowInSec() + clientAddrTimeout; - auto clientAddrMap = &metaClient_->getClientAddrMap(); - clientAddrMap->insert_or_assign(clientAddr, ttlTimestamp); return folly::makeFuture(std::move(resp)); } } // namespace graph diff --git a/src/graph/service/GraphService.h b/src/graph/service/GraphService.h index d96d6385004..85ed21a6ce4 100644 --- a/src/graph/service/GraphService.h +++ b/src/graph/service/GraphService.h @@ -54,7 +54,7 @@ class GraphService final : public cpp2::GraphServiceSvIf { std::unique_ptr metaClient_; private: - Status auth(const std::string& username, const std::string& password, const HostAddr& clientIp); + Status auth(const std::string& username, const std::string& password); std::unique_ptr sessionManager_; std::unique_ptr queryEngine_; diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index cc076a0e014..edf9c1fbe4c 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -15,6 +15,7 @@ #include "common/meta/SchemaManager.h" #include "kvstore/Common.h" #include "kvstore/CompactionFilter.h" +#include "kvstore/KVEngine.h" #include "kvstore/KVIterator.h" #include "kvstore/PartManager.h" #include "kvstore/raftex/RaftPart.h" @@ -429,14 +430,21 @@ class KVStore { const std::vector& files) = 0; /** - * @brief Write data to local storage engine only + * @brief return a WriteBatch object to do batch operation + * + * @return std::unique_ptr + */ + virtual std::unique_ptr startBatchWrite() = 0; + + /** + * @brief Write batch data to local storage only * * @param spaceId - * @param keyValues Key/values to write into only local storage engine instead of multiple replica + * @param batch * @return nebula::cpp2::ErrorCode */ - virtual nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId, - std::vector keyValues) = 0; + virtual nebula::cpp2::ErrorCode batchWriteWithoutReplicator( + GraphSpaceID spaceId, std::unique_ptr batch) = 0; /** * @brief Get the data paths diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 28e0ecc9521..ad4282686e6 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -1273,8 +1273,6 @@ nebula::cpp2::ErrorCode NebulaStore::restoreFromFiles(GraphSpaceID spaceId, } auto space = nebula::value(spaceRet); - DCHECK_EQ(space->engines_.size(), 1); - for (auto& engine : space->engines_) { auto ret = engine->ingest(files, true); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -1285,8 +1283,12 @@ nebula::cpp2::ErrorCode NebulaStore::restoreFromFiles(GraphSpaceID spaceId, return nebula::cpp2::ErrorCode::SUCCEEDED; } -nebula::cpp2::ErrorCode 
NebulaStore::multiPutWithoutReplicator(GraphSpaceID spaceId, - std::vector keyValues) { +std::unique_ptr NebulaStore::startBatchWrite() { + return std::make_unique(); +} + +nebula::cpp2::ErrorCode NebulaStore::batchWriteWithoutReplicator( + GraphSpaceID spaceId, std::unique_ptr batch) { auto spaceRet = space(spaceId); if (!ok(spaceRet)) { LOG(WARNING) << "Get Space " << spaceId << " Failed"; @@ -1294,10 +1296,9 @@ nebula::cpp2::ErrorCode NebulaStore::multiPutWithoutReplicator(GraphSpaceID spac } auto space = nebula::value(spaceRet); - DCHECK_EQ(space->engines_.size(), 1); - for (auto& engine : space->engines_) { - auto ret = engine->multiPut(keyValues); + auto ret = engine->commitBatchWrite( + std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, true); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { return ret; } diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 3a233985449..1fadcb3e827 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -664,14 +664,21 @@ class NebulaStore : public KVStore, public Handler { void fetchDiskParts(SpaceDiskPartsMap& diskParts) override; /** - * @brief Write data to local storage engine only + * @brief return a WriteBatch object to do batch operation + * + * @return std::unique_ptr + */ + std::unique_ptr startBatchWrite() override; + + /** + * @brief Write batch to local storage engine only * * @param spaceId - * @param keyValues Key/values to write into only local storage engine instead of multiple replica + * @param batch * @return nebula::cpp2::ErrorCode */ - nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId, - std::vector keyValues) override; + nebula::cpp2::ErrorCode batchWriteWithoutReplicator(GraphSpaceID spaceId, + std::unique_ptr batch) override; /** * @brief Get the kvstore propery, only used in rocksdb diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index b1e4ca8d57c..f57ec600b68 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -22,57 +22,9 @@ namespace kvstore { using fs::FileType; using fs::FileUtils; -namespace { - -/*************************************** - * - * Implementation of WriteBatch - * - **************************************/ -class RocksWriteBatch : public WriteBatch { - private: - rocksdb::WriteBatch batch_; - - public: - RocksWriteBatch() : batch_(FLAGS_rocksdb_batch_size) {} - - virtual ~RocksWriteBatch() = default; - - nebula::cpp2::ErrorCode put(folly::StringPiece key, folly::StringPiece value) override { - if (batch_.Put(toSlice(key), toSlice(value)).ok()) { - return nebula::cpp2::ErrorCode::SUCCEEDED; - } else { - return nebula::cpp2::ErrorCode::E_UNKNOWN; - } - } - - nebula::cpp2::ErrorCode remove(folly::StringPiece key) override { - if (batch_.Delete(toSlice(key)).ok()) { - return nebula::cpp2::ErrorCode::SUCCEEDED; - } else { - return nebula::cpp2::ErrorCode::E_UNKNOWN; - } - } - - // Remove all keys in the range [start, end) - nebula::cpp2::ErrorCode removeRange(folly::StringPiece start, folly::StringPiece end) override { - if (batch_.DeleteRange(toSlice(start), toSlice(end)).ok()) { - return nebula::cpp2::ErrorCode::SUCCEEDED; - } else { - return nebula::cpp2::ErrorCode::E_UNKNOWN; - } - } - - rocksdb::WriteBatch* data() { - return &batch_; - } -}; - -} // Anonymous namespace - /*************************************** * - * Implementation of WriteBatch + * Implementation of RocksEngine * **************************************/ RocksEngine::RocksEngine(GraphSpaceID spaceId, diff --git 
a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index c14e53df10c..7384ed218fc 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -122,6 +122,49 @@ class RocksCommonIter : public KVIterator { std::unique_ptr iter_; }; +/*************************************** + * + * Implementation of WriteBatch + * + **************************************/ +class RocksWriteBatch : public WriteBatch { + private: + rocksdb::WriteBatch batch_; + + public: + RocksWriteBatch() : batch_(FLAGS_rocksdb_batch_size) {} + + virtual ~RocksWriteBatch() = default; + + nebula::cpp2::ErrorCode put(folly::StringPiece key, folly::StringPiece value) override { + if (batch_.Put(toSlice(key), toSlice(value)).ok()) { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } else { + return nebula::cpp2::ErrorCode::E_UNKNOWN; + } + } + + nebula::cpp2::ErrorCode remove(folly::StringPiece key) override { + if (batch_.Delete(toSlice(key)).ok()) { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } else { + return nebula::cpp2::ErrorCode::E_UNKNOWN; + } + } + + // Remove all keys in the range [start, end) + nebula::cpp2::ErrorCode removeRange(folly::StringPiece start, folly::StringPiece end) override { + if (batch_.DeleteRange(toSlice(start), toSlice(end)).ok()) { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } else { + return nebula::cpp2::ErrorCode::E_UNKNOWN; + } + } + + rocksdb::WriteBatch* data() { + return &batch_; + } +}; /** * @brief An implementation of KVEngine based on Rocksdb * diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 49382316311..2b7230848d0 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -94,6 +94,19 @@ ActiveHostsMan::getServicesInHost(kvstore::KVStore* kv, const std::string& hostn auto addr = MetaKeyUtils::parseHostKey(iter->key()); HostInfo info = HostInfo::decode(iter->val()); + // skip the service not alive + int64_t expiredTime = + FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor; // meta/storage/graph + if (info.role_ == cpp2::HostRole::AGENT) { + expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor; + } + int64_t threshold = expiredTime * 1000; + auto now = time::WallClock::fastNowInMilliSec(); + if (now - info.lastHBTimeInMilliSec_ >= threshold) { + iter->next(); + continue; + } + if (addr.host == hostname) { hosts.emplace_back(addr, info.role_); } diff --git a/src/meta/processors/admin/AgentHBProcessor.cpp b/src/meta/processors/admin/AgentHBProcessor.cpp index 607f98278ab..861bec68d27 100644 --- a/src/meta/processors/admin/AgentHBProcessor.cpp +++ b/src/meta/processors/admin/AgentHBProcessor.cpp @@ -74,15 +74,18 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) { // join the service host info and dir info auto services = std::move(nebula::value(servicesRet)); + size_t agentCnt = 0; std::vector serviceList; for (const auto& [addr, role] : services) { if (addr == agentAddr) { // skip iteself + agentCnt++; continue; } if (role == cpp2::HostRole::AGENT) { LOG(INFO) << folly::sformat("there is another agent: {} in the host", addr.toString()); + agentCnt++; continue; } @@ -98,12 +101,12 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) { serviceInfo.role_ref() = role; serviceList.emplace_back(serviceInfo); } - if (serviceList.size() != services.size() - 1) { + if (serviceList.size() != services.size() - agentCnt) { ret = nebula::cpp2::ErrorCode::E_AGENT_HB_FAILUE; // missing some services' dir info LOG(INFO) << folly::sformat( "Missing some services's dir info, excepted 
service {}, but only got {}", - services.size() - 1, + services.size() - agentCnt, serviceList.size()); break; } diff --git a/src/meta/processors/admin/CreateBackupProcessor.cpp b/src/meta/processors/admin/CreateBackupProcessor.cpp index 2597113eda4..80e65f9ee66 100644 --- a/src/meta/processors/admin/CreateBackupProcessor.cpp +++ b/src/meta/processors/admin/CreateBackupProcessor.cpp @@ -147,7 +147,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { handleErrorCode(ret); ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "Cancel write blocking error"; + LOG(INFO) << "Cancel write blocking error:" << apache::thrift::util::enumNameSafe(ret); } onFinished(); return; @@ -161,7 +161,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { handleErrorCode(nebula::error(sret)); ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "Cancel write blocking error"; + LOG(INFO) << "Cancel write blocking error:" << apache::thrift::util::enumNameSafe(ret); } onFinished(); return; @@ -172,6 +172,10 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { if (!nebula::ok(backupFiles)) { LOG(INFO) << "Failed backup meta"; handleErrorCode(nebula::cpp2::ErrorCode::E_BACKUP_FAILED); + ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << "Cancel write blocking error:" << apache::thrift::util::enumNameSafe(ret); + } onFinished(); return; } diff --git a/src/meta/processors/admin/ListClusterInfoProcessor.cpp b/src/meta/processors/admin/ListClusterInfoProcessor.cpp index 63508d7b476..abfc1fdba0a 100644 --- a/src/meta/processors/admin/ListClusterInfoProcessor.cpp +++ b/src/meta/processors/admin/ListClusterInfoProcessor.cpp @@ -5,6 +5,10 @@ #include "meta/processors/admin/ListClusterInfoProcessor.h" +DECLARE_int32(heartbeat_interval_secs); +DECLARE_int32(agent_heartbeat_interval_secs); +DECLARE_uint32(expired_time_factor); + namespace nebula { namespace meta { @@ -40,6 +44,18 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq&) { auto addr = MetaKeyUtils::parseHostKey(iter->key()); auto info = HostInfo::decode(iter->val()); + if (addr.host == "") { + LOG(INFO) << folly::sformat("Bad format host:{}, skip it", addr.toString()); + continue; + } + + if (!isAlive(info)) { + LOG(INFO) << folly::sformat("{}:{} is not alive, will skip it", + apache::thrift::util::enumNameSafe(info.role_), + addr.toString()); + continue; + } + cpp2::ServiceInfo service; service.role_ref() = info.role_; service.addr_ref() = addr; @@ -89,16 +105,27 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq&) { hostServices[metaAddr.host].push_back(service); } - // check: there should be only one agent in each host - for (const auto& [host, services] : hostServices) { + // Check there is at least one agent in each host. If there are many ones, we will pick the first. 
+ std::unordered_map> agentServices; + for (auto& [host, services] : hostServices) { int agentCount = 0; - for (auto& s : services) { - if (s.get_role() == cpp2::HostRole::AGENT) { + for (auto it = services.begin(); it != services.end();) { + if (it->get_role() == cpp2::HostRole::AGENT) { agentCount++; + + if (agentCount > 1) { + LOG(INFO) << folly::sformat( + "Will erase redundant agent {} from host {}", it->get_addr().toString(), host); + it = services.erase(it); + continue; + } } + + it++; } + if (agentCount < 1) { - LOG(INFO) << folly::sformat("There are {} agent count is host {}", agentCount, host); + LOG(INFO) << folly::sformat("There is no agent in host {}", host); handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE); onFinished(); return; @@ -114,5 +141,18 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq&) { resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; onFinished(); } + +bool ListClusterInfoProcessor::isAlive(const HostInfo& info) { + int64_t expiredTime = + FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor; // meta/storage/graph + if (info.role_ == cpp2::HostRole::AGENT) { // agent + expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor; + } + int64_t threshold = expiredTime * 1000; + auto now = time::WallClock::fastNowInMilliSec(); + + return now - info.lastHBTimeInMilliSec_ < threshold; +} + } // namespace meta } // namespace nebula diff --git a/src/meta/processors/admin/ListClusterInfoProcessor.h b/src/meta/processors/admin/ListClusterInfoProcessor.h index 9633e7df696..03430b2aa9c 100644 --- a/src/meta/processors/admin/ListClusterInfoProcessor.h +++ b/src/meta/processors/admin/ListClusterInfoProcessor.h @@ -27,6 +27,8 @@ class ListClusterInfoProcessor : public BaseProcessor private: explicit ListClusterInfoProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} + + bool isAlive(const HostInfo& info); }; } // namespace meta } // namespace nebula diff --git a/src/meta/processors/admin/RestoreProcessor.cpp b/src/meta/processors/admin/RestoreProcessor.cpp index 311d3996f42..4e8fd2c8229 100644 --- a/src/meta/processors/admin/RestoreProcessor.cpp +++ b/src/meta/processors/admin/RestoreProcessor.cpp @@ -10,13 +10,12 @@ namespace nebula { namespace meta { -nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& ipv4From, - const HostAddr& ipv4To, - bool direct) { +nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition( + kvstore::WriteBatch* batch, std::map& hostMap) { folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED; const auto& spacePrefix = MetaKeyUtils::spacePrefix(); - auto iterRet = doPrefix(spacePrefix, direct); + auto iterRet = doPrefix(spacePrefix, true); if (!nebula::ok(iterRet)) { retCode = nebula::error(iterRet); LOG(INFO) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); @@ -32,11 +31,9 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& } LOG(INFO) << "allSpaceId.size()=" << allSpaceId.size(); - std::vector data; - for (const auto& spaceId : allSpaceId) { const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId); - auto iterPartRet = doPrefix(partPrefix, direct); + auto iterPartRet = doPrefix(partPrefix, true); if (!nebula::ok(iterPartRet)) { retCode = nebula::error(iterPartRet); LOG(INFO) << "Part prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); @@ -48,85 +45,75 @@ nebula::cpp2::ErrorCode 
RestoreProcessor::replaceHostInPartition(const HostAddr& bool needUpdate = false; auto partHosts = MetaKeyUtils::parsePartVal(iter->val()); for (auto& host : partHosts) { - if (host == ipv4From) { + if (hostMap.find(host) != hostMap.end()) { + host = hostMap[host]; needUpdate = true; - host = ipv4To; } } if (needUpdate) { - data.emplace_back(iter->key(), MetaKeyUtils::partVal(partHosts)); + batch->put(iter->key(), MetaKeyUtils::partVal(partHosts)); } iter->next(); } } - if (direct) { - retCode = kvstore_->multiPutWithoutReplicator(kDefaultSpaceId, std::move(data)); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "multiPutWithoutReplicator partition info failed, error: " - << apache::thrift::util::enumNameSafe(retCode); - } - return retCode; - } - - folly::Baton baton; - kvstore_->asyncMultiPut( - kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { - retCode = code; - baton.post(); - }); - baton.wait(); - return retCode; } -nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4From, - const HostAddr& ipv4To, - bool direct) { +nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(kvstore::WriteBatch* batch, + std::map& hostMap) { folly::SharedMutex::WriteHolder holder(LockUtils::lock()); auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED; const auto& zonePrefix = MetaKeyUtils::zonePrefix(); - auto iterRet = doPrefix(zonePrefix, direct); + auto iterRet = doPrefix(zonePrefix, true); if (!nebula::ok(iterRet)) { retCode = nebula::error(iterRet); LOG(INFO) << "Zone prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } auto iter = nebula::value(iterRet).get(); - std::vector data; while (iter->valid()) { bool needUpdate = false; auto zoneName = MetaKeyUtils::parseZoneName(iter->key()); auto hosts = MetaKeyUtils::parseZoneHosts(iter->val()); for (auto& host : hosts) { - if (host == ipv4From) { + if (hostMap.find(host) != hostMap.end()) { + host = hostMap[host]; needUpdate = true; - host = ipv4To; } } if (needUpdate) { - data.emplace_back(iter->key(), MetaKeyUtils::zoneVal(hosts)); + batch->put(iter->key(), MetaKeyUtils::zoneVal(hosts)); } iter->next(); } - if (direct) { - retCode = kvstore_->multiPutWithoutReplicator(kDefaultSpaceId, std::move(data)); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "multiPutWithoutReplicator zon info failed, error: " - << apache::thrift::util::enumNameSafe(retCode); - } + return retCode; +} + +nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInMachine( + kvstore::WriteBatch* batch, std::map& hostMap) { + folly::SharedMutex::WriteHolder holder(LockUtils::lock()); + auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED; + + const auto& machinePrefix = MetaKeyUtils::machinePrefix(); + auto iterRet = doPrefix(machinePrefix, true); + if (!nebula::ok(iterRet)) { + retCode = nebula::error(iterRet); + LOG(INFO) << "Machine prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } + auto iter = nebula::value(iterRet).get(); - folly::Baton baton; - kvstore_->asyncMultiPut( - kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { - retCode = code; - baton.post(); - }); - baton.wait(); + while (iter->valid()) { + auto machine = MetaKeyUtils::parseMachineKey(iter->key()); + if (hostMap.find(machine) != hostMap.end()) { + batch->remove(MetaKeyUtils::machineKey(machine.host, machine.port)); + batch->put(MetaKeyUtils::machineKey(hostMap[machine].host, hostMap[machine].port), 
""); + } + iter->next(); + } return retCode; } @@ -148,14 +135,20 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) { return; } + auto batch = kvstore_->startBatchWrite(); auto replaceHosts = req.get_hosts(); if (!replaceHosts.empty()) { + std::map hostMap; for (auto h : replaceHosts) { if (h.get_from_host() == h.get_to_host()) { continue; } - auto result = replaceHostInPartition(h.get_from_host(), h.get_to_host(), true); + hostMap[h.get_from_host()] = h.get_to_host(); + } + + if (!hostMap.empty()) { + auto result = replaceHostInPartition(batch.get(), hostMap); if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "replaceHost in partition fails when recovered"; handleErrorCode(result); @@ -163,7 +156,7 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) { return; } - result = replaceHostInZone(h.get_from_host(), h.get_to_host(), true); + result = replaceHostInZone(batch.get(), hostMap); if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(INFO) << "replaceHost in zone fails when recovered"; handleErrorCode(result); @@ -171,7 +164,13 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) { return; } - // TODO(spw): need to replace the registered machine + result = replaceHostInMachine(batch.get(), hostMap); + if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << "replaceHost in machine fails when recovered"; + handleErrorCode(result); + onFinished(); + return; + } } } @@ -179,7 +178,8 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) { unlink(f.c_str()); } - handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED); + ret = kvstore_->batchWriteWithoutReplicator(kDefaultSpaceId, std::move(batch)); + handleErrorCode(ret); onFinished(); } diff --git a/src/meta/processors/admin/RestoreProcessor.h b/src/meta/processors/admin/RestoreProcessor.h index 5b1091a237e..af136ab9766 100644 --- a/src/meta/processors/admin/RestoreProcessor.h +++ b/src/meta/processors/admin/RestoreProcessor.h @@ -6,15 +6,28 @@ #ifndef META_RESTOREPROCESSOR_H_ #define META_RESTOREPROCESSOR_H_ +#include "kvstore/KVEngine.h" #include "meta/processors/BaseProcessor.h" namespace nebula { namespace meta { /** - * @brief Rebuild the host relative info after ingesting the table backup data to + * @brief Rebuild the host related meta info after ingesting the table backup data to * the new cluster metad KV store. * + * For example, we have an cluster A which consists of hosts + * [ip1:port1, ip2:port2, ip3:port3]. Now we use backup data from cluster A to restore in a + * new cluster B which consists of hosts [ip4:port4, ip5:port5, ip6:port6]. + * Because after ingesting, the meta info has data consists of old addresses, + * we should replace the related info with [from, to] pairs: + * ip1:port1->ip4:port4, ip2:port2->ip5:port5, ip3:port3->ip6:port6. + * + * Now the related meta info including: + * 1. part info: graph + part -> part peers list + * 2. zone info: zone id -> zone hosts list + * 3. 
machine info: machine key -> "" + */ class RestoreProcessor : public BaseProcessor { public: @@ -26,15 +39,14 @@ class RestoreProcessor : public BaseProcessor { private: explicit RestoreProcessor(kvstore::KVStore* kvstore) : BaseProcessor(kvstore) {} - // A direct value of true means that data will not be written to follow via - // the raft protocol, but will be written directly to local disk - nebula::cpp2::ErrorCode replaceHostInPartition(const HostAddr& ipv4From, - const HostAddr& ipv4To, - bool direct = false); + nebula::cpp2::ErrorCode replaceHostInPartition(kvstore::WriteBatch* batch, + std::map& hostMap); + + nebula::cpp2::ErrorCode replaceHostInZone(kvstore::WriteBatch* batch, + std::map& hostMap); - nebula::cpp2::ErrorCode replaceHostInZone(const HostAddr& ipv4From, - const HostAddr& ipv4To, - bool direct = false); + nebula::cpp2::ErrorCode replaceHostInMachine(kvstore::WriteBatch* batch, + std::map& hostMap); }; } // namespace meta diff --git a/src/meta/test/RestoreProcessorTest.cpp b/src/meta/test/RestoreProcessorTest.cpp index 86b840855bc..30e971c401f 100644 --- a/src/meta/test/RestoreProcessorTest.cpp +++ b/src/meta/test/RestoreProcessorTest.cpp @@ -13,20 +13,19 @@ namespace nebula { namespace meta { +// back up the given space TEST(RestoreProcessorTest, RestoreTest) { fs::TempDir rootPath("/tmp/RestoreOriginTest.XXXXXX"); std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path())); auto now = time::WallClock::fastNowInMilliSec(); + // init three storaged hosts HostAddr host1("127.0.0.1", 3360); HostAddr host2("127.0.0.2", 3360); HostAddr host3("127.0.0.3", 3360); + std::vector hosts{host1, host2, host3}; - std::vector hosts; - hosts.emplace_back(host1); - hosts.emplace_back(host2); - hosts.emplace_back(host3); - + // register them std::vector times; for (auto h : hosts) { ActiveHostsMan::updateHostInfo( kv.get(), h, HostInfo(now, meta::cpp2::HostRole::STORAGE, ""), times); } TestUtils::doPut(kv.get(), times); TestUtils::registerHB(kv.get(), hosts); - // mock admin client - bool ret = false; + // mock two spaces cpp2::SpaceDesc properties; GraphSpaceID id = 1; properties.space_name_ref() = "test_space"; @@ -60,24 +58,25 @@ std::string(reinterpret_cast(&id2), sizeof(GraphSpaceID))); data.emplace_back(MetaKeyUtils::spaceKey(id2), MetaKeyUtils::spaceVal(properties2)); - std::unordered_map> partInfo; - + // mock part distribution for "test_space" + std::unordered_map> fromHostParts; for (auto partId = 1; partId <= partNum; partId++) { - std::vector hosts4; + std::vector partHosts; size_t idx = partId; for (int32_t i = 0; i < 3; i++, idx++) { auto h = hosts[idx % 3]; - hosts4.emplace_back(h); - partInfo[h].emplace_back(partId); + partHosts.emplace_back(h); + fromHostParts[h].emplace_back(partId); } - data.emplace_back(MetaKeyUtils::partKey(id, partId), MetaKeyUtils::partVal(hosts4)); + data.emplace_back(MetaKeyUtils::partKey(id, partId), MetaKeyUtils::partVal(partHosts)); } - std::string zoneName = "test_zone"; - std::vector zoneNames = {zoneName}; - + // mock a root user data.emplace_back(MetaKeyUtils::userKey("root"), MetaKeyUtils::userVal("root")); + // mock a zone with {host1, host2, host3} std::string zoneName = "test_zone"; std::vector zoneNames = {zoneName}; auto zoneId = 1; data.emplace_back(MetaKeyUtils::indexZoneKey(zoneName), std::string(reinterpret_cast(&zoneId), sizeof(ZoneID))); data.emplace_back(MetaKeyUtils::zoneKey(zoneName), MetaKeyUtils::zoneVal(hosts)); @@ -91,14 +90,9 @@ data.emplace_back(MetaKeyUtils::lastUpdateTimeKey(), 
MetaKeyUtils::lastUpdateTimeVal(lastUpdateTime)); - folly::Baton baton; - kv->asyncMultiPut( - kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - baton.post(); - }); - baton.wait(); + TestUtils::doPut(kv.get(), data); + // mock meta backup files; only back up the specified space instead of the full space std::unordered_set spaces = {id}; auto backupName = folly::sformat("BACKUP_{}", MetaKeyUtils::genTimestampStr()); auto spaceNames = std::make_unique>(); @@ -111,73 +105,67 @@ auto it = std::find_if(files.cbegin(), files.cend(), [](auto& f) { auto const pos = f.find_last_of("/"); auto name = f.substr(pos + 1); - if (name == MetaKeyUtils::zonePrefix()) { + if (name == MetaKeyUtils::zonePrefix() + ".sst") { return true; } return false; }); - ASSERT_EQ(it, files.cend()); + ASSERT_EQ(it, files.cend()); // should not include system tables req.files_ref() = std::move(files); + + // mock a new cluster {host4, host5, host6}, and construct restore pairs std::vector hostPairs; HostAddr host4("127.0.0.4", 3360); HostAddr host5("127.0.0.5", 3360); HostAddr host6("127.0.0.6", 3360); std::unordered_map hostMap = { {host1, host4}, {host2, host5}, {host3, host6}}; - - for (auto hm : hostMap) { - hostPairs.emplace_back(apache::thrift::FragileConstructor(), hm.first, hm.second); + for (auto [from, to] : hostMap) { + hostPairs.emplace_back(apache::thrift::FragileConstructor(), from, to); } - req.hosts_ref() = std::move(hostPairs); + + // mock a new cluster to restore fs::TempDir restoreTootPath("/tmp/RestoreTest.XXXXXX"); std::unique_ptr kvRestore(MockCluster::initMetaKV(restoreTootPath.path())); std::vector restoreData; restoreData.emplace_back(MetaKeyUtils::userKey("root"), MetaKeyUtils::userVal("password")); + TestUtils::doPut(kvRestore.get(), restoreData); - folly::Baton restoreBaton; - kvRestore->asyncMultiPut( - kDefaultSpaceId, kDefaultPartId, std::move(restoreData), [&](nebula::cpp2::ErrorCode code) { - ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - restoreBaton.post(); - }); - restoreBaton.wait(); - + // call the restore processor auto* processor = RestoreProcessor::instance(kvRestore.get()); auto f = processor->getFuture(); processor->process(req); auto resp = std::move(f).get(); ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); - nebula::cpp2::ErrorCode result; - std::unique_ptr iter; - + // verify host->parts info const auto& partPrefix = MetaKeyUtils::partPrefix(id); - result = kvRestore->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter); + std::unique_ptr iter; + auto result = kvRestore->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter); ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result); - std::unordered_map> toPartInfo; - + std::unordered_map> toHostParts; while (iter->valid()) { auto key = iter->key(); auto partId = MetaKeyUtils::parsePartKeyPartId(key); auto partHosts = MetaKeyUtils::parsePartVal(iter->val()); for (auto& host : partHosts) { LOG(INFO) << "partHost: " << host.toString(); - toPartInfo[host].emplace_back(partId); + toHostParts[host].emplace_back(partId); } iter->next(); } - for (auto pi : partInfo) { - auto parts = toPartInfo[hostMap[pi.first]]; + for (auto pi : fromHostParts) { + auto parts = toHostParts[hostMap[pi.first]]; ASSERT_EQ(parts.size(), pi.second.size()); ASSERT_TRUE(std::equal(parts.cbegin(), parts.cend(), pi.second.cbegin())); } + // verify zone info; zone info has not been replaced during ingesting 
auto prefix = MetaKeyUtils::zonePrefix(); result = kvRestore->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result); - std::vector zones; std::vector restoredHosts = {host4, host5, host6}; while (iter->valid()) { @@ -230,31 +218,26 @@ } } +// back up the full space TEST(RestoreProcessorTest, RestoreFullTest) { fs::TempDir rootPath("/tmp/RestoreFullOriginTest.XXXXXX"); std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path())); auto now = time::WallClock::fastNowInMilliSec(); + // mock three storaged hosts and register them HostAddr host1("127.0.0.1", 3360); HostAddr host2("127.0.0.2", 3360); HostAddr host3("127.0.0.3", 3360); - - std::vector hosts; - hosts.emplace_back(host1); - hosts.emplace_back(host2); - hosts.emplace_back(host3); - + std::vector hosts{host1, host2, host3}; std::vector times; for (auto h : hosts) { ActiveHostsMan::updateHostInfo( kv.get(), h, HostInfo(now, meta::cpp2::HostRole::STORAGE, ""), times); } - TestUtils::doPut(kv.get(), times); TestUtils::registerHB(kv.get(), hosts); - // mock admin client - bool ret = false; + // mock two spaces with 10 partitions and 3 replicas each cpp2::SpaceDesc properties; GraphSpaceID id = 1; properties.space_name_ref() = "test_space"; @@ -278,36 +261,51 @@ std::string(reinterpret_cast(&id2), sizeof(GraphSpaceID))); data.emplace_back(MetaKeyUtils::spaceKey(id2), MetaKeyUtils::spaceVal(properties2)); - std::unordered_map> partInfo; - + std::unordered_map> fromHostParts; for (auto partId = 1; partId <= partNum; partId++) { - std::vector hosts4; + std::vector partHosts; size_t idx = partId; for (int32_t i = 0; i < 3; i++, idx++) { auto h = hosts[idx % 3]; - hosts4.emplace_back(h); - partInfo[h].emplace_back(partId); + partHosts.emplace_back(h); + fromHostParts[h].emplace_back(partId); } - data.emplace_back(MetaKeyUtils::partKey(id, partId), MetaKeyUtils::partVal(hosts4)); + data.emplace_back(MetaKeyUtils::partKey(id, partId), MetaKeyUtils::partVal(partHosts)); } + // mock a zone with {host1, host2, host3} std::string zoneName = "test_zone"; std::vector zoneNames = {zoneName}; data.emplace_back(MetaKeyUtils::userKey("root"), MetaKeyUtils::userVal("root")); - auto zoneId = 1; data.emplace_back(MetaKeyUtils::indexZoneKey(zoneName), std::string(reinterpret_cast(&zoneId), sizeof(ZoneID))); data.emplace_back(MetaKeyUtils::zoneKey(zoneName), MetaKeyUtils::zoneVal(hosts)); + TestUtils::doPut(kv.get(), data); + + // check part data + std::unique_ptr iter1; + const auto& partPrefix = MetaKeyUtils::partPrefix(id); + auto result = kv->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter1); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result); + while (iter1->valid()) { + auto key = iter1->key(); + auto partId = MetaKeyUtils::parsePartKeyPartId(key); + auto partHosts = MetaKeyUtils::parsePartVal(iter1->val()); + + std::set sortedHosts; + for (const auto& host : partHosts) { + sortedHosts.insert(host); + } + int idx = 0; + for (const auto& host : sortedHosts) { + LOG(INFO) << "partId:" << partId << ", host: " << host.toString(); + ASSERT_EQ(host, hosts[idx++]); + } + iter1->next(); + } - folly::Baton baton; - kv->asyncMultiPut( - kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - baton.post(); - }); - baton.wait(); - + // mock backing up full meta tables std::unordered_set spaces = {id}; auto backupName = 
folly::sformat("BACKUP_{}", MetaKeyUtils::genTimestampStr()); auto backupFiles = MetaServiceUtils::backupTables(kv.get(), spaces, backupName, nullptr); @@ -318,12 +316,12 @@ TEST(RestoreProcessorTest, RestoreFullTest) { auto it = std::find_if(files.cbegin(), files.cend(), [](auto& f) { auto const pos = f.find_last_of("/"); auto name = f.substr(pos + 1); - if (name == MetaKeyUtils::zonePrefix()) { + if (name == MetaKeyUtils::zonePrefix() + ".sst") { return true; } return false; }); - ASSERT_EQ(it, files.cend()); + ASSERT_NE(it, files.cend()); req.files_ref() = std::move(files); std::vector hostPairs; HostAddr host4("127.0.0.4", 3360); @@ -331,9 +329,8 @@ TEST(RestoreProcessorTest, RestoreFullTest) { HostAddr host6("127.0.0.6", 3360); std::unordered_map hostMap = { {host1, host4}, {host2, host5}, {host3, host6}}; - - for (auto hm : hostMap) { - hostPairs.emplace_back(apache::thrift::FragileConstructor(), hm.first, hm.second); + for (auto [from, to] : hostMap) { + hostPairs.emplace_back(apache::thrift::FragileConstructor(), from, to); } req.hosts_ref() = std::move(hostPairs); @@ -341,13 +338,7 @@ TEST(RestoreProcessorTest, RestoreFullTest) { std::unique_ptr kvRestore(MockCluster::initMetaKV(restoreTootPath.path())); std::vector restoreData; restoreData.emplace_back(MetaKeyUtils::userKey("root"), MetaKeyUtils::userVal("password")); - - folly::Baton restoreBaton; - kvRestore->asyncMultiPut(0, 0, std::move(restoreData), [&](nebula::cpp2::ErrorCode code) { - ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - restoreBaton.post(); - }); - restoreBaton.wait(); + TestUtils::doPut(kvRestore.get(), restoreData); auto* processor = RestoreProcessor::instance(kvRestore.get()); auto f = processor->getFuture(); @@ -355,35 +346,31 @@ TEST(RestoreProcessorTest, RestoreFullTest) { auto resp = std::move(f).get(); ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); - nebula::cpp2::ErrorCode result; + // verify part distribution std::unique_ptr iter; - - const auto& partPrefix = MetaKeyUtils::partPrefix(id); result = kvRestore->prefix(kDefaultSpaceId, kDefaultPartId, partPrefix, &iter); ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result); - - std::unordered_map> toPartInfo; - + std::unordered_map> toHostParts; while (iter->valid()) { auto key = iter->key(); auto partId = MetaKeyUtils::parsePartKeyPartId(key); auto partHosts = MetaKeyUtils::parsePartVal(iter->val()); for (auto& host : partHosts) { - LOG(INFO) << "partHost: " << host.toString(); - toPartInfo[host].emplace_back(partId); + LOG(INFO) << "partId:" << partId << ", host: " << host.toString(); + toHostParts[host].emplace_back(partId); } iter->next(); } - for (auto pi : partInfo) { - auto parts = toPartInfo[hostMap[pi.first]]; - ASSERT_EQ(parts.size(), pi.second.size()); - ASSERT_TRUE(std::equal(parts.cbegin(), parts.cend(), pi.second.cbegin())); + for (auto [host, fromParts] : fromHostParts) { + auto toParts = toHostParts[hostMap[host]]; + ASSERT_EQ(fromParts.size(), toParts.size()); + ASSERT_TRUE(std::equal(fromParts.cbegin(), fromParts.cend(), toParts.cbegin())); } + // verify zone hosts auto prefix = MetaKeyUtils::zonePrefix(); result = kvRestore->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result); - std::vector zones; std::vector restoredHosts = {host4, host5, host6}; while (iter->valid()) { diff --git a/src/storage/test/IndexTestUtil.h b/src/storage/test/IndexTestUtil.h index 6196c771092..87d2dbec3ef 100644 --- a/src/storage/test/IndexTestUtil.h +++ 
b/src/storage/test/IndexTestUtil.h @@ -301,8 +301,12 @@ class MockKVStore : public ::nebula::kvstore::KVStore { LOG(FATAL) << "Unexpect"; return ::nebula::cpp2::ErrorCode::SUCCEEDED; } - nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID, - std::vector<::nebula::kvstore::KV>) override { + std::unique_ptr startBatchWrite() override { + LOG(FATAL) << "Unexpect"; + return nullptr; + } + nebula::cpp2::ErrorCode batchWriteWithoutReplicator( + GraphSpaceID, std::unique_ptr) override { LOG(FATAL) << "Unexpect"; return ::nebula::cpp2::ErrorCode::SUCCEEDED; }
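A side note on the GenericWorkerTest change above: GoogleTest only skips tests whose name carries the exact DISABLED_ prefix, which is why the disabled test must be spelled DISABLED_purgeRepeatTask; a DISABLE_ prefix would be treated as an ordinary test name and still run. A minimal self-contained illustration, with a made-up suite name:

```cpp
#include <gtest/gtest.h>

// Compiled but skipped by default; can still be run explicitly with
// --gtest_also_run_disabled_tests.
TEST(ExampleSuite, DISABLED_flakyCase) {
  FAIL() << "only executed when disabled tests are explicitly requested";
}

// Runs as usual.
TEST(ExampleSuite, normalCase) {
  SUCCEED();
}
```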
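On the kvstore API change: multiPutWithoutReplicator could only apply puts, while the new startBatchWrite/batchWriteWithoutReplicator pair lets RestoreProcessor mix puts and removes (moving a registered machine key needs both) and commit them in a single local write. A minimal sketch of the same pattern against the raw RocksDB API; the db handle and key names are illustrative, not taken from this patch:

```cpp
#include <cassert>
#include <string>

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/write_batch.h>

// Replacing a registered machine requires deleting the old key and inserting
// the new one; batching both makes the replacement all-or-nothing.
void replaceMachineKey(rocksdb::DB* db, const std::string& oldKey, const std::string& newKey) {
  rocksdb::WriteBatch batch;
  batch.Delete(oldKey);  // drop the old machine key
  batch.Put(newKey, "");  // machine keys carry an empty value
  rocksdb::WriteOptions options;
  options.sync = true;  // analogous to the wal-sync flag passed to commitBatchWrite
  rocksdb::Status status = db->Write(options, &batch);  // atomic commit
  assert(status.ok());
}
```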
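On the liveness checks added to ActiveHostsMan::getServicesInHost and ListClusterInfoProcessor::isAlive: both apply the same expiration rule, sketched standalone below. The interval and factor constants stand in for the gflags and are assumptions for illustration, not the project's actual defaults:

```cpp
#include <cstdint>

// A service is considered alive if its last heartbeat arrived within
// heartbeat_interval * expired_time_factor seconds; agents use their own
// (typically longer) heartbeat interval.
bool isAlive(bool isAgent, int64_t lastHBTimeInMilliSec, int64_t nowInMilliSec) {
  const int64_t heartbeatIntervalSecs = 10;       // placeholder for FLAGS_heartbeat_interval_secs
  const int64_t agentHeartbeatIntervalSecs = 60;  // placeholder for FLAGS_agent_heartbeat_interval_secs
  const int64_t expiredTimeFactor = 5;            // placeholder for FLAGS_expired_time_factor

  int64_t expiredTimeSecs =
      (isAgent ? agentHeartbeatIntervalSecs : heartbeatIntervalSecs) * expiredTimeFactor;
  return nowInMilliSec - lastHBTimeInMilliSec < expiredTimeSecs * 1000;
}
```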
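On the CreateBackupProcessor change: there are now three early-return paths that each call blockingWrites(SignType::BLOCK_OFF) by hand. A hypothetical alternative (not part of this patch) is an RAII guard that makes the unblock unconditional on every exit path:

```cpp
#include <functional>
#include <utility>

// Hypothetical guard: runs the unblock action in its destructor, so no
// early return can leave the cluster write-blocked.
class WriteBlockGuard {
 public:
  explicit WriteBlockGuard(std::function<void()> unblock) : unblock_(std::move(unblock)) {}
  WriteBlockGuard(const WriteBlockGuard&) = delete;
  WriteBlockGuard& operator=(const WriteBlockGuard&) = delete;
  ~WriteBlockGuard() {
    if (unblock_) {
      unblock_();
    }
  }

 private:
  std::function<void()> unblock_;
};

// Usage sketch: construct the guard right after blockingWrites(SignType::BLOCK_ON)
// succeeds, with a lambda that issues BLOCK_OFF and logs any failure, then return freely.
```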
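On the RestoreProcessor change: all from->to pairs are first collected into one hostMap, and every stored host list (parts, zones) is rewritten through it in a single pass. The core lookup-and-rewrite step, reduced to a sketch (HostAddr is simplified to a string stand-in here):

```cpp
#include <map>
#include <string>
#include <vector>

using HostAddr = std::string;  // stand-in for nebula::HostAddr, for illustration only

// Rewrite each host through the from->to map and report whether anything
// changed, so the caller knows to re-serialize the value into the batch.
bool replaceHosts(std::vector<HostAddr>& hosts, const std::map<HostAddr, HostAddr>& hostMap) {
  bool needUpdate = false;
  for (auto& host : hosts) {
    auto it = hostMap.find(host);
    if (it != hostMap.end()) {
      host = it->second;
      needUpdate = true;
    }
  }
  return needUpdate;
}
```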