Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor LOG and add comment for meta, meta/processors/job #3686

Merged
merged 3 commits into from
Jan 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ nebula::cpp2::ErrorCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
if (statusVec[i].ok()) {
std::tie(std::ignore, term, code) = MetaKeyUtils::parseLeaderValV3(vals[i]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(WARNING) << apache::thrift::util::enumNameSafe(code);
LOG(INFO) << apache::thrift::util::enumNameSafe(code);
continue;
}
if (terms[i] <= term) {
Expand Down Expand Up @@ -90,13 +90,13 @@ bool ActiveHostsMan::machineRegisted(kvstore::KVStore* kv, const HostAddr& hostA
}

ErrorOr<nebula::cpp2::ErrorCode, std::vector<std::pair<HostAddr, cpp2::HostRole>>>
ActiveHostsMan::getServicesInHost(kvstore::KVStore* kv, std::string hostname) {
ActiveHostsMan::getServicesInHost(kvstore::KVStore* kv, const std::string& hostname) {
const auto& prefix = MetaKeyUtils::hostPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Failed to get services in the host: " << hostname << ", error "
<< apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Failed to get services in the host: " << hostname << ", error "
<< apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand All @@ -120,7 +120,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
std::unique_ptr<kvstore::KVIterator> machineIter;
auto retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, machinePrefix, &machineIter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Failed to get machines, error " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Failed to get machines, error " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand All @@ -135,8 +135,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
std::unique_ptr<kvstore::KVIterator> iter;
retCode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Failed to get active hosts, error "
<< apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Failed to get active hosts, error "
<< apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand All @@ -155,7 +155,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
if (info.role_ == cpp2::HostRole::STORAGE &&
std::find(machines.begin(), machines.end(), host) == machines.end()) {
retCode = nebula::cpp2::ErrorCode::E_MACHINE_NOT_FOUND;
LOG(ERROR) << "Machine not found " << host;
LOG(INFO) << "Machine not found " << host;
break;
}

Expand All @@ -177,8 +177,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
auto zoneKey = MetaKeyUtils::zoneKey(zoneName);
auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get zone " << zoneName
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get zone " << zoneName
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand Down Expand Up @@ -208,7 +208,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
auto spaceKey = MetaKeyUtils::spaceKey(spaceId);
auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, spaceKey, &spaceValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get space failed, error: " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get space failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

Expand Down Expand Up @@ -243,17 +243,17 @@ ErrorOr<nebula::cpp2::ErrorCode, HostInfo> ActiveHostsMan::getHostInfo(kvstore::
std::string machineValue;
auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, machineKey, &machineValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get machine info " << host
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get machine info " << host
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

auto hostKey = MetaKeyUtils::hostKey(host.host, host.port);
std::string hostValue;
retCode = kv->get(kDefaultSpaceId, kDefaultPartId, hostKey, &hostValue);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get host info " << host
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get host info " << host
<< " failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
return HostInfo::decode(hostValue);
Expand Down Expand Up @@ -284,8 +284,8 @@ ErrorOr<nebula::cpp2::ErrorCode, int64_t> LastUpdateTimeMan::get(kvstore::KVStor
std::string val;
auto retCode = kv->get(kDefaultSpaceId, kDefaultPartId, key, &val);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Get last update time failed, error: "
<< apache::thrift::util::enumNameSafe(retCode);
LOG(INFO) << "Get last update time failed, error: "
<< apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
return *reinterpret_cast<const int64_t*>(val.data());
Expand Down
83 changes: 79 additions & 4 deletions src/meta/ActiveHostsMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ struct HostInfo {
return info;
}

/*
/**
* @brief
* int8_t dataVer
* int64_t timestamp
* sizeof(HostRole) hostRole
* size_t lenth of gitInfoSha
* string gitInfoSha
* */
*
* @param info
* @return
*/
static std::string encodeV2(const HostInfo& info) {
std::string encode;
int8_t dataVer = 2;
Expand All @@ -74,6 +78,12 @@ struct HostInfo {
return encode;
}

/**
* @brief Parse a serialized value to HostInfo
*
* @param data
* @return
*/
static HostInfo decodeV2(const folly::StringPiece& data) {
HostInfo info;
size_t offset = sizeof(int8_t);
Expand Down Expand Up @@ -107,27 +117,92 @@ class ActiveHostsMan final {
~ActiveHostsMan() = default;

using AllLeaders = std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>;

/**
* @brief Save host info and leader info into kvStore
* If partition leader info was updated, it will update LastUpdateTime, indicating the MetaClient
* update local cache.
*
* @param kv Where to save
* @param hostAddr Which host to save
* @param info Information of the host
* @param leaderParts Leader info of all parts, null means don't need to update leader info
* @return
*/
static nebula::cpp2::ErrorCode updateHostInfo(kvstore::KVStore* kv,
const HostAddr& hostAddr,
const HostInfo& info,
const AllLeaders* leaderParts = nullptr);

/**
* @brief Check if the host is registered
*
* @param kv From where to get
* @param hostAddr Which host to register
* @return
*/
static bool machineRegisted(kvstore::KVStore* kv, const HostAddr& hostAddr);

/**
* @brief Get all registered host
*
* @param kv From where to get
* @param expiredTTL Ignore hosts who do not send heartbeat within longer than expiredTTL
* @param role Which role of the host to find. maybe GRAPH META STORAGE LISTENER AGENT
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> getActiveHosts(
kvstore::KVStore* kv, int32_t expiredTTL = 0, cpp2::HostRole role = cpp2::HostRole::STORAGE);

/**
* @brief Get services in the agent host
*
* @param kv From where to get
* @param hostname Hostname or ip
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, std::vector<std::pair<HostAddr, cpp2::HostRole>>>
getServicesInHost(kvstore::KVStore* kv, std::string hostname);

getServicesInHost(kvstore::KVStore* kv, const std::string& hostname);

/**
* @brief Get hosts in the zone
*
* @param kv From where to get
* @param zoneName Name of the zone
* @param expiredTTL Ignore hosts who do not send heartbeat within longer than expiredTTL
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> getActiveHostsInZone(
kvstore::KVStore* kv, const std::string& zoneName, int32_t expiredTTL = 0);

/**
* @brief Get hosts in the space
*
* @param kv From where to get
* @param spaceId Id of the space
* @param expiredTTL Ignore hosts who do not send heartbeat within longer than expiredTTL
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> getActiveHostsWithZones(
kvstore::KVStore* kv, GraphSpaceID spaceId, int32_t expiredTTL = 0);

/**
* @brief Check if a host is alived, by checking if the host send heartbeat within the default
* time
*
* @param kv From where to get the host's information
* @param host Which host to check
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, bool> isLived(kvstore::KVStore* kv, const HostAddr& host);

/**
* @brief Get hostInfo for a host
*
* @param kv From where to get
* @param host
* @return
*/
static ErrorOr<nebula::cpp2::ErrorCode, HostInfo> getHostInfo(kvstore::KVStore* kv,
const HostAddr& host);

Expand Down
50 changes: 42 additions & 8 deletions src/meta/KVBasedClusterIdMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ namespace meta {
* */
class ClusterIdMan {
public:
/**
* @brief Create a ClusterID by hash function
*
* @param metaAddrs
* @return
*/
static ClusterID create(const std::string& metaAddrs) {
std::hash<std::string> hash_fn;
auto clusterId = hash_fn(metaAddrs);
Expand All @@ -31,23 +37,30 @@ class ClusterIdMan {
return clusterId;
}

/**
* @brief Write the cluster id in a file
*
* @param clusterId
* @param filename
* @return
*/
static bool persistInFile(ClusterID clusterId, const std::string& filename) {
auto dirname = fs::FileUtils::dirname(filename.c_str());
if (!fs::FileUtils::makeDir(dirname)) {
LOG(ERROR) << "Failed mkdir " << dirname;
LOG(INFO) << "Failed mkdir " << dirname;
return false;
}
if (fs::FileUtils::remove(filename.c_str())) {
LOG(INFO) << "Remove the existed file " << filename;
}
int fd = ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0) {
LOG(ERROR) << "Open file error, file " << filename << ", error " << strerror(errno);
LOG(INFO) << "Open file error, file " << filename << ", error " << strerror(errno);
return false;
}
int bytes = ::write(fd, reinterpret_cast<const char*>(&clusterId), sizeof(ClusterID));
if (bytes != sizeof(clusterId)) {
LOG(ERROR) << "Write clusterId failed!";
LOG(INFO) << "Write clusterId failed!";
::close(fd);
return false;
}
Expand All @@ -56,17 +69,23 @@ class ClusterIdMan {
return true;
}

/**
* @brief Get cluster id from a file
*
* @param filename
* @return
*/
static ClusterID getClusterIdFromFile(const std::string& filename) {
LOG(INFO) << "Try to open " << filename;
int fd = ::open(filename.c_str(), O_RDONLY);
if (fd < 0) {
LOG(WARNING) << "Open file failed, error " << strerror(errno);
LOG(INFO) << "Open file failed, error " << strerror(errno);
return 0;
}
ClusterID clusterId = 0;
int len = ::read(fd, reinterpret_cast<char*>(&clusterId), sizeof(ClusterID));
if (len != sizeof(ClusterID)) {
LOG(ERROR) << "Get clusterId failed!";
LOG(INFO) << "Get clusterId failed!";
::close(fd);
return 0;
}
Expand All @@ -75,6 +94,13 @@ class ClusterIdMan {
return clusterId;
}

/**
* @brief Get cluster id from kvStore
*
* @param kv
* @param key
* @return
*/
static ClusterID getClusterIdFromKV(kvstore::KVStore* kv, const std::string& key) {
CHECK_NOTNULL(kv);
std::string value;
Expand All @@ -84,16 +110,24 @@ class ClusterIdMan {
return 0;
} else if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
if (value.size() != sizeof(ClusterID)) {
LOG(ERROR) << "Bad clusterId " << value;
LOG(INFO) << "Bad clusterId " << value;
return 0;
}
return *reinterpret_cast<const ClusterID*>(value.data());
} else {
LOG(ERROR) << "Error in kvstore, err " << static_cast<int32_t>(code);
LOG(INFO) << "Error in kvstore, err " << static_cast<int32_t>(code);
return 0;
}
}

/**
* @brief Save cluster id into kvStore
*
* @param kv
* @param key
* @param clusterId
* @return
*/
static bool persistInKV(kvstore::KVStore* kv, const std::string& key, ClusterID clusterId) {
CHECK_NOTNULL(kv);
std::vector<kvstore::KV> data;
Expand All @@ -102,7 +136,7 @@ class ClusterIdMan {
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(0, 0, std::move(data), [&](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Put failed, error " << static_cast<int32_t>(code);
LOG(INFO) << "Put failed, error " << static_cast<int32_t>(code);
ret = false;
} else {
LOG(INFO) << "Put key " << key << ", val " << clusterId;
Expand Down
Loading