Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files Browse the repository at this point in the history
critical27 committed Aug 26, 2021
1 parent 8fdff96 commit 27e9f0d
Showing 12 changed files with 77 additions and 45 deletions.
8 changes: 8 additions & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -96,6 +96,11 @@ nebula_add_library(
processors/session/SessionManagerProcessor.cpp
)

add_dependencies(
meta_service_handler
meta_version_man_obj
)

nebula_add_library(
meta_version_man_obj OBJECT
MetaVersionMan.cpp
@@ -109,6 +114,9 @@ add_dependencies(
set(meta_test_deps
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:general_storage_service_handler>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
25 changes: 20 additions & 5 deletions src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
@@ -28,8 +28,11 @@ static const std::unordered_map<std::string, std::pair<std::string, bool>> syste
{"groups", {"__groups__", true}},
{"zones", {"__zones__", true}},
{"ft_service", {"__ft_service__", false}},
{"sessions", {"__sessions__", true}},
{"id", {"__id__", true}}};
{"sessions", {"__sessions__", true}}};

// SystemInfo will always be backuped
static const std::unordered_map<std::string, std::pair<std::string, bool>> systemInfoMaps{
{"autoIncrementId", {"__id__", true}}, {"lastUpdateTime", {"__last_update_time__", true}}};

// name => {prefix, parseSpaceid}, nullptr means that the backup should be skipped.
static const std::unordered_map<
@@ -45,7 +48,6 @@ static const std::unordered_map<
{"index", {"__index__", nullptr}},
{"index_status", {"__index_status__", MetaServiceUtils::parseIndexStatusKeySpaceID}},
{"roles", {"__roles__", MetaServiceUtils::parseRoleSpace}},
{"last_update_time", {"__last_update_time__", nullptr}},
{"leaders", {"__leaders__", nullptr}},
{"leader_terms", {"__leader_terms__", nullptr}},
{"listener", {"__listener__", nullptr}},
@@ -68,7 +70,6 @@ static const std::string kUsersTable = systemTableMaps.at("users").firs
static const std::string kRolesTable = tableMaps.at("roles").first; // NOLINT
static const std::string kConfigsTable = systemTableMaps.at("configs").first; // NOLINT
static const std::string kSnapshotsTable = systemTableMaps.at("snapshots").first; // NOLINT
static const std::string kLastUpdateTimeTable = tableMaps.at("last_update_time").first; // NOLINT
static const std::string kLeadersTable = tableMaps.at("leaders").first; // NOLINT
static const std::string kLeaderTermsTable = tableMaps.at("leader_terms").first; // NOLINT
static const std::string kGroupsTable = systemTableMaps.at("groups").first; // NOLINT
@@ -87,7 +88,9 @@ const std::string kFTIndexTable = tableMaps.at("ft_index").first;
const std::string kFTServiceTable = systemTableMaps.at("ft_service").first; // NOLINT
const std::string kSessionsTable = systemTableMaps.at("sessions").first; // NOLINT

const std::string kIdKey = systemTableMaps.at("id").first; // NOLINT
const std::string kIdKey = systemInfoMaps.at("autoIncrementId").first; // NOLINT
const std::string kLastUpdateTimeTable = systemInfoMaps.at("lastUpdateTime").first; // NOLINT

// clang-format on

const int kMaxIpAddrLen = 15; // '255.255.255.255'
@@ -1095,6 +1098,18 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<std::string>> MetaServiceUtils::bac
}
}

for (const auto& table : systemInfoMaps) {
if (!table.second.second) {
LOG(INFO) << table.first << " table skipped";
continue;
}
auto result = backupTable(kvstore, backupName, table.second.first, files, nullptr);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
return result;
}
LOG(INFO) << table.first << " table backup successed";
}

