Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload diskPaths for meta #3369

Merged
merged 3 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2414,6 +2414,21 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
} else {
req.set_leader_partIds(std::move(leaderIds));
}

kvstore::SpaceDiskPartsMap diskParts;
if (listener_ != nullptr) {
listener_->fetchDiskParts(diskParts);
if (diskParts_ != diskParts) {
{
folly::RWSpinLock::WriteHolder holder(&diskPartsLock_);
Nivras marked this conversation as resolved.
Show resolved Hide resolved
diskParts_.clear();
diskParts_ = diskParts;
}
req.set_disk_parts(diskParts);
}
} else {
req.set_disk_parts(diskParts);
}
}

folly::Promise<StatusOr<bool>> promise;
Expand Down
7 changes: 7 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "interface/gen-cpp2/MetaServiceAsyncClient.h"
#include "interface/gen-cpp2/common_types.h"
#include "interface/gen-cpp2/meta_types.h"
#include "kvstore/DiskManager.h"

DECLARE_int32(meta_client_retry_times);
DECLARE_int32(heartbeat_interval_secs);
Expand Down Expand Up @@ -162,6 +163,7 @@ class MetaChangedListener {
virtual void onPartUpdated(const PartHosts& partHosts) = 0;
virtual void fetchLeaderInfo(
std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>& leaders) = 0;
virtual void fetchDiskParts(kvstore::SpaceDiskPartsMap& diskParts) = 0;
virtual void onListenerAdded(GraphSpaceID spaceId,
PartitionID partId,
const ListenerHosts& listenerHosts) = 0;
Expand Down Expand Up @@ -718,9 +720,14 @@ class MetaClient {
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>> clientsMan_;

// heartbeat is a single thread, maybe leaderIdsLock_ and diskPartsLock_ is useless?
// leaderIdsLock_ is used to protect leaderIds_
std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>> leaderIds_;
folly::RWSpinLock leaderIdsLock_;
// diskPartsLock_ is used to protect diskParts_;
kvstore::SpaceDiskPartsMap diskParts_;
folly::RWSpinLock diskPartsLock_;

std::atomic<int64_t> localDataLastUpdateTime_{-1};
std::atomic<int64_t> localCfgLastUpdateTime_{-1};
std::atomic<int64_t> metadLastUpdateTime_{0};
Expand Down
48 changes: 47 additions & 1 deletion src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ static const std::unordered_map<
{"balance_task", {"__balance_task__", nullptr}},
{"balance_plan", {"__balance_plan__", nullptr}},
{"ft_index", {"__ft_index__", nullptr}},
{"local_id", {"__local_id__", MetaKeyUtils::parseLocalIdSpace}}};
{"local_id", {"__local_id__", MetaKeyUtils::parseLocalIdSpace}},
{"disk_parts", {"__disk_parts__", MetaKeyUtils::parseDiskPartsSpace}}};

// clang-format off
static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT
Expand All @@ -71,6 +72,7 @@ static const std::string kLeaderTermsTable = tableMaps.at("leader_terms").fir
static const std::string kGroupsTable = systemTableMaps.at("groups").first; // NOLINT
static const std::string kZonesTable = systemTableMaps.at("zones").first; // NOLINT
static const std::string kListenerTable = tableMaps.at("listener").first; // NOLINT
static const std::string kDiskPartsTable = tableMaps.at("disk_parts").first; // NOLINT

// Used to record the number of vertices and edges in the space
// The number of vertices of each tag in the space
Expand Down Expand Up @@ -1150,4 +1152,48 @@ GraphSpaceID MetaKeyUtils::parseLocalIdSpace(folly::StringPiece rawData) {
return *reinterpret_cast<const GraphSpaceID*>(rawData.data() + offset);
}

GraphSpaceID MetaKeyUtils::parseDiskPartsSpace(folly::StringPiece rawData) {
auto offset = kDiskPartsTable.size();
return *reinterpret_cast<const GraphSpaceID*>(rawData.data() + offset);
}

std::string MetaKeyUtils::diskPartsPrefix() { return kDiskPartsTable; }

std::string MetaKeyUtils::diskPartsPrefix(HostAddr addr) {
std::string key;
std::string hostStr = serializeHostAddr(addr);
key.reserve(kDiskPartsTable.size() + hostStr.size());
key.append(kDiskPartsTable.data(), kDiskPartsTable.size()).append(hostStr.data(), hostStr.size());
return key;
}

std::string MetaKeyUtils::diskPartsPrefix(HostAddr addr, GraphSpaceID spaceId) {
std::string key;
std::string prefix = diskPartsPrefix(addr);
key.reserve(prefix.size() + sizeof(GraphSpaceID));
key.append(prefix.data(), prefix.size())
.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID));
return key;
}

