Skip to content

Commit

Permalink
Meta upgrader v3 (#3540)
Browse files Browse the repository at this point in the history
* Replace group when create space

* Support white list

* fix test case

* support zone operations

* fix

* Support meta upgrade v3
  • Loading branch information
darionyaphet authored Jan 5, 2022
1 parent f51fbbd commit 1147e89
Show file tree
Hide file tree
Showing 31 changed files with 507 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ class MetaClient {
std::atomic<int64_t> metadLastUpdateTime_{0};

int64_t metaServerVersion_{-1};
static constexpr int64_t EXPECT_META_VERSION = 2;
static constexpr int64_t EXPECT_META_VERSION = 3;

// leadersLock_ is used to protect leadersInfo
folly::RWSpinLock leadersLock_;
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/FunctionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,7 @@ FunctionManager::FunctionManager() {
attr.maxArity_ = 1;
attr.isPure_ = false;
attr.body_ = [](const auto &args) -> Value {
if (args.size() == 0) {
if (args.empty()) {
return time::WallClock::fastNowInSec();
}
auto status = time::TimeUtils::toTimestamp(args[0].get());
Expand Down
1 change: 1 addition & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:meta_http_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:meta_v2_thrift_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
${common_deps}
${storage_meta_deps}
Expand Down
32 changes: 14 additions & 18 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,26 +138,22 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
LOG(ERROR) << "Meta version is invalid";
return nullptr;
} else if (version == nebula::meta::MetaVersion::V1) {
if (leader == localhost) {
LOG(INFO) << "I am leader, begin upgrade meta data";
// need to upgrade the v1.0 meta data format to v2.0 meta data format
auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get());
if (!ret.ok()) {
LOG(ERROR) << ret;
return nullptr;
}
} else {
LOG(INFO) << "I am follower, wait for leader to sync upgrade";
while (version != nebula::meta::MetaVersion::V2) {
VLOG(1) << "Waiting for leader to upgrade";
sleep(1);
version = nebula::meta::MetaVersionMan::getMetaVersionFromKV(kvstore.get());
}
auto ret = nebula::meta::MetaVersionMan::updateMetaV1ToV2(kvstore.get());
if (!ret.ok()) {
LOG(ERROR) << ret;
return nullptr;
}

nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get(), nebula::meta::MetaVersion::V2);
} else if (version == nebula::meta::MetaVersion::V2) {
LOG(INFO) << "version 3";
auto ret = nebula::meta::MetaVersionMan::updateMetaV2ToV3(kvstore.get());
if (!ret.ok()) {
LOG(ERROR) << ret;
return nullptr;
}
}

if (leader == localhost) {
nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get());
nebula::meta::MetaVersionMan::setMetaVersionToKV(kvstore.get(), nebula::meta::MetaVersion::V3);
}

LOG(INFO) << "Nebula store init succeeded, clusterId " << gClusterId;
Expand Down
4 changes: 2 additions & 2 deletions src/graph/executor/query/LeftJoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ folly::Future<Status> LeftJoinExecutor::join(const std::vector<Expression*>& has
DataSet result;
if (hashKeys.size() == 1 && probeKeys.size() == 1) {
std::unordered_map<Value, std::vector<const Row*>> hashTable;
hashTable.reserve(rhsIter_->size() == 0 ? 1 : rhsIter_->size());
hashTable.reserve(rhsIter_->empty() ? 1 : rhsIter_->size());
if (!lhsIter_->empty()) {
buildSingleKeyHashTable(probeKeys.front(), rhsIter_.get(), hashTable);
result = singleKeyProbe(hashKeys.front(), lhsIter_.get(), hashTable);
}
} else {
std::unordered_map<List, std::vector<const Row*>> hashTable;
hashTable.reserve(rhsIter_->size() == 0 ? 1 : rhsIter_->size());
hashTable.reserve(rhsIter_->empty() ? 1 : rhsIter_->size());
if (!lhsIter_->empty()) {
buildHashTable(probeKeys, rhsIter_.get(), hashTable);
result = probe(hashKeys, lhsIter_.get(), hashTable);
Expand Down
2 changes: 1 addition & 1 deletion src/graph/planner/match/ScanSeek.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ StatusOr<SubPlan> ScanSeek::transformEdge(EdgeContext *edgeCtx) {
bool ScanSeek::matchNode(NodeContext *nodeCtx) {
auto &node = *nodeCtx->info;
// only require the tag
if (node.tids.size() == 0) {
if (node.tids.empty()) {
// empty labels means all labels
const auto *qctx = nodeCtx->matchClauseCtx->qctx;
auto allLabels = qctx->schemaMng()->getAllTags(nodeCtx->matchClauseCtx->space.id);
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/plugins/hbase/HBaseClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ ResultCode HBaseClient::range(const std::string& tableName,
while (true) {
std::vector<TResult> tResultList;
client_->sync_getScannerRows(tResultList, scannerId, kScanRowNum);
if (tResultList.size() == 0) break;
if (tResultList.empty()) break;
for (auto& tResult : tResultList) {
std::vector<TColumnValue> tColumnValueList = tResult.columnValues;
if (tColumnValueList.size() > 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,14 @@ nebula_add_library(
add_dependencies(
meta_version_man_obj
meta_v1_thrift_obj
meta_v2_thrift_obj
)

set(meta_test_deps
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v2_thrift_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
Expand Down
136 changes: 124 additions & 12 deletions src/meta/MetaVersionMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

#include "meta/MetaVersionMan.h"

#include "meta/ActiveHostsMan.h"
#include "meta/processors/job/JobDescription.h"
#include "meta/processors/job/JobUtils.h"
#include "meta/upgrade/MetaDataUpgrade.h"
#include "meta/upgrade/v1/MetaServiceUtilsV1.h"
#include "meta/upgrade/v2/MetaServiceUtilsV2.h"

DEFINE_bool(null_type, true, "set schema to support null type");
DEFINE_bool(print_info, false, "enable to print the rewrite data");
Expand All @@ -26,7 +28,7 @@ MetaVersion MetaVersionMan::getMetaVersionFromKV(kvstore::KVStore* kv) {
auto code = kv->get(kDefaultSpaceId, kDefaultPartId, kMetaVersionKey, &value, true);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
auto version = *reinterpret_cast<const MetaVersion*>(value.data());
return (version == MetaVersion::V2) ? MetaVersion::V2 : MetaVersion::UNKNOWN;
return version;
} else {
return getVersionByHost(kv);
}
Expand All @@ -42,19 +44,18 @@ MetaVersion MetaVersionMan::getVersionByHost(kvstore::KVStore* kv) {
}
if (iter->valid()) {
auto v1KeySize = hostPrefix.size() + sizeof(int64_t);
return (iter->key().size() == v1KeySize) ? MetaVersion::V1 : MetaVersion::V2;
return (iter->key().size() == v1KeySize) ? MetaVersion::V1 : MetaVersion::V3;
}
// No hosts exists, regard as regard as version 2
return MetaVersion::V2;
// No hosts exists, regard as version 3
return MetaVersion::V3;
}

// static
bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv) {
bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv, MetaVersion version) {
CHECK_NOTNULL(kv);
auto v2 = MetaVersion::V2;
std::vector<kvstore::KV> data;
data.emplace_back(kMetaVersionKey,
std::string(reinterpret_cast<const char*>(&v2), sizeof(MetaVersion)));
std::string(reinterpret_cast<const char*>(&version), sizeof(MetaVersion)));
bool ret = true;
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(
Expand All @@ -63,7 +64,7 @@ bool MetaVersionMan::setMetaVersionToKV(kvstore::KVStore* kv) {
LOG(ERROR) << "Put failed, error: " << static_cast<int32_t>(code);
ret = false;
} else {
LOG(INFO) << "Write meta version 2 succeeds";
LOG(INFO) << "Write meta version 3 succeeds";
}
baton.post();
});
Expand All @@ -80,7 +81,28 @@ Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVStore* kv) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgrade(kv);
auto status = doUpgradeV1ToV2(kv);
if (!status.ok()) {
// rollback by snapshot
return status;
}
// delete snapshot file
auto dmRet = kv->dropCheckpoint(kDefaultSpaceId, snapshot);
if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Delete snapshot: " << snapshot << " failed, You need to delete it manually";
}
return Status::OK();
}

Status MetaVersionMan::updateMetaV2ToV3(kvstore::KVStore* kv) {
CHECK_NOTNULL(kv);
auto snapshot = folly::format("META_UPGRADE_SNAPSHOT_{}", MetaKeyUtils::genTimestampStr()).str();
auto meteRet = kv->createCheckpoint(kDefaultSpaceId, snapshot);
if (meteRet.isLeftType()) {
LOG(ERROR) << "Create snapshot failed: " << snapshot;
return Status::Error("Create snapshot failed");
}
auto status = doUpgradeV2ToV3(kv);
if (!status.ok()) {
// rollback by snapshot
return status;
Expand All @@ -94,7 +116,7 @@ Status MetaVersionMan::updateMetaV1ToV2(kvstore::KVStore* kv) {
}

// static
Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
Status MetaVersionMan::doUpgradeV1ToV2(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
{
// kSpacesTable
Expand All @@ -105,7 +127,7 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
Status status = Status::OK();
while (iter->valid()) {
if (FLAGS_print_info) {
upgrader.printSpaces(iter->val());
upgrader.printSpacesV1(iter->val());
}
status = upgrader.rewriteSpaces(iter->key(), iter->val());
if (!status.ok()) {
Expand Down Expand Up @@ -158,6 +180,7 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
}
}
}

{
// kLeadersTable
auto prefix = nebula::meta::v1::kLeadersTable;
Expand All @@ -178,6 +201,7 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
}
}
}

{
// kTagsTable
auto prefix = nebula::meta::v1::kTagsTable;
Expand Down Expand Up @@ -327,7 +351,95 @@ Status MetaVersionMan::doUpgrade(kvstore::KVStore* kv) {
}
}
}
if (!setMetaVersionToKV(kv)) {
if (!setMetaVersionToKV(kv, MetaVersion::V2)) {
return Status::Error("Persist meta version failed");
} else {
return Status::OK();
}
}

Status MetaVersionMan::doUpgradeV2ToV3(kvstore::KVStore* kv) {
MetaDataUpgrade upgrader(kv);
// Step 1: Upgrade HeartBeat into machine list
{
// collect all hosts association with zone
std::vector<HostAddr> zoneHosts;
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
std::unique_ptr<kvstore::KVIterator> zoneIter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &zoneIter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get active hosts failed";
return Status::Error("Get hosts failed");
}

while (zoneIter->valid()) {
auto hosts = MetaKeyUtils::parseZoneHosts(zoneIter->val());
if (!hosts.empty()) {
zoneHosts.insert(zoneHosts.end(), hosts.begin(), hosts.end());
}
zoneIter->next();
}

const auto& prefix = MetaKeyUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get active hosts failed";
return Status::Error("Get hosts failed");
}

std::vector<kvstore::KV> data;
while (iter->valid()) {
auto info = HostInfo::decode(iter->val());

if (info.role_ == meta::cpp2::HostRole::STORAGE) {
// Save the machine information
auto host = MetaKeyUtils::parseHostKey(iter->key());
auto machineKey = MetaKeyUtils::machineKey(host.host, host.port);
data.emplace_back(std::move(machineKey), "");

auto hostIt = std::find(zoneHosts.begin(), zoneHosts.end(), host);
if (hostIt == zoneHosts.end()) {
// Save the zone information
auto zoneName = folly::stringPrintf("default_zone_%s_%d", host.host.c_str(), host.port);
auto zoneKey = MetaKeyUtils::zoneKey(std::move(zoneName));
auto zoneVal = MetaKeyUtils::zoneVal({host});
data.emplace_back(std::move(zoneKey), std::move(zoneVal));
}
}
iter->next();
}
auto status = upgrader.saveMachineAndZone(std::move(data));
if (!status.ok()) {
LOG(ERROR) << status;
return status;
}
}

// Step 2: Update Create space properties about Group
{
const auto& prefix = MetaKeyUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get spaces failed";
return Status::Error("Get spaces failed");
}

while (iter->valid()) {
if (FLAGS_print_info) {
upgrader.printSpacesV2(iter->val());
}
auto spaceProperties = meta::v2::MetaServiceUtilsV2::parseSpace(iter->val());
auto status = upgrader.rewriteSpacesV2ToV3(iter->key(), iter->val());
if (!status.ok()) {
LOG(ERROR) << status;
return status;
}
iter->next();
}
}
if (!setMetaVersionToKV(kv, MetaVersion::V3)) {
return Status::Error("Persist meta version failed");
} else {
return Status::OK();
Expand Down
11 changes: 8 additions & 3 deletions src/meta/MetaVersionMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum class MetaVersion {
UNKNOWN = 0,
V1 = 1,
V2 = 2,
V3 = 3,
};

/**
Expand All @@ -28,17 +29,21 @@ class MetaVersionMan final {

static MetaVersion getMetaVersionFromKV(kvstore::KVStore* kv);

static bool setMetaVersionToKV(kvstore::KVStore* kv);
static bool setMetaVersionToKV(kvstore::KVStore* kv, MetaVersion version);

static Status updateMetaV1ToV2(kvstore::KVStore* kv);

static Status updateMetaV2ToV3(kvstore::KVStore* kv);

private:
static MetaVersion getVersionByHost(kvstore::KVStore* kv);

static Status doUpgrade(kvstore::KVStore* kv);
static Status doUpgradeV1ToV2(kvstore::KVStore* kv);

static Status doUpgradeV2ToV3(kvstore::KVStore* kv);
};

} // namespace meta
} // namespace nebula

#endif // META_ROOTUSERMAN_H_
#endif // META_METAVERSIONMAN_H_
4 changes: 2 additions & 2 deletions src/meta/processors/admin/ListClusterInfoProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) {
}
auto iter = nebula::value(iterRet).get();
for (; iter->valid(); iter->next()) {
HostAddr addr = MetaKeyUtils::parseHostKey(iter->key());
HostInfo info = HostInfo::decode(iter->val());
auto addr = MetaKeyUtils::parseHostKey(iter->key());
auto info = HostInfo::decode(iter->val());

cpp2::ServiceInfo service;
service.role_ref() = info.role_;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/DataBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Status DataBalanceJobExecutor::buildBalancePlan() {
for (Host* h : hostVec) {
totalPartNum += h->parts_.size();
}
if (hostVec.size() == 0) {
if (hostVec.empty()) {
LOG(ERROR) << "rebalance error: zone has no host";
return {};
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/ZoneBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::rebalanceActiveZones(
for (auto& z : sortedActiveZonesRef) {
totalPartNum += z->partNum_;
}
if (sortedActiveZonesRef.size() == 0) {
if (sortedActiveZonesRef.empty()) {
LOG(ERROR) << "rebalance error: no active zones";
return nebula::cpp2::ErrorCode::E_NO_HOSTS;
}
Expand Down
Loading

0 comments on commit 1147e89

Please sign in to comment.