Skip to content

Commit

Permalink
fix create space
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Dec 31, 2021
1 parent 8307d7a commit 7b4b693
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void HBProcessor::process(const cpp2::HBReq& req) {
for (const auto& [spaceId, partDiskMap] : *req.get_disk_parts()) {
for (const auto& [path, partList] : partDiskMap) {
auto partListVal = MetaKeyUtils::diskPartsVal(partList);
std::string key = MetaKeyUtils::diskPartsKey(host, spaceId, path);
auto key = MetaKeyUtils::diskPartsKey(host, spaceId, path);
std::vector<kvstore::KV> data;
data.emplace_back(key, partListVal);
// doPut() not work, will trigger the asan: use heap memory which is free
Expand Down
57 changes: 16 additions & 41 deletions src/meta/processors/parts/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

DEFINE_int32(default_parts_num, 100, "The default number of parts when a space is created");
DEFINE_int32(default_replica_factor, 1, "The default replica factor when a space is created");
DECLARE_uint32(expired_time_factor);

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -174,19 +175,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
return;
}

auto hostLoadingRet = getHostLoading();
if (!nebula::ok(hostLoadingRet)) {
LOG(ERROR) << "Get host loading failed.";
auto retCode = nebula::error(hostLoadingRet);
if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
retCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
}
handleErrorCode(retCode);
onFinished();
return;
}

hostLoading_ = std::move(nebula::value(hostLoadingRet));
std::unordered_map<std::string, Hosts> zoneHosts;
for (auto& zone : zones) {
auto zoneKey = MetaKeyUtils::zoneKey(zone);
Expand All @@ -200,14 +188,23 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
break;
}

auto now = time::WallClock::fastNowInMilliSec();
auto hosts = MetaKeyUtils::parseZoneHosts(std::move(nebula::value(zoneValueRet)));
for (auto& host : hosts) {
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
zoneLoading_[zone] += 0;
auto key = MetaKeyUtils::hostKey(host.host, host.port);
auto ret = doGet(key);
HostInfo info = HostInfo::decode(nebula::value(ret));
if (now - info.lastHBTimeInMilliSec_ <
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) {
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
zoneLoading_[zone] += 0;
} else {
zoneLoading_[zone] += hostIter->second;
}
} else {
zoneLoading_[zone] += hostIter->second;
LOG(WARNING) << "Host " << host << " expired";
}
}
zoneHosts[zone] = std::move(hosts);
Expand Down Expand Up @@ -248,7 +245,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
ss << host << ", ";
}

VLOG(3) << "Space " << spaceId << " part " << partId << " hosts " << ss.str();
LOG(INFO) << "Space " << spaceId << " part " << partId << " hosts " << ss.str();
data.emplace_back(MetaKeyUtils::partKey(spaceId, partId), MetaKeyUtils::partVal(partHosts));
}

Expand All @@ -264,28 +261,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
LOG(INFO) << "Create space " << spaceName;
}

ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<HostAddr, int32_t>>
CreateSpaceProcessor::getHostLoading() {
const auto& prefix = MetaKeyUtils::partPrefix();
auto iterRet = doPrefix(prefix);

if (!nebula::ok(iterRet)) {
LOG(ERROR) << "Prefix Parts Failed";
return nebula::error(iterRet);
}

std::unordered_map<HostAddr, int32_t> result;
auto iter = nebula::value(iterRet).get();
while (iter->valid()) {
auto hosts = MetaKeyUtils::parsePartVal(iter->val());
for (auto& host : hosts) {
result[host]++;
}
iter->next();
}
return result;
}

StatusOr<Hosts> CreateSpaceProcessor::pickHostsWithZone(
const std::vector<std::string>& zones,
const std::unordered_map<std::string, Hosts>& zoneHosts) {
Expand Down
3 changes: 0 additions & 3 deletions src/meta/processors/parts/CreateSpaceProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ class CreateSpaceProcessor : public BaseProcessor<cpp2::ExecResp> {
StatusOr<Hosts> pickHostsWithZone(const std::vector<std::string>& zones,
const std::unordered_map<std::string, Hosts>& zoneHosts);

// Get all host's part loading
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<HostAddr, int32_t>> getHostLoading();

// Get the zones with the least load
StatusOr<std::vector<std::string>> pickLightLoadZones(int32_t replicaFactor);

Expand Down

0 comments on commit 7b4b693

Please sign in to comment.