// The mapping of space name and space id needs to be handled separately.
auto ret = backupIndex(kvstore, spaces, backupName, spaceNames);
if (!ok(ret)) {
25 changes: 11 additions & 14 deletions src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
@@ -9,14 +9,15 @@
#include "common/time/WallClock.h"
#include "meta/ActiveHostsMan.h"
#include "meta/KVBasedClusterIdMan.h"

DEFINE_bool(hosts_whitelist_enabled, false, "Check host whether in whitelist when received hb");
#include "meta/MetaVersionMan.h"

namespace nebula {
namespace meta {

HBCounters kHBCounters;

std::atomic<int64_t> HBProcessor::metaVersion_ = -1;

void HBProcessor::onFinished() {
if (counters_) {
stats::StatsManager::addValue(counters_->numCalls_);
@@ -29,18 +30,6 @@ void HBProcessor::onFinished() {
void HBProcessor::process(const cpp2::HBReq& req) {
HostAddr host((*req.host_ref()).host, (*req.host_ref()).port);
nebula::cpp2::ErrorCode ret;
if (FLAGS_hosts_whitelist_enabled) {
ret = hostExist(MetaServiceUtils::hostKey(host.host, host.port));
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Reject unregistered host " << host << "!";
if (ret != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
ret = nebula::cpp2::ErrorCode::E_INVALID_HOST;
}
handleErrorCode(ret);
onFinished();
return;
}
}

LOG(INFO) << "Receive heartbeat from " << host
<< ", role = " << apache::thrift::util::enumNameSafe(req.get_role());
@@ -79,6 +68,14 @@ void HBProcessor::process(const cpp2::HBReq& req) {
} else if (nebula::error(lastUpdateTimeRet) == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
resp_.set_last_update_time_in_ms(0);
}

auto version = metaVersion_.load();
if (version == -1) {
metaVersion_.store(static_cast<int64_t>(MetaVersionMan::getMetaVersionFromKV(kvstore_)));
}

resp_.set_meta_version(metaVersion_.load());

handleErrorCode(ret);
onFinished();
}
1 change: 1 addition & 0 deletions src/meta/processors/admin/HBProcessor.h
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@ class HBProcessor : public BaseProcessor<cpp2::HBResp> {

ClusterID clusterId_{0};
const HBCounters* counters_{nullptr};
static std::atomic<int64_t> metaVersion_;
};

} // namespace meta
2 changes: 0 additions & 2 deletions src/meta/test/HBProcessorTest.cpp
Original file line number Diff line number Diff line change
@@ -11,8 +11,6 @@
#include "meta/processors/admin/HBProcessor.h"
#include "meta/test/TestUtils.h"

DECLARE_bool(hosts_whitelist_enabled);

namespace nebula {
namespace meta {

19 changes: 19 additions & 0 deletions src/meta/test/RestoreProcessorTest.cpp
Original file line number Diff line number Diff line change
@@ -89,6 +89,14 @@ TEST(RestoreProcessorTest, RestoreTest) {
std::string(reinterpret_cast<const char*>(&zoneId), sizeof(ZoneID)));
data.emplace_back(MetaServiceUtils::zoneKey(zoneName), MetaServiceUtils::zoneVal(hosts));

int32_t autoId = 666;
data.emplace_back(MetaServiceUtils::idKey(),
std::string(reinterpret_cast<const char*>(&autoId), sizeof(autoId)));

auto lastUpdateTime = time::WallClock::fastNowInMilliSec();
data.emplace_back(MetaServiceUtils::lastUpdateTimeKey(),
MetaServiceUtils::lastUpdateTimeVal(lastUpdateTime));

folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) {
ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED);
@@ -218,6 +226,17 @@ TEST(RestoreProcessorTest, RestoreTest) {
result = kvRestore->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result);
ASSERT_FALSE(iter->valid());

std::string key = "__id__";
std::string value;
result = kvRestore->get(kDefaultSpaceId, kDefaultPartId, key, &value);
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result);
ASSERT_EQ(autoId, *reinterpret_cast<const int32_t*>(value.data()));

key = "__last_update_time__";
result = kvRestore->get(kDefaultSpaceId, kDefaultPartId, key, &value);
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, result);
ASSERT_EQ(lastUpdateTime, *reinterpret_cast<const int64_t*>(value.data()));
}
}

27 changes: 3 additions & 24 deletions src/storage/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
set(storage_test_deps
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
@@ -575,13 +578,6 @@ nebula_add_test(
SOURCES
KVTest.cpp
OBJECTS
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:http_client_obj>
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
@@ -597,15 +593,7 @@ nebula_add_test(
SOURCES
KVClientTest.cpp
OBJECTS
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:general_storage_client_obj>
$<TARGET_OBJECTS:storage_client_base_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:http_client_obj>
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
@@ -636,15 +624,6 @@ nebula_add_executable(
SOURCES
ElasticSearchBulkInsertTest.cpp
OBJECTS
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:general_storage_client_obj>
$<TARGET_OBJECTS:storage_client_base_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:http_client_obj>
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
3 changes: 3 additions & 0 deletions src/tools/db-dump/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
set(tools_test_deps
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>
3 changes: 3 additions & 0 deletions src/tools/db-upgrade/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -8,6 +8,9 @@ nebula_add_executable(
DbUpgrader.cpp
OBJECTS
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>
3 changes: 3 additions & 0 deletions src/tools/meta-dump/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -5,6 +5,9 @@ nebula_add_executable(
MetaDumpTool.cpp
OBJECTS
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>
3 changes: 3 additions & 0 deletions src/tools/simple-kv-verify/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -5,6 +5,9 @@ nebula_add_executable(
SimpleKVVerifyTool.cpp
OBJECTS
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_common_obj>
3 changes: 3 additions & 0 deletions src/tools/storage-perf/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
set(perf_test_deps
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:meta_version_man_obj>
$<TARGET_OBJECTS:meta_v1_thrift_obj>
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>

0 comments on commit 27e9f0d

Please sign in to comment.