Skip to content

Commit

Permalink
Merge branch 'master' into clear_space
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaohaifei authored Mar 14, 2022
2 parents 24a0f2f + 25997c4 commit 67e88ea
Show file tree
Hide file tree
Showing 25 changed files with 698 additions and 281 deletions.
22 changes: 22 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2314,6 +2314,7 @@ Status MetaClient::authCheckFromCache(const std::string& account, const std::str
if (!ready_) {
return Status::Error("Meta Service not ready");
}

folly::rcu_reader guard;
const auto& metadata = *metadata_.load();
auto iter = metadata.userPasswordMap_.find(account);
Expand Down Expand Up @@ -2484,6 +2485,11 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
}
}

// TTL for clientAddrMap
// If multiple connections are created but do not authenticate, the clientAddrMap_ will keep
// growing. This is to clear the clientAddrMap_ regularly.
clearClientAddrMap();

// info used in the agent, only set once
// TOOD(spw): if we could add data path(disk) dynamicly in the future, it should be
// reported every time it changes
Expand Down Expand Up @@ -3635,5 +3641,21 @@ Status MetaClient::verifyVersion() {
}
return Status::OK();
}

void MetaClient::clearClientAddrMap() {
if (clientAddrMap_.size() == 0) {
return;
}

auto curTimestamp = time::WallClock::fastNowInSec();
for (auto it = clientAddrMap_.cbegin(); it != clientAddrMap_.cend();) {
// The clientAddr is expired
if (it->second < curTimestamp) {
it = clientAddrMap_.erase(it);
} else {
++it;
}
}
}
} // namespace meta
} // namespace nebula
21 changes: 21 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ using MetaConfigMap =
using FTIndexMap = std::unordered_map<std::string, cpp2::FTIndex>;

using SessionMap = std::unordered_map<SessionID, cpp2::Session>;

using clientAddrMap = folly::ConcurrentHashMap<HostAddr, int64_t>;
class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;
Expand Down Expand Up @@ -644,6 +646,10 @@ class MetaClient {
return options_.localHost_.toString();
}

clientAddrMap& getClientAddrMap() {
return clientAddrMap_;
}