std::string MetaKeyUtils::diskPartsKey(HostAddr addr, GraphSpaceID spaceId, std::string path) {
std::string key;
std::string prefix = diskPartsPrefix(addr, spaceId);
key.reserve(prefix.size() + path.size());
key.append(prefix.data(), prefix.size()).append(path.data(), path.size());
return key;
}

std::string MetaKeyUtils::diskPartsVal(const meta::cpp2::PartitionList& partList) {
std::string val;
apache::thrift::CompactSerializer::serialize(partList, &val);
return val;
}

meta::cpp2::PartitionList MetaKeyUtils::parseDiskPartsVal(const folly::StringPiece& rawData) {
meta::cpp2::PartitionList partList;
apache::thrift::CompactSerializer::deserialize(rawData, partList);
return partList;
}

} // namespace nebula
14 changes: 14 additions & 0 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,20 @@ class MetaKeyUtils final {
static std::unordered_map<std::string, std::pair<std::string, bool>> getSystemInfoMaps();

static std::unordered_map<std::string, std::pair<std::string, bool>> getSystemTableMaps();

static GraphSpaceID parseDiskPartsSpace(folly::StringPiece rawData);

static std::string diskPartsPrefix();

static std::string diskPartsPrefix(HostAddr addr);

static std::string diskPartsPrefix(HostAddr addr, GraphSpaceID spaceId);

static std::string diskPartsKey(HostAddr addr, GraphSpaceID spaceId, std::string path);

static std::string diskPartsVal(const meta::cpp2::PartitionList& partList);

static meta::cpp2::PartitionList parseDiskPartsVal(const folly::StringPiece& rawData);
};

} // namespace nebula
Expand Down
7 changes: 7 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,10 @@ struct LeaderInfo {
2: i64 term
}

struct PartitionList {
1: list<common.PartitionID> part_list;
}

