Skip to content

Commit

Permalink
Cherry-pick PR from v2.5.0 (vesoft-inc#37)
Browse files Browse the repository at this point in the history
* get meta version in hb

* cherry-pick vesoft-inc#549 vesoft-inc#550 vesoft-inc#551

* damn timeout

* fix rebuild index bug introduced in vesoft-inc#2557

Co-authored-by: Doodle <[email protected]>
  • Loading branch information
nebula-bots and critical27 authored Sep 2, 2021
1 parent deec85b commit 4db27c1
Show file tree
Hide file tree
Showing 18 changed files with 94 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ jobs:
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} tck
working-directory: tests/
timeout-minutes: 30
timeout-minutes: 60
- name: Down cluster
run: |
make down
Expand Down
12 changes: 9 additions & 3 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ MetaClient::~MetaClient() {

bool MetaClient::isMetadReady() {
auto ret = heartbeat().get();
if (!ret.ok() && ret.status() != Status::LeaderChanged()) {
if (!ret.ok()) {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
ready_ = false;
return ready_;
} else if (options_.role_ == cpp2::HostRole::STORAGE &&
metaServerVersion_ != EXPECT_META_VERSION) {
LOG(ERROR) << "Expect meta version is " << EXPECT_META_VERSION << ", but actual is "
<< metaServerVersion_;
return ready_;
}

// ready_ will be set in loadData
bool ldRet = loadData();
bool lcRet = true;
if (!options_.skipConfig_) {
Expand Down Expand Up @@ -2330,7 +2335,8 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
heartbeatTime_ = time::WallClock::fastNowInMilliSec();
metadLastUpdateTime_ = resp.get_last_update_time_in_ms();
VLOG(1) << "Metad last update time: " << metadLastUpdateTime_;
return true; // resp.code == nebula::cpp2::ErrorCode::SUCCEEDED
metaServerVersion_ = resp.get_meta_version();
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
Expand Down
5 changes: 4 additions & 1 deletion src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "interface/gen-cpp2/meta_types.h"

DECLARE_int32(meta_client_retry_times);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -184,7 +185,7 @@ class MetaClient {

bool isMetadReady();

bool waitForMetadReady(int count = -1, int retryIntervalSecs = 2);
bool waitForMetadReady(int count = -1, int retryIntervalSecs = FLAGS_heartbeat_interval_secs);

void stop();

Expand Down Expand Up @@ -694,6 +695,8 @@ class MetaClient {
folly::RWSpinLock leaderIdsLock_;
int64_t localLastUpdateTime_{0};
int64_t metadLastUpdateTime_{0};
int64_t metaServerVersion_{-1};
static constexpr int64_t EXPECT_META_VERSION = 2;

// leadersLock_ is used to protect leadersInfo
folly::RWSpinLock leadersLock_;
Expand Down
1 change: 1 addition & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ struct HBResp {
2: common.HostAddr leader,
3: ClusterID cluster_id,
4: i64 last_update_time_in_ms,
5: i32 meta_version,
}

enum HostRole {
Expand Down
8 changes: 8 additions & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ nebula_add_library(
processors/session/SessionManagerProcessor.cpp
)

add_dependencies(
meta_service_handler
meta_version_man_obj
)

nebula_add_library(
meta_version_man_obj OBJECT
MetaVersionMan.cpp
Expand All @@ -109,6 +114,9 @@ add_dependencies(
set(meta_test_deps
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:general_storage_service_handler>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
Expand Down
25 changes: 20 additions & 5 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ static const std::unordered_map<std::string, std::pair<std::string, bool>> syste
{"groups", {"__groups__", true}},
{"zones", {"__zones__", true}},
{"ft_service", {"__ft_service__", false}},
{"sessions", {"__sessions__", true}},
{"id", {"__id__", true}}};
{"sessions", {"__sessions__", true}}};

// SystemInfo will always be backuped
static const std::unordered_map<std::string, std::pair<std::string, bool>> systemInfoMaps{
{"autoIncrementId", {"__id__", true}}, {"lastUpdateTime", {"__last_update_time__", true}}};

// name => {prefix, parseSpaceid}, nullptr means that the backup should be skipped.
static const std::unordered_map<
Expand All @@ -45,7 +48,6 @@ static const std::unordered_map<
{"index", {"__index__", nullptr}},
{"index_status", {"__index_status__", MetaServiceUtils::parseIndexStatusKeySpaceID}},
{"roles", {"__roles__", MetaServiceUtils::parseRoleSpace}},
{"last_update_time", {"__last_update_time__", nullptr}},
{"leaders", {"__leaders__", nullptr}},
{"leader_terms", {"__leader_terms__", nullptr}},
{"listener", {"__listener__", nullptr}},
Expand All @@ -68,7 +70,6 @@ static const std::string kUsersTable = systemTableMaps.at("users").firs
static const std::string kRolesTable = tableMaps.at("roles").first; // NOLINT
static const std::string kConfigsTable = systemTableMaps.at("configs").first; // NOLINT
static const std::string kSnapshotsTable = systemTableMaps.at("snapshots").first; // NOLINT
static const std::string kLastUpdateTimeTable = tableMaps.at("last_update_time").first; // NOLINT
static const std::string kLeadersTable = tableMaps.at("leaders").first; // NOLINT
static const std::string kLeaderTermsTable = tableMaps.at("leader_terms").first; // NOLINT
static const std::string kGroupsTable = systemTableMaps.at("groups").first; // NOLINT
Expand All @@ -87,7 +88,9 @@ const std::string kFTIndexTable = tableMaps.at("ft_index").first;
const std::string kFTServiceTable = systemTableMaps.at("ft_service").first; // NOLINT
const std::string kSessionsTable = systemTableMaps.at("sessions").first; // NOLINT

const std::string kIdKey = systemTableMaps.at("id").first; // NOLINT
const std::string kIdKey = systemInfoMaps.at("autoIncrementId").first; // NOLINT
const std::string kLastUpdateTimeTable = systemInfoMaps.at("lastUpdateTime").first; // NOLINT

// clang-format on

const int kMaxIpAddrLen = 15; // '255.255.255.255'
Expand Down Expand Up @@ -1095,6 +1098,18 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<std::string>> MetaServiceUtils::bac
}
}

for (const auto& table : systemInfoMaps) {
if (!table.second.second) {
LOG(INFO) << table.first << " table skipped";
continue;
}
auto result = backupTable(kvstore, backupName, table.second.first, files, nullptr);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
return result;
}
LOG(INFO) << table.first << " table backup successed";
}

// The mapping of space name and space id needs to be handled separately.
auto ret = backupIndex(kvstore, spaces, backupName, spaceNames);
if (!ok(ret)) {
Expand Down
25 changes: 11 additions & 14 deletions src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
#include "common/time/WallClock.h"
#include "meta/ActiveHostsMan.h"
#include "meta/KVBasedClusterIdMan.h"

DEFINE_bool(hosts_whitelist_enabled, false, "Check host whether in whitelist when received hb");
#include "meta/MetaVersionMan.h"

namespace nebula {
namespace meta {

HBCounters kHBCounters;

std::atomic<int64_t> HBProcessor::metaVersion_ = -1;

void HBProcessor::onFinished() {
if (counters_) {
stats::StatsManager::addValue(counters_->numCalls_);
Expand All @@ -29,18 +30,6 @@ void HBProcessor::onFinished() {
void HBProcessor::process(const cpp2::HBReq& req) {
HostAddr host((*req.host_ref()).host, (*req.host_ref()).port);
nebula::cpp2::ErrorCode ret;
if (FLAGS_hosts_whitelist_enabled) {
ret = hostExist(MetaServiceUtils::hostKey(host.host, host.port));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Reject unregistered host " << host << "!";
if (ret != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
ret = nebula::cpp2::ErrorCode::E_INVALID_HOST;
}
handleErrorCode(ret);
onFinished();
return;
}
}

LOG(INFO) << "Receive heartbeat from " << host
<< ", role = " << apache::thrift::util::enumNameSafe(req.get_role());
Expand Down Expand Up @@ -79,6 +68,14 @@ void HBProcessor::process(const cpp2::HBReq& req) {
} else if (nebula::error(lastUpdateTimeRet) == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
resp_.set_last_update_time_in_ms(0);
}

auto version = metaVersion_.load();
if (version == -1) {
metaVersion_.store(static_cast<int64_t>(MetaVersionMan::getMetaVersionFromKV(kvstore_)));
}

resp_.set_meta_version(metaVersion_.load());

handleErrorCode(ret);
onFinished();
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/admin/HBProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class HBProcessor : public BaseProcessor<cpp2::HBResp> {

ClusterID clusterId_{0};
const HBCounters* counters_{nullptr};
static std::atomic<int64_t> metaVersion_;
};

} // namespace meta
Expand Down
2 changes: 0 additions & 2 deletions src/meta/test/HBProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
#include "meta/processors/admin/HBProcessor.h"
#include "meta/test/TestUtils.h"

DECLARE_bool(hosts_whitelist_enabled);

namespace nebula {
namespace meta {

Expand Down
19 changes: 19 additions & 0 deletions src/meta/test/RestoreProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ TEST(RestoreProcessorTest, RestoreTest) {
std::string(reinterpret_cast<const char*>(&zoneId), sizeof(ZoneID)));
data.emplace_back(MetaServiceUtils::zoneKey(zoneName), MetaServiceUtils::zoneVal(hosts));

int32_t autoId = 666;
data.emplace_back(MetaServiceUtils::idKey(),
std::string(reinterpret_cast<const char*>(&autoId), sizeof(autoId)));

auto lastUpdateTime = time::WallClock::fastNowInMilliSec();
data.emplace_back(MetaServiceUtils::lastUpdateTimeKey(),
MetaServiceUtils::lastUpdateTimeVal(lastUpdateTime));

folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) {
ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED);
Expand Down Expand Up @@ -218,6 +226,17 @@ TEST(RestoreProcessorTest, RestoreTest) {
result = kvRestore->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result);
ASSERT_FALSE(iter->valid());

std::string key = "__id__";
std::string value;
result = kvRestore->get(kDefaultSpaceId, kDefaultPartId, key, &value);
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result);
ASSERT_EQ(autoId, *reinterpret_cast<const int32_t*>(value.data()));

key = "__last_update_time__";
result = kvRestore->get(kDefaultSpaceId, kDefaultPartId, key, &value);
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result);
ASSERT_EQ(lastUpdateTime, *reinterpret_cast<const int64_t*>(value.data()));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/admin/RebuildEdgeIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac
destination.toString(),
std::move(valuesRet).value());
batchSize += indexKey.size() + indexVal.size();
data.emplace_back(std::move(indexKey), std::move(indexVal));
data.emplace_back(std::move(indexKey), indexVal);
}
}
iter->next();
Expand Down
2 changes: 1 addition & 1 deletion src/storage/admin/RebuildTagIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space
auto indexKey = IndexKeyUtils::vertexIndexKey(
vidSize, part, item->get_index_id(), vertex.toString(), std::move(valuesRet).value());
batchSize += indexKey.size() + indexVal.size();
data.emplace_back(std::move(indexKey), std::move(indexVal));
data.emplace_back(std::move(indexKey), indexVal);
}
}
iter->next();
Expand Down
27 changes: 3 additions & 24 deletions src/storage/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
set(storage_test_deps
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
Expand Down Expand Up @@ -575,13 +578,6 @@ nebula_add_test(
SOURCES
KVTest.cpp
OBJECTS
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:http_client_obj>
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
Expand All @@ -597,15 +593,7 @@ nebula_add_test(
SOURCES
KVClientTest.cpp
OBJECTS
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:general_storage_client_obj>
$<TARGET_OBJECTS:storage_client_base_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:http_client_obj>
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
Expand Down Expand Up @@ -636,15 +624,6 @@ nebula_add_executable(
SOURCES
ElasticSearchBulkInsertTest.cpp
OBJECTS
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:general_storage_client_obj>
$<TARGET_OBJECTS:storage_client_base_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:http_client_obj>
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
Expand Down
3 changes: 3 additions & 0 deletions src/tools/db-dump/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
set(tools_test_deps
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>
Expand Down
3 changes: 3 additions & 0 deletions src/tools/db-upgrade/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ nebula_add_executable(
DbUpgrader.cpp
OBJECTS
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>
Expand Down
3 changes: 3 additions & 0 deletions src/tools/meta-dump/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ nebula_add_executable(
MetaDumpTool.cpp
OBJECTS
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>
Expand Down
3 changes: 3 additions & 0 deletions src/tools/simple-kv-verify/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ nebula_add_executable(
SimpleKVVerifyTool.cpp
OBJECTS
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
Expand Down
3 changes: 3 additions & 0 deletions src/tools/storage-perf/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
set(perf_test_deps
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>
Expand Down

0 comments on commit 4db27c1

Please sign in to comment.