Skip to content

Commit

Permalink
fix meta upgrade for multi instance (#3734) (#3791)
Browse files Browse the repository at this point in the history
Conflicts:
	src/meta/MetaVersionMan.cpp
  • Loading branch information
darionyaphet authored Jan 24, 2022
1 parent 2fd67bc commit e7863df
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 113 deletions.
20 changes: 13 additions & 7 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
return nullptr;
}

auto engineRet = kvstore->part(nebula::kDefaultSpaceId, nebula::kDefaultPartId);
if (!nebula::ok(engineRet)) {
LOG(ERROR) << "Get nebula store engine failed";
return nullptr;
}

auto engine = nebula::value(engineRet)->engine();
LOG(INFO) << "Waiting for the leader elected...";
nebula::HostAddr leader;
while (true) {
Expand Down Expand Up @@ -138,22 +145,21 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
LOG(ERROR) << "Meta version is invalid";
return nullptr;
} else if (version == nebula::meta::MetaVersion::V1) {
auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get());
auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(engine);
if (!ret.ok()) {
LOG(ERROR) << ret;
LOG(ERROR) << "Update meta from V1 to V2 failed " << ret;
return nullptr;
}

nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get(), nebula::meta::MetaVersion::V2);
nebula::meta::MetaVersionMan::setMetaVersionToKV(engine, nebula::meta::MetaVersion::V2);
} else if (version == nebula::meta::MetaVersion::V2) {
LOG(INFO) << "version 3";
auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3(kvstore.get());
auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3(engine);
if (!ret.ok()) {
LOG(ERROR) << ret;
LOG(ERROR) << "Update meta from V2 to V3 failed " << ret;
return nullptr;
}

nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get(), nebula::meta::MetaVersion::V3);
nebula::meta::MetaVersionMan::setMetaVersionToKV(engine, nebula::meta::MetaVersion::V3);
}

LOG(INFO) << "Nebula store init succeeded, clusterId " << gClusterId;
Expand Down
114 changes: 61 additions & 53 deletions src/meta/MetaVersionMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "meta/MetaVersionMan.h"

#include "common/fs/FileUtils.h"
#include "meta/ActiveHostsMan.h"
#include "meta/processors/job/JobDescription.h"
#include "meta/processors/job/JobUtils.h"
Expand Down Expand Up @@ -51,78 +52,86 @@ MetaVersion MetaVersionMan::getVersionByHost(kvstore::KVStore* kv) {
}

// static
bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv, MetaVersion version) {
CHECK_NOTNULL(kv);
std::vector<kvstore::KV> data;
data.emplace_back(kMetaVersionKey,
std::string(reinterpret_cast<const char*>(&version), sizeof(MetaVersion)));
bool ret = true;
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Put failed, error: " << static_cast<int32_t>(code);
ret = false;
} else {
LOG(INFO) << "Write meta version 3 succeeds";
}
baton.post();
});
baton.wait();
return ret;
bool MetaVersionMan::setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion version) {
CHECK_NOTNULL(engine);
std::string versionValue =
std::string(reinterpret_cast<const char*>(&version), sizeof(MetaVersion));
auto code = engine->put(kMetaVersionKey, std::move(versionValue));
return code == nebula::cpp2::ErrorCode::SUCCEEDED;
}