struct HBReq {
1: HostRole role,
2: common.HostAddr host,
Expand All @@ -563,6 +567,9 @@ struct HBReq {
5: binary git_info_sha,
// version of binary
6: optional binary version,
7: optional map<common.GraphSpaceID, map<binary, PartitionList>
Nivras marked this conversation as resolved.
Show resolved Hide resolved
(cpp.template = "std::unordered_map")>
(cpp.template = "std::unordered_map") disk_parts;
}

struct IndexFieldDef {
Expand Down
19 changes: 14 additions & 5 deletions src/kvstore/DiskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,21 @@ void DiskManager::removePartFromPath(GraphSpaceID spaceId,
}
}

StatusOr<PartDiskMap> DiskManager::partDist(GraphSpaceID spaceId) {
auto spaceIt = partPath_.find(spaceId);
if (spaceIt == partPath_.end()) {
return Status::Error("Space not found");
void DiskManager::getDiskParts(SpaceDiskPartsMap& diskParts) {
std::lock_guard<std::mutex> lg(lock_);
Nivras marked this conversation as resolved.
Show resolved Hide resolved
for (const auto& [space, partDiskMap] : partPath_) {
std::unordered_map<std::string, meta::cpp2::PartitionList> tmpPartPaths;
for (const auto& [path, partitions] : partDiskMap) {
std::vector<PartitionID> tmpPartitions;
Nivras marked this conversation as resolved.
Show resolved Hide resolved
for (const auto& partition : partitions) {
tmpPartitions.emplace_back(partition);
}
meta::cpp2::PartitionList ps;
ps.set_part_list(tmpPartitions);
tmpPartPaths[path] = ps;
}
diskParts.emplace(space, std::move(tmpPartPaths));
}
return spaceIt->second;
}

bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) {
Expand Down
6 changes: 6 additions & 0 deletions src/kvstore/DiskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
#include "common/base/StatusOr.h"
#include "common/thread/GenericWorker.h"
#include "common/thrift/ThriftTypes.h"
#include "interface/gen-cpp2/meta_types.h"

namespace nebula {
namespace kvstore {

using PartDiskMap = std::unordered_map<std::string, std::set<PartitionID>>;
using SpaceDiskPartsMap =
std::unordered_map<GraphSpaceID, std::unordered_map<std::string, meta::cpp2::PartitionList>>;

class DiskManager {
FRIEND_TEST(DiskManagerTest, AvailableTest);
Expand Down Expand Up @@ -52,6 +55,9 @@ class DiskManager {
// Given a space, return data path and all partition in the path
StatusOr<PartDiskMap> partDist(GraphSpaceID spaceId);

// Get all space data path and all partition in the path
void getDiskParts(SpaceDiskPartsMap& diskParts);

private:
// refresh free bytes of data path periodically
void refresh();
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,10 @@ void NebulaStore::checkRemoteListeners(GraphSpaceID spaceId,
}
}

void NebulaStore::fetchDiskParts(SpaceDiskPartsMap& diskParts) {
diskMan_->getDiskParts(diskParts);
}

void NebulaStore::updateSpaceOption(GraphSpaceID spaceId,
const std::unordered_map<std::string, std::string>& options,
bool isDbOption) {
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::vector<HostAddr>& remoteListeners) override;

void fetchDiskParts(SpaceDiskPartsMap& diskParts) override;

nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId,
std::vector<KV> keyValues) override;

Expand Down
8 changes: 8 additions & 0 deletions src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ void MetaServerBasedPartManager::fetchLeaderInfo(
}
}

void MetaServerBasedPartManager::fetchDiskParts(SpaceDiskPartsMap& diskParts) {
if (handler_ != nullptr) {
handler_->fetchDiskParts(diskParts);
} else {
VLOG(1) << "handler_ is nullptr!";
}
}

meta::ListenersMap MetaServerBasedPartManager::listeners(const HostAddr& host) {
auto ret = client_->getListenersByHostFromCache(host);
if (ret.ok()) {
Expand Down
12 changes: 9 additions & 3 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "clients/meta/MetaClient.h"
#include "common/base/Base.h"
#include "common/meta/Common.h"
#include "kvstore/DiskManager.h"

namespace nebula {
namespace kvstore {
Expand All @@ -34,9 +35,6 @@ class Handler {

virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0;

virtual int32_t allLeader(
std::unordered_map<GraphSpaceID, std::vector<meta::cpp2::LeaderInfo>>& leaderIds) = 0;

virtual void addListener(GraphSpaceID spaceId,
PartitionID partId,
meta::cpp2::ListenerType type,
Expand All @@ -49,6 +47,12 @@ class Handler {
virtual void checkRemoteListeners(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<HostAddr>& remoteListeners) = 0;

// get infos from handler(nebula store) to listener(meta_client -> meta)
virtual int32_t allLeader(
std::unordered_map<GraphSpaceID, std::vector<meta::cpp2::LeaderInfo>>& leaderIds) = 0;

virtual void fetchDiskParts(SpaceDiskPartsMap& diskParts) = 0;
};

/**
Expand Down Expand Up @@ -210,6 +214,8 @@ class MetaServerBasedPartManager : public PartManager, public meta::MetaChangedL
void fetchLeaderInfo(
std::unordered_map<GraphSpaceID, std::vector<meta::cpp2::LeaderInfo>>& leaderParts) override;

void fetchDiskParts(SpaceDiskPartsMap& diskParts) override;

void onListenerAdded(GraphSpaceID spaceId,
PartitionID partId,
const meta::ListenerHosts& listenerHosts) override;
Expand Down
21 changes: 21 additions & 0 deletions src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,27 @@ void HBProcessor::process(const cpp2::HBReq& req) {
onFinished();
return;
}

if (req.disk_parts_ref().has_value()) {
for (const auto& [spaceId, partDiskMap] : *req.get_disk_parts()) {
for (const auto& [path, partList] : partDiskMap) {
auto partListVal = MetaKeyUtils::diskPartsVal(partList);
std::string key = MetaKeyUtils::diskPartsKey(host, spaceId, path);
std::vector<kvstore::KV> data;
data.emplace_back(key, partListVal);
// doPut() not work, will trigger the asan: use heap memory which is free
folly::Baton<true, std::atomic> baton;
kvstore_->asyncMultiPut(kDefaultSpaceId,
Nivras marked this conversation as resolved.
Show resolved Hide resolved
kDefaultPartId,
std::move(data),
[this, &baton](nebula::cpp2::ErrorCode code) {
this->handleErrorCode(code);
baton.post();
});
baton.wait();
}
}
}
}

HostInfo info(time::WallClock::fastNowInMilliSec(), req.get_role(), req.get_git_info_sha());
Expand Down
5 changes: 5 additions & 0 deletions src/meta/test/MetaClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,11 @@ class TestListener : public MetaChangedListener {
UNUSED(remoteListeners);
}

void fetchDiskParts(kvstore::SpaceDiskPartsMap& diskParts) override {
UNUSED(diskParts);
LOG(INFO) << "Fetch Disk Paths";
}

int32_t spaceNum = 0;
int32_t partNum = 0;
int32_t partChanged = 0;
Expand Down