diff --git a/src/daemons/MetaDaemonInit.cpp b/src/daemons/MetaDaemonInit.cpp index 09640d5dc58..4bfd28726e4 100644 --- a/src/daemons/MetaDaemonInit.cpp +++ b/src/daemons/MetaDaemonInit.cpp @@ -94,6 +94,13 @@ std::unique_ptr initKV(std::vector p return nullptr; } + auto engineRet = kvstore->part(nebula::kDefaultSpaceId, nebula::kDefaultPartId); + if (!nebula::ok(engineRet)) { + LOG(ERROR) << "Nebula store init failed"; + return nullptr; + } + + auto engine = nebula::value(engineRet)->engine(); LOG(INFO) << "Waiting for the leader elected..."; nebula::HostAddr leader; while (true) { @@ -138,22 +145,22 @@ std::unique_ptr initKV(std::vector p LOG(ERROR) << "Meta version is invalid"; return nullptr; } else if (version == nebula::meta::MetaVersion::V1) { - auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get()); + auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(engine); if (!ret.ok()) { LOG(ERROR) << ret; return nullptr; } - nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get(), nebula::meta::MetaVersion::V2); + nebula::meta::MetaVersionMan::setMetaVersionToKV(engine, nebula::meta::MetaVersion::V2); } else if (version == nebula::meta::MetaVersion::V2) { LOG(INFO) << "version 3"; - auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3(kvstore.get()); + auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3(engine); if (!ret.ok()) { LOG(ERROR) << ret; return nullptr; } - nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get(), nebula::meta::MetaVersion::V3); + nebula::meta::MetaVersionMan::setMetaVersionToKV(engine, nebula::meta::MetaVersion::V3); } LOG(INFO) << "Nebula store init succeeded, clusterId " << gClusterId; diff --git a/src/meta/MetaVersionMan.cpp b/src/meta/MetaVersionMan.cpp index 292878899ad..094f2c7e711 100644 --- a/src/meta/MetaVersionMan.cpp +++ b/src/meta/MetaVersionMan.cpp @@ -5,6 +5,7 @@ #include "meta/MetaVersionMan.h" +#include "common/fs/FileUtils.h" #include "meta/ActiveHostsMan.h" #include "meta/processors/job/JobDescription.h" #include "meta/processors/job/JobUtils.h" @@ -51,78 +52,108 @@ MetaVersion MetaVersionMan::getVersionByHost(kvstore::KVStore* kv) { } // static -bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv, MetaVersion version) { - CHECK_NOTNULL(kv); - std::vector data; - data.emplace_back(kMetaVersionKey, - std::string(reinterpret_cast(&version), sizeof(MetaVersion))); - bool ret = true; - folly::Baton baton; - kv->asyncMultiPut( - kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Put failed, error: " << static_cast(code); - ret = false; - } else { - LOG(INFO) << "Write meta version 3 succeeds"; - } - baton.post(); - }); - baton.wait(); - return ret; +bool MetaVersionMan::setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion version) { + CHECK_NOTNULL(engine); + // std::vector data; + // data.emplace_back(kMetaVersionKey, + // std::string(reinterpret_cast(&version), sizeof(MetaVersion))); + // bool ret = true; + // folly::Baton baton; + // kv->asyncMultiPut( + // kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { + // if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + // LOG(ERROR) << "Put failed, error: " << static_cast(code); + // ret = false; + // } else { + // LOG(INFO) << "Write meta version 3 succeeds"; + // } + // baton.post(); + // }); + // baton.wait(); + + std::string versionValue = + std::string(reinterpret_cast(&version), sizeof(MetaVersion)); + auto code = engine->put(kMetaVersionKey, std::move(versionValue)); + return code != nebula::cpp2::ErrorCode::SUCCEEDED; } // static -Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVStore* kv) { - CHECK_NOTNULL(kv); +Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVEngine* engine) { + CHECK_NOTNULL(engine); auto snapshot = folly::sformat("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr()); - auto meteRet = kv->createCheckpoint(kDefaultSpaceId, snapshot); - if (meteRet.isLeftType()) { + + std::string path = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot); + if (!fs::FileUtils::exist(path) && !fs::FileUtils::makeDir(path)) { + LOG(ERROR) << "Make checkpoint dir: " << path << " failed"; + return Status::Error("Create snapshot file failed"); + } + + std::string dataPath = folly::sformat("{}/data", path); + auto code = engine->createCheckpoint(dataPath); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Create snapshot failed: " << snapshot; return Status::Error("Create snapshot failed"); } - auto status = doUpgradeV1ToV2(kv); + + auto status = doUpgradeV1ToV2(engine); if (!status.ok()) { // rollback by snapshot return status; } // delete snapshot file - auto dmRet = kv->dropCheckpoint(kDefaultSpaceId, snapshot); - if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + // auto dmRet = engine->dropCheckpoint(kDefaultSpaceId, snapshot); + + auto checkpointPath = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot); + + if (fs::FileUtils::exist(checkpointPath) && !fs::FileUtils::remove(checkpointPath.data(), true)) { LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually"; } return Status::OK(); } -Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVStore* kv) { - CHECK_NOTNULL(kv); +Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVEngine* engine) { + CHECK_NOTNULL(engine); auto snapshot = folly::sformat("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr()); - auto meteRet = kv->createCheckpoint(kDefaultSpaceId, snapshot); - if (meteRet.isLeftType()) { + + std::string path = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot); + if (!fs::FileUtils::exist(path) && !fs::FileUtils::makeDir(path)) { + LOG(ERROR) << "Make checkpoint dir: " << path << " failed"; + return Status::Error("Create snapshot file failed"); + } + + std::string dataPath = folly::sformat("{}/data", path); + auto code = engine->createCheckpoint(dataPath); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Create snapshot failed: " << snapshot; return Status::Error("Create snapshot failed"); } - auto status = doUpgradeV2ToV3(kv); + + auto status = doUpgradeV2ToV3(engine); if (!status.ok()) { // rollback by snapshot return status; } // delete snapshot file - auto dmRet = kv->dropCheckpoint(kDefaultSpaceId, snapshot); - if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + auto checkpointPath = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot); + + if (fs::FileUtils::exist(checkpointPath) && !fs::FileUtils::remove(checkpointPath.data(), true)) { LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually"; } + // auto dmRet = engine->dropCheckpoint(kDefaultSpaceId, snapshot); + // if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) { + // LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually"; + // } return Status::OK(); } // static -Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { - MetaDataUpgrade upgrader(kv); +Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVEngine* engine) { + MetaDataUpgrade upgrader(engine); { // kSpacesTable auto prefix = nebula::meta::v1::kSpacesTable; std::unique_ptr iter; - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -143,7 +174,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { // kPartsTable auto prefix = nebula::meta::v1::kPartsTable; std::unique_ptr iter; - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -164,7 +195,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { // kHostsTable auto prefix = nebula::meta::v1::kHostsTable; std::unique_ptr iter; - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -185,7 +216,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { // kLeadersTable auto prefix = nebula::meta::v1::kLeadersTable; std::unique_ptr iter; - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -206,7 +237,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { // kTagsTable auto prefix = nebula::meta::v1::kTagsTable; std::unique_ptr iter; - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -227,7 +258,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { // kEdgesTable auto prefix = nebula::meta::v1::kEdgesTable; std::unique_ptr iter; - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -248,7 +279,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { // kIndexesTable auto prefix = nebula::meta::v1::kIndexesTable; std::unique_ptr iter; - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -269,7 +300,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { // kConfigsTable auto prefix = nebula::meta::v1::kConfigsTable; std::unique_ptr iter; - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -290,7 +321,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { // kJob auto prefix = JobUtil::jobPrefix(); std::unique_ptr iter; - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -324,7 +355,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { nebula::meta::v1::kJobArchive}); std::unique_ptr iter; for (auto& prefix : prefixes) { - auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto ret = engine->prefix(prefix, &iter); if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { Status status = Status::OK(); while (iter->valid()) { @@ -351,22 +382,22 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) { } } } - if (!setMetaVersionToKV(kv, MetaVersion::V2)) { + if (!setMetaVersionToKV(engine, MetaVersion::V2)) { return Status::Error("Persist meta version failed"); } else { return Status::OK(); } } -Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) { - MetaDataUpgrade upgrader(kv); +Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) { + MetaDataUpgrade upgrader(engine); // Step 1: Upgrade HeartBeat into machine list { // collect all hosts association with zone std::vector zoneHosts; const auto& zonePrefix = MetaKeyUtils::zonePrefix(); std::unique_ptr zoneIter; - auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &zoneIter); + auto code = engine->prefix(zonePrefix, &zoneIter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Get active hosts failed"; return Status::Error("Get hosts failed"); @@ -382,7 +413,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) { const auto& prefix = MetaKeyUtils::hostPrefix(); std::unique_ptr iter; - code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + code = engine->prefix(prefix, &iter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Get active hosts failed"; return Status::Error("Get hosts failed"); @@ -420,7 +451,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) { { const auto& prefix = MetaKeyUtils::spacePrefix(); std::unique_ptr iter; - auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + auto code = engine->prefix(prefix, &iter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Get spaces failed"; return Status::Error("Get spaces failed"); @@ -439,7 +470,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) { iter->next(); } } - if (!setMetaVersionToKV(kv, MetaVersion::V3)) { + if (!setMetaVersionToKV(engine, MetaVersion::V3)) { return Status::Error("Persist meta version failed"); } else { return Status::OK(); diff --git a/src/meta/MetaVersionMan.h b/src/meta/MetaVersionMan.h index c6e31b8d01d..07d7331f906 100644 --- a/src/meta/MetaVersionMan.h +++ b/src/meta/MetaVersionMan.h @@ -8,6 +8,7 @@ #include "common/base/Base.h" #include "common/utils/MetaKeyUtils.h" +#include "kvstore/KVEngine.h" #include "kvstore/KVStore.h" namespace nebula { @@ -29,18 +30,18 @@ class MetaVersionMan final { static MetaVersion getMetaVersionFromKV(kvstore::KVStore* kv); - static bool setMetaVersionToKV(kvstore::KVStore* kv, MetaVersion version); + static bool setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion version); - static Status updateMetaV1ToV2(kvstore::KVStore* kv); + static Status updateMetaV1ToV2(kvstore::KVEngine* engine); - static Status updateMetaV2ToV3(kvstore::KVStore* kv); + static Status updateMetaV2ToV3(kvstore::KVEngine* engine); private: static MetaVersion getVersionByHost(kvstore::KVStore* kv); - static Status doUpgradeV1ToV2(kvstore::KVStore* kv); + static Status doUpgradeV1ToV2(kvstore::KVEngine* engine); - static Status doUpgradeV2ToV3(kvstore::KVStore* kv); + static Status doUpgradeV2ToV3(kvstore::KVEngine* engine); }; } // namespace meta diff --git a/src/meta/upgrade/MetaDataUpgrade.cpp b/src/meta/upgrade/MetaDataUpgrade.cpp index e057e8d6f85..e25e2ffb236 100644 --- a/src/meta/upgrade/MetaDataUpgrade.cpp +++ b/src/meta/upgrade/MetaDataUpgrade.cpp @@ -97,7 +97,7 @@ Status MetaDataUpgrade::rewriteSpacesV2ToV3(const folly::StringPiece &key, auto groupName = *oldProps.group_name_ref(); auto groupKey = meta::v2::MetaServiceUtilsV2::groupKey(groupName); std::string zoneValue; - auto code = kv_->get(kDefaultSpaceId, kDefaultPartId, std::move(groupKey), &zoneValue); + auto code = engine_->get(std::move(groupKey), &zoneValue); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Error("Get Group Failed"); } @@ -107,7 +107,7 @@ Status MetaDataUpgrade::rewriteSpacesV2ToV3(const folly::StringPiece &key, } else { const auto &zonePrefix = MetaKeyUtils::zonePrefix(); std::unique_ptr iter; - auto code = kv_->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &iter); + auto code = engine_->prefix(zonePrefix, &iter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Error("Get Zones Failed"); } diff --git a/src/meta/upgrade/MetaDataUpgrade.h b/src/meta/upgrade/MetaDataUpgrade.h index bd4ffea08a1..fe2bb143dfe 100644 --- a/src/meta/upgrade/MetaDataUpgrade.h +++ b/src/meta/upgrade/MetaDataUpgrade.h @@ -11,6 +11,7 @@ #include "common/base/Status.h" #include "common/utils/MetaKeyUtils.h" #include "interface/gen-cpp2/meta_types.h" +#include "kvstore/KVEngine.h" #include "kvstore/KVStore.h" #include "meta/processors/Common.h" #include "meta/upgrade/v1/gen-cpp2/meta_types.h" @@ -21,7 +22,7 @@ namespace meta { class MetaDataUpgrade final { public: - explicit MetaDataUpgrade(kvstore::KVStore *kv) : kv_(kv) {} + explicit MetaDataUpgrade(kvstore::KVEngine *engine) : engine_(engine) {} ~MetaDataUpgrade() = default; @@ -52,21 +53,22 @@ class MetaDataUpgrade final { private: Status put(const folly::StringPiece &key, const folly::StringPiece &val) { - std::vector data; - data.emplace_back(key.str(), val.str()); - folly::Baton baton; - auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; - kv_->asyncMultiPut(kDefaultSpaceId, - kDefaultPartId, - std::move(data), - [&ret, &baton](nebula::cpp2::ErrorCode code) { - if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { - ret = code; - LOG(INFO) << "Put data error on meta server"; - } - baton.post(); - }); - baton.wait(); + // std::vector data; + // data.emplace_back(key.str(), val.str()); + // folly::Baton baton; + // auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; + // kv_->asyncMultiPut(kDefaultSpaceId, + // kDefaultPartId, + // std::move(data), + // [&ret, &baton](nebula::cpp2::ErrorCode code) { + // if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { + // ret = code; + // LOG(INFO) << "Put data error on meta server"; + // } + // baton.post(); + // }); + // baton.wait(); + auto ret = engine_->put(key.str(), val.str()); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Error("Put data failed"); } @@ -74,19 +76,20 @@ class MetaDataUpgrade final { } Status put(std::vector data) { - folly::Baton baton; - auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; - kv_->asyncMultiPut(kDefaultSpaceId, - kDefaultPartId, - std::move(data), - [&ret, &baton](nebula::cpp2::ErrorCode code) { - if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { - ret = code; - LOG(INFO) << "Put data error on meta server"; - } - baton.post(); - }); - baton.wait(); + // folly::Baton baton; + // auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; + // kv_->asyncMultiPut(kDefaultSpaceId, + // kDefaultPartId, + // std::move(data), + // [&ret, &baton](nebula::cpp2::ErrorCode code) { + // if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { + // ret = code; + // LOG(INFO) << "Put data error on meta server"; + // } + // baton.post(); + // }); + // baton.wait(); + auto ret = engine_->multiPut(data); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Error("Put data failed"); } @@ -94,20 +97,21 @@ class MetaDataUpgrade final { } Status remove(const folly::StringPiece &key) { - std::vector keys{key.str()}; - folly::Baton baton; - auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; - kv_->asyncMultiRemove(kDefaultSpaceId, - kDefaultPartId, - std::move(keys), - [&ret, &baton](nebula::cpp2::ErrorCode code) { - if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { - ret = code; - LOG(INFO) << "Remove data error on meta server"; - } - baton.post(); - }); - baton.wait(); + // std::vector keys{key.str()}; + // folly::Baton baton; + // auto ret = nebula::cpp2::ErrorCode::SUCCEEDED; + // kv_->asyncMultiRemove(kDefaultSpaceId, + // kDefaultPartId, + // std::move(keys), + // [&ret, &baton](nebula::cpp2::ErrorCode code) { + // if (nebula::cpp2::ErrorCode::SUCCEEDED != code) { + // ret = code; + // LOG(INFO) << "Remove data error on meta server"; + // } + // baton.post(); + // }); + // baton.wait(); + auto ret = engine_->remove(key.str()); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { return Status::Error("Remove data failed"); } @@ -125,7 +129,8 @@ class MetaDataUpgrade final { nebula::meta::cpp2::GeoShape convertToGeoShape(nebula::meta::v2::cpp2::GeoShape shape); private: - kvstore::KVStore *kv_ = nullptr; + // kvstore::KVStore *kv_ = nullptr; + kvstore::KVEngine *engine_ = nullptr; }; } // namespace meta