// static
Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVEngine* engine) {
CHECK_NOTNULL(engine);
auto snapshot = folly::sformat("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr());
auto meteRet = kv->createCheckpoint(kDefaultSpaceId, snapshot);
if (meteRet.isLeftType()) {

std::string path = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);
if (!fs::FileUtils::exist(path) && !fs::FileUtils::makeDir(path)) {
LOG(ERROR) << "Make checkpoint dir: " << path << " failed";
return Status::Error("Create snapshot file failed");
}

std::string dataPath = folly::sformat("{}/data", path);
auto code = engine->createCheckpoint(dataPath);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgradeV1ToV2(kv);

auto status = doUpgradeV1ToV2(engine);
if (!status.ok()) {
// rollback by snapshot
return status;
}

// delete snapshot file
auto dmRet = kv->dropCheckpoint(kDefaultSpaceId, snapshot);
if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto checkpointPath = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);

if (fs::FileUtils::exist(checkpointPath) && !fs::FileUtils::remove(checkpointPath.data(), true)) {
LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually";
}
return Status::OK();
}

Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVEngine* engine) {
CHECK_NOTNULL(engine);
auto snapshot = folly::format("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr()).str();
auto meteRet = kv->createCheckpoint(kDefaultSpaceId, snapshot);
if (meteRet.isLeftType()) {

std::string path = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);
if (!fs::FileUtils::exist(path) && !fs::FileUtils::makeDir(path)) {
LOG(ERROR) << "Make checkpoint dir: " << path << " failed";
return Status::Error("Create snapshot file failed");
}

std::string dataPath = folly::sformat("{}/data", path);
auto code = engine->createCheckpoint(dataPath);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgradeV2ToV3(kv);

auto status = doUpgradeV2ToV3(engine);
if (!status.ok()) {
// rollback by snapshot
return status;
}

// delete snapshot file
auto dmRet = kv->dropCheckpoint(kDefaultSpaceId, snapshot);
if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
auto checkpointPath = folly::sformat("{}/checkpoints/{}", engine->getDataRoot(), snapshot);
if (fs::FileUtils::exist(checkpointPath) && !fs::FileUtils::remove(checkpointPath.data(), true)) {
LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually";
}
return Status::OK();
}

// static
Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVEngine* engine) {
MetaDataUpgrade upgrader(engine);
{
// kSpacesTable
auto prefix = nebula::meta::v1::kSpacesTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -143,7 +152,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kPartsTable
auto prefix = nebula::meta::v1::kPartsTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -164,7 +173,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kHostsTable
auto prefix = nebula::meta::v1::kHostsTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -185,7 +194,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kLeadersTable
auto prefix = nebula::meta::v1::kLeadersTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -206,7 +215,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kTagsTable
auto prefix = nebula::meta::v1::kTagsTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -227,7 +236,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kEdgesTable
auto prefix = nebula::meta::v1::kEdgesTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -248,7 +257,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kIndexesTable
auto prefix = nebula::meta::v1::kIndexesTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -269,7 +278,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kConfigsTable
auto prefix = nebula::meta::v1::kConfigsTable;
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -290,7 +299,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
// kJob
auto prefix = JobUtil::jobPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand Down Expand Up @@ -324,7 +333,7 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
nebula::meta::v1::kJobArchive});
std::unique_ptr<kvstore::KVIterator> iter;
for (auto& prefix : prefixes) {
auto ret = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto ret = engine->prefix(prefix, &iter);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
Status status = Status::OK();
while (iter->valid()) {
Expand All @@ -351,22 +360,22 @@ Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
}
}
}
if (!setMetaVersionToKV(kv, MetaVersion::V2)) {
if (!setMetaVersionToKV(engine, MetaVersion::V2)) {
return Status::Error("Persist meta version failed");
} else {
return Status::OK();
}
}

Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVEngine* engine) {
MetaDataUpgrade upgrader(engine);
// Step 1: Upgrade HeartBeat into machine list
{
// collect all hosts association with zone
std::vector<HostAddr> zoneHosts;
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
std::unique_ptr<kvstore::KVIterator> zoneIter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &zoneIter);
auto code = engine->prefix(zonePrefix, &zoneIter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get active hosts failed";
return Status::Error("Get hosts failed");
Expand All @@ -382,7 +391,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {

const auto& prefix = MetaKeyUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
code = engine->prefix(prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get active hosts failed";
return Status::Error("Get hosts failed");
Expand Down Expand Up @@ -420,7 +429,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
{
const auto& prefix = MetaKeyUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
auto code = engine->prefix(prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get spaces failed";
return Status::Error("Get spaces failed");
Expand All @@ -430,7 +439,6 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
if (FLAGS_print_info) {
upgrader.printSpacesV2(iter->val());
}
auto spaceProperties = meta::v2::MetaServiceUtilsV2::parseSpace(iter->val());
auto status = upgrader.rewriteSpacesV2ToV3(iter->key(), iter->val());
if (!status.ok()) {
LOG(ERROR) << status;
Expand All @@ -439,7 +447,7 @@ Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
iter->next();
}
}
if (!setMetaVersionToKV(kv, MetaVersion::V3)) {
if (!setMetaVersionToKV(engine, MetaVersion::V3)) {
return Status::Error("Persist meta version failed");
} else {
return Status::OK();
Expand Down
11 changes: 6 additions & 5 deletions src/meta/MetaVersionMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "common/base/Base.h"
#include "common/utils/MetaKeyUtils.h"
#include "kvstore/KVEngine.h"
#include "kvstore/KVStore.h"

namespace nebula {
Expand All @@ -29,18 +30,18 @@ class MetaVersionMan final {

static MetaVersion getMetaVersionFromKV(kvstore::KVStore* kv);

static bool setMetaVersionToKV(kvstore::KVStore* kv, MetaVersion version);
static bool setMetaVersionToKV(kvstore::KVEngine* engine, MetaVersion version);

static Status updateMetaV1ToV2(kvstore::KVStore* kv);
static Status updateMetaV1ToV2(kvstore::KVEngine* engine);

static Status updateMetaV2ToV3(kvstore::KVStore* kv);
static Status updateMetaV2ToV3(kvstore::KVEngine* engine);

private:
static MetaVersion getVersionByHost(kvstore::KVStore* kv);

static Status doUpgradeV1ToV2(kvstore::KVStore* kv);
static Status doUpgradeV1ToV2(kvstore::KVEngine* engine);

static Status doUpgradeV2ToV3(kvstore::KVStore* kv);
static Status doUpgradeV2ToV3(kvstore::KVEngine* engine);
};

} // namespace meta
Expand Down
4 changes: 2 additions & 2 deletions src/meta/upgrade/MetaDataUpgrade.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status MetaDataUpgrade::rewriteSpacesV2ToV3(const folly::StringPiece &key,
auto groupName = *oldProps.group_name_ref();
auto groupKey = meta::v2::MetaServiceUtilsV2::groupKey(groupName);
std::string zoneValue;
auto code = kv_->get(kDefaultSpaceId, kDefaultPartId, std::move(groupKey), &zoneValue);
auto code = engine_->get(std::move(groupKey), &zoneValue);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Get Group Failed");
}
Expand All @@ -107,7 +107,7 @@ Status MetaDataUpgrade::rewriteSpacesV2ToV3(const folly::StringPiece &key,
} else {
const auto &zonePrefix = MetaKeyUtils::zonePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kv_->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &iter);
auto code = engine_->prefix(zonePrefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Get Zones Failed");
}
Expand Down
Loading

0 comments on commit e7863df

Please sign in to comment.