protected:
// Return true if load succeeded.
bool loadData();
Expand Down Expand Up @@ -730,6 +736,9 @@ class MetaClient {

Status verifyVersion();

// Removes expired keys in the clientAddrMap_
void clearClientAddrMap();

private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>> clientsMan_;
Expand Down Expand Up @@ -810,6 +819,18 @@ class MetaClient {
NameIndexMap tagNameIndexMap_;
NameIndexMap edgeNameIndexMap_;

// TODO(Aiee) This is a walkaround to address the problem that using a lower version(< v2.6.0)
// client to connect with higher version(>= v3.0.0) Nebula service will cause a crash.
//
// The key here is the host of the client that sends the request, and the value indicates the
// expiration of the key because we don't want to keep the key forever.
//
// The assumption here is that there is ONLY ONE VERSION of the client in the host.
//
// This map will be updated when verifyVersion() is called. Only the clients since v2.6.0 will
// call verifyVersion(), thus we could determine whether the client version is lower than v2.6.0
clientAddrMap clientAddrMap_;

// Global service client
ServiceClientsList serviceClientList_;
FTIndexMap fulltextIndexMap_;
Expand Down
2 changes: 0 additions & 2 deletions src/common/expression/test/AggregateExpressionBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

#include <folly/Benchmark.h>

#include <memory>

#include "common/base/ObjectPool.h"
#include "common/expression/AggregateExpression.h"
#include "common/expression/ConstantExpression.h"
Expand Down
4 changes: 2 additions & 2 deletions src/common/fs/FileUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ StatusOr<std::string> FileUtils::readLink(const char* path) {
}

StatusOr<std::string> FileUtils::realPath(const char* path) {
char* buffer = ::realpath(path, NULL);
if (buffer == NULL) {
char* buffer = ::realpath(path, nullptr);
if (buffer == nullptr) {
return Status::Error("realpath %s: %s", path, ::strerror(errno));
}
std::string truePath(buffer);
Expand Down
49 changes: 29 additions & 20 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@

namespace nebula {

#define X(EnumName, EnumNumber) EnumName = EnumNumber,
#define X(EnumName, EnumNumber) EnumName = (EnumNumber),

enum class ErrorCode { ErrorCodeEnums };

Expand Down Expand Up @@ -295,7 +295,9 @@ struct ProfilingStats {
ProfilingStatsObj.insert("rows", rows);
ProfilingStatsObj.insert("execDurationInUs", execDurationInUs);
ProfilingStatsObj.insert("totalDurationInUs", totalDurationInUs);
ProfilingStatsObj.insert("otherStats", folly::toDynamic(*otherStats));
if (otherStats) {
ProfilingStatsObj.insert("otherStats", folly::toDynamic(*otherStats));
}

return ProfilingStatsObj;
}
Expand Down Expand Up @@ -323,7 +325,7 @@ struct PlanNodeBranchInfo {
}

// True if loop body or then branch of select
bool isDoBranch{0};
bool isDoBranch{false};
// select/loop node id
int64_t conditionNodeId{-1};

Expand Down Expand Up @@ -407,22 +409,29 @@ struct PlanNodeDescription {
planNodeDescObj.insert("id", id);
planNodeDescObj.insert("outputVar", outputVar);

auto descriptionObj = folly::dynamic::array();
descriptionObj.resize(description->size());
std::transform(
description->begin(), description->end(), descriptionObj.begin(), [](const auto &ele) {
return ele.toJson();
});
planNodeDescObj.insert("description", descriptionObj);

auto profilesObj = folly::dynamic::array();
profilesObj.resize(profiles->size());
std::transform(profiles->begin(), profiles->end(), profilesObj.begin(), [](const auto &ele) {
return ele.toJson();
});
planNodeDescObj.insert("profiles", profilesObj);
planNodeDescObj.insert("branchInfo", branchInfo->toJson());
planNodeDescObj.insert("dependencies", folly::toDynamic(*dependencies));
if (description) {
auto descriptionObj = folly::dynamic::array();
descriptionObj.resize(description->size());
std::transform(
description->begin(), description->end(), descriptionObj.begin(), [](const auto &ele) {
return ele.toJson();
});
planNodeDescObj.insert("description", descriptionObj);
}
if (profiles) {
auto profilesObj = folly::dynamic::array();
profilesObj.resize(profiles->size());
std::transform(profiles->begin(), profiles->end(), profilesObj.begin(), [](const auto &ele) {
return ele.toJson();
});
planNodeDescObj.insert("profiles", profilesObj);
}
if (branchInfo) {
planNodeDescObj.insert("branchInfo", branchInfo->toJson());
}
if (dependencies) {
planNodeDescObj.insert("dependencies", folly::toDynamic(*dependencies));
}

return planNodeDescObj;
}
Expand Down Expand Up @@ -536,7 +545,7 @@ struct ExecutionResponse {
std::unique_ptr<PlanDescription> planDesc{nullptr};
std::unique_ptr<std::string> comment{nullptr};

// Return the response as a JSON string
// Returns the response as a JSON string
// only errorCode and latencyInUs are required fields, the rest are optional
// if the dataset contains a value of TIME or DATETIME, it will be returned in UTC.
//
Expand Down
20 changes: 20 additions & 0 deletions src/common/graph/tests/ResponseEncodeDecodeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ TEST(ResponseEncodeDecodeTest, Basic) {
}

TEST(ResponseEncodeDecodeTest, ToJson) {
// PlanNodeDescription
{
// Dummy data
PlanNodeDescription pnd;
pnd.name = "name";
pnd.id = 100;
pnd.outputVar = "outputVar";
pnd.description = nullptr;
pnd.profiles = nullptr;
pnd.branchInfo = nullptr;
pnd.dependencies = nullptr;

folly::dynamic jsonObj = pnd.toJson();
folly::dynamic expect = folly::dynamic::object();
expect.insert("name", "name");
expect.insert("id", 100);
expect.insert("outputVar", "outputVar");

ASSERT_EQ(jsonObj, expect);
}
// plan description
{
std::vector<PlanDescription> pds;
Expand Down
7 changes: 2 additions & 5 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@

#include "common/memory/MemoryUtils.h"

#include <folly/String.h>
#include <gflags/gflags.h>

#include <algorithm>
#include <cstdio>
#include <fstream>
#include <regex>

#include "common/fs/FileUtils.h"

Expand Down Expand Up @@ -42,7 +39,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
uint64_t cacheSize = 0;
for (; iter.valid(); ++iter) {
auto& sm = iter.matched();
cacheSize += std::stoul(sm[2].str(), NULL);
cacheSize += std::stoul(sm[2].str(), nullptr);
}

std::string limitPath =
Expand All @@ -64,7 +61,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
std::vector<uint64_t> memorySize;
for (; iter.valid(); ++iter) {
auto& sm = iter.matched();
memorySize.emplace_back(std::stoul(sm[2].str(), NULL) << 10);
memorySize.emplace_back(std::stoul(sm[2].str(), nullptr) << 10);
}
std::sort(memorySize.begin(), memorySize.end());
if (memorySize.size() >= 2u) {
Expand Down
14 changes: 7 additions & 7 deletions src/common/network/NetworkUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,47 +112,47 @@ std::unordered_set<uint16_t> NetworkUtils::getPortsInUse() {
fs::FileUtils::FileLineIterator iter("/proc/net/tcp", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/tcp6", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/udp", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/udp6", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/raw", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/raw6", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
Expand Down Expand Up @@ -209,7 +209,7 @@ StatusOr<std::vector<HostAddr>> NetworkUtils::resolveHost(const std::string& hos
continue;
}

auto address = ((struct sockaddr_in*)rp->ai_addr)->sin_addr.s_addr;
auto address = (reinterpret_cast<struct sockaddr_in*>(rp->ai_addr))->sin_addr.s_addr;
// We need to match the integer byte order generated by ipv4ToInt,
// so we need to convert here.
addrs.emplace_back(intToIPv4(htonl(std::move(address))), port);
Expand Down
4 changes: 2 additions & 2 deletions src/common/time/test/WallClockBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ using nebula::time::WallClock;
BENCHMARK(gettimeofday_get_msec, iters) {
for (uint32_t i = 0; i < iters; i++) {
struct timeval tp;
gettimeofday(&tp, NULL);
gettimeofday(&tp, nullptr);
auto ts = tp.tv_sec * 1000 + tp.tv_usec / 1000;
folly::doNotOptimizeAway(ts);
}
Expand Down Expand Up @@ -46,7 +46,7 @@ BENCHMARK_DRAW_LINE();
BENCHMARK(gettimeofday_get_sec, iters) {
for (uint32_t i = 0; i < iters; i++) {
struct timeval tp;
gettimeofday(&tp, NULL);
gettimeofday(&tp, nullptr);
auto ts = tp.tv_sec;
folly::doNotOptimizeAway(ts);
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ DECLARE_uint32(failed_login_attempts);
// The deault value is 0. A value of 0 disables the option.
DECLARE_uint32(password_lock_time_in_secs);

// optimizer
// Optimizer
DECLARE_bool(enable_optimizer);

DECLARE_int64(max_allowed_connections);
Expand Down
2 changes: 2 additions & 0 deletions src/graph/service/GraphServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ bool GraphServer::start() {
return false;
}

// Init worker id for snowflake generating unique id
nebula::Snowflake::initWorkerId(interface->metaClient_.get());

graphThread_ = std::make_unique<std::thread>([&] {
Expand Down Expand Up @@ -90,6 +91,7 @@ void GraphServer::notifyStop() {
}
}

// Stop the server.
void GraphServer::stop() {
if (serverStatus_.load() == ServiceStatus::STATUS_STOPPED) {
LOG(INFO) << "The graph server has been stopped";
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class GraphServer {

void stop();

// used for signal handler to set an internal stop flag
// Used for signal handler to set an internal stop flag
void notifyStop();

void waitUntilStop();
Expand Down
Loading

0 comments on commit 67e88ea

Please sign in to comment.