Skip to content

Commit

Permalink
Merge branch 'expression_depth_3' of github.com:heroicNeZha/nebula in…
Browse files Browse the repository at this point in the history
…to expression_depth_3
  • Loading branch information
heroicNeZha committed Jan 4, 2022
2 parents 8957cd4 + 9b0b8b0 commit 7aa7944
Show file tree
Hide file tree
Showing 92 changed files with 3,164 additions and 291 deletions.
1 change: 1 addition & 0 deletions conf/nebula-metad.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@
--default_replica_factor=1

--heartbeat_interval_secs=10
--agent_heartbeat_interval_secs=60
1 change: 1 addition & 0 deletions conf/nebula-metad.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
--default_replica_factor=1

--heartbeat_interval_secs=10
--agent_heartbeat_interval_secs=60

############## rocksdb Options ##############
--rocksdb_wal_sync=true
1 change: 1 addition & 0 deletions resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
"rocksdb_block_based_table_options"
]
}

144 changes: 138 additions & 6 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
#include <thrift/lib/cpp/util/EnumUtils.h>

#include <boost/filesystem.hpp>
#include <unordered_set>

#include "clients/meta/FileBasedClusterIdMan.h"
#include "clients/meta/stats/MetaClientStats.h"
#include "common/base/Base.h"
#include "common/base/MurmurHash2.h"
#include "common/base/Status.h"
#include "common/conf/Configuration.h"
#include "common/http/HttpClient.h"
#include "common/meta/NebulaSchemaProvider.h"
Expand All @@ -37,6 +39,27 @@ DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval bet
DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout");
DEFINE_string(cluster_id_path, "cluster.id", "file path saved clusterId");
DEFINE_int32(check_plan_killed_frequency, 8, "check plan killed every 1<<n times");
DEFINE_uint32(failed_login_attempts,
0,
"how many consecutive incorrect passwords input to a SINGLE graph service node cause "
"the account to become locked.");
DEFINE_uint32(
password_lock_time_in_secs,
0,
"how long in seconds to lock the account after too many consecutive login attempts provide an "
"incorrect password.");

// Sanity-checking Flag Values
static bool ValidateFailedLoginAttempts(const char* flagname, uint32_t value) {
if (value <= 32767) // value is ok
return true;

FLOG_WARN("Invalid value for --%s: %d, the timeout should be an integer between 0 and 32767\n",
flagname,
(int)value);
return false;
}
DEFINE_validator(failed_login_attempts, &ValidateFailedLoginAttempts);

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -152,6 +175,11 @@ bool MetaClient::loadUsersAndRoles() {
}
decltype(userRolesMap_) userRolesMap;
decltype(userPasswordMap_) userPasswordMap;
decltype(userPasswordAttemptsRemain_) userPasswordAttemptsRemain;
decltype(userLoginLockTime_) userLoginLockTime;
// List of username
std::unordered_set<std::string> userNameList;

for (auto& user : userRoleRet.value()) {
auto rolesRet = getUserRoles(user.first).get();
if (!rolesRet.ok()) {
Expand All @@ -160,11 +188,40 @@ bool MetaClient::loadUsersAndRoles() {
}
userRolesMap[user.first] = rolesRet.value();
userPasswordMap[user.first] = user.second;
userPasswordAttemptsRemain[user.first] = FLAGS_failed_login_attempts;
userLoginLockTime[user.first] = 0;
userNameList.emplace(user.first);
}
{
folly::RWSpinLock::WriteHolder holder(localCacheLock_);
userRolesMap_ = std::move(userRolesMap);
userPasswordMap_ = std::move(userPasswordMap);

// This method is called periodically by the heartbeat thread, but we don't want to reset the
// failed login attempts every time. Remove expired users from cache
for (auto& ele : userPasswordAttemptsRemain) {
if (userNameList.count(ele.first) == 0) {
userPasswordAttemptsRemain.erase(ele.first);
}
}
for (auto& ele : userLoginLockTime) {
if (userNameList.count(ele.first) == 0) {
userLoginLockTime.erase(ele.first);
}
}

// If the user is not in the map, insert value with the default value
for (const auto& user : userNameList) {
if (userPasswordAttemptsRemain.count(user) == 0) {
userPasswordAttemptsRemain[user] = FLAGS_failed_login_attempts;
}
if (userLoginLockTime.count(user) == 0) {
userLoginLockTime[user] = 0;
}
}

userPasswordAttemptsRemain_ = std::move(userPasswordAttemptsRemain);
userLoginLockTime_ = std::move(userLoginLockTime);
}
return true;
}
Expand Down Expand Up @@ -694,6 +751,7 @@ void MetaClient::getResponse(Request req,
return;
} else {
LOG(ERROR) << "Send request to " << host << ", exceed retry limit";
LOG(ERROR) << "RpcResponse exception: " << t.exception().what().c_str();
pro.setValue(
Status::Error("RPC failure in MetaClient: %s", t.exception().what().c_str()));
}
Expand Down Expand Up @@ -873,8 +931,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("list cluster failure!");
case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_GET_ABS_PATH_FAILURE:
return Status::Error("Failed to get the absolute path!");
case nebula::cpp2::ErrorCode::E_GET_META_DIR_FAILURE:
return Status::Error("Failed to get meta dir!");
case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE:
return Status::Error("There is no agent!");
case nebula::cpp2::ErrorCode::E_INVALID_JOB:
return Status::Error("No valid job!");
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
Expand Down Expand Up @@ -1369,6 +1427,8 @@ folly::Future<StatusOr<bool>> MetaClient::multiPut(

cpp2::MultiPutReq req;
std::vector<nebula::KeyValue> data;
data.reserve(pairs.size());

for (auto& element : pairs) {
data.emplace_back(std::move(element));
}
Expand Down Expand Up @@ -2353,16 +2413,73 @@ std::vector<cpp2::RoleItem> MetaClient::getRolesByUserFromCache(const std::strin
return iter->second;
}

bool MetaClient::authCheckFromCache(const std::string& account, const std::string& password) {
Status MetaClient::authCheckFromCache(const std::string& account, const std::string& password) {
// Check meta service status
if (!ready_) {
return false;
return Status::Error("Meta Service not ready");
}

const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo();
// Check user existence
auto iter = threadLocalInfo.userPasswordMap_.find(account);
if (iter == threadLocalInfo.userPasswordMap_.end()) {
return false;
return Status::Error("User not exist");
}

folly::RWSpinLock::WriteHolder holder(localCacheLock_);

auto& lockedSince = userLoginLockTime_[account];
auto& passwordAttemtRemain = userPasswordAttemptsRemain_[account];
LOG(INFO) << "Thread id: " << std::this_thread::get_id()
<< " ,passwordAttemtRemain: " << passwordAttemtRemain;
// lockedSince is non-zero means the account has been locked
if (lockedSince != 0) {
auto remainingLockTime =
(lockedSince + FLAGS_password_lock_time_in_secs) - time::WallClock::fastNowInSec();
// If the account is still locked, there is no need to check the password
if (remainingLockTime > 0) {
return Status::Error(
"%d times consecutive incorrect passwords has been input, user name: %s has been "
"locked, try again in %ld seconds",
FLAGS_failed_login_attempts,
account.c_str(),
remainingLockTime);
}
// Clear lock state and reset attempts
lockedSince = 0;
passwordAttemtRemain = FLAGS_failed_login_attempts;
}
return iter->second == password;

if (iter->second != password) {
// By default there is no limit of login attempts if any of these 2 flags is unset
if (FLAGS_failed_login_attempts == 0 || FLAGS_password_lock_time_in_secs == 0) {
return Status::Error("Invalid password");
}

// If the password is not correct and passwordAttemtRemain > 0,
// Allow another attemp
if (passwordAttemtRemain > 0) {
--passwordAttemtRemain;
if (passwordAttemtRemain == 0) {
// If the remaining attemps is 0, failed to authenticate
// Block user login
lockedSince = time::WallClock::fastNowInSec();
return Status::Error(
"%d times consecutive incorrect passwords has been input, user name: %s has been "
"locked, try again in %d seconds",
FLAGS_failed_login_attempts,
account.c_str(),
FLAGS_password_lock_time_in_secs);
}
LOG(ERROR) << "Invalid password, remaining attempts: " << passwordAttemtRemain;
return Status::Error("Invalid password, remaining attempts: %d", passwordAttemtRemain);
}
}

// Reset password attempts
passwordAttemtRemain = FLAGS_failed_login_attempts;
lockedSince = 0;
return Status::OK();
}

bool MetaClient::checkShadowAccountFromCache(const std::string& account) {
Expand Down Expand Up @@ -3532,6 +3649,21 @@ folly::Future<StatusOr<cpp2::ExecResp>> MetaClient::killQuery(
return future;
}

folly::Future<StatusOr<int64_t>> MetaClient::getWorkerId(std::string ipAddr) {
cpp2::GetWorkerIdReq req;
req.host_ref() = std::move(ipAddr);

folly::Promise<StatusOr<int64_t>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_getWorkerId(request); },
[](cpp2::GetWorkerIdResp&& resp) -> int64_t { return std::move(resp).get_workerid(); },
std::move(promise),
true);
return future;
}

folly::Future<StatusOr<bool>> MetaClient::download(const std::string& hdfsHost,
int32_t hdfsPort,
const std::string& hdfsPath,
Expand Down
21 changes: 17 additions & 4 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <gtest/gtest_prod.h>

#include <atomic>
#include <cstdint>

#include "common/base/Base.h"
#include "common/base/ObjectPool.h"
Expand Down Expand Up @@ -143,6 +144,10 @@ using IndexStatus = std::tuple<std::string, std::string, std::string>;
using UserRolesMap = std::unordered_map<std::string, std::vector<cpp2::RoleItem>>;
// get user password by account
using UserPasswordMap = std::unordered_map<std::string, std::string>;
// Mapping of user name and remaining wrong password attempts
using UserPasswordAttemptsRemain = std::unordered_map<std::string, uint32>;
// Mapping of user name and the timestamp when the account is locked
using UserLoginLockTime = std::unordered_map<std::string, uint32>;

// config cache, get config via module and name
using MetaConfigMap =
Expand Down Expand Up @@ -199,13 +204,13 @@ struct MetaClientOptions {
std::string serviceName_ = "";
// Whether to skip the config manager
bool skipConfig_ = false;
// host role(graph/meta/storage) using this client
// Host role(graph/meta/storage) using this client
cpp2::HostRole role_ = cpp2::HostRole::UNKNOWN;
// gitInfoSHA of Host using this client
std::string gitInfoSHA_{""};
// data path list, used in storaged
// Data path list, used in storaged
std::vector<std::string> dataPaths_;
// install path, used in metad/graphd/storaged
// Install path, used in metad/graphd/storaged
std::string rootPath_;
};

Expand Down Expand Up @@ -593,7 +598,7 @@ class MetaClient {

std::vector<cpp2::RoleItem> getRolesByUserFromCache(const std::string& user);

bool authCheckFromCache(const std::string& account, const std::string& password);
Status authCheckFromCache(const std::string& account, const std::string& password);

StatusOr<TermID> getTermFromCache(GraphSpaceID spaceId, PartitionID);

Expand Down Expand Up @@ -651,6 +656,8 @@ class MetaClient {

folly::Future<StatusOr<bool>> ingest(GraphSpaceID spaceId);

folly::Future<StatusOr<int64_t>> getWorkerId(std::string ipAddr);

HostAddr getMetaLeader() {
return leader_;
}
Expand All @@ -659,6 +666,10 @@ class MetaClient {
return heartbeatTime_;
}

std::string getLocalIp() {
return options_.localHost_.toString();
}

protected:
// Return true if load succeeded.
bool loadData();
Expand Down Expand Up @@ -815,6 +826,8 @@ class MetaClient {

UserRolesMap userRolesMap_;
UserPasswordMap userPasswordMap_;
UserPasswordAttemptsRemain userPasswordAttemptsRemain_;
UserLoginLockTime userLoginLockTime_;

NameIndexMap tagNameIndexMap_;
NameIndexMap edgeNameIndexMap_;
Expand Down
2 changes: 2 additions & 0 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ StorageRpcRespFuture<cpp2::LookupIndexResp> StorageClient::lookupIndex(
bool isEdge,
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
std::vector<storage::cpp2::OrderBy> orderBy,
int64_t limit) {
// TODO(sky) : instead of isEdge and tagOrEdge to nebula::cpp2::SchemaID for graph layer.
auto space = param.space;
Expand Down Expand Up @@ -516,6 +517,7 @@ StorageRpcRespFuture<cpp2::LookupIndexResp> StorageClient::lookupIndex(
req.indices_ref() = spec;
req.common_ref() = common;
req.limit_ref() = limit;
req.order_by_ref() = orderBy;
}

return collectResponse(param.evb,
Expand Down
1 change: 1 addition & 0 deletions src/clients/storage/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class StorageClient
bool isEdge,
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
std::vector<storage::cpp2::OrderBy> orderBy,
int64_t limit);

StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
Expand Down
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ nebula_add_subdirectory(utils)
nebula_add_subdirectory(ssl)
nebula_add_subdirectory(geo)
nebula_add_subdirectory(memory)
nebula_add_subdirectory(id)
6 changes: 2 additions & 4 deletions src/common/datatypes/Duration.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,8 @@ struct Duration {
}

std::string toString() const {
std::stringstream ss;
ss << "P" << months << "M" << days() << "D"
<< "T" << seconds << "S";
return ss.str();
return folly::sformat(
"P{}MT{}.{:0>6}000S", months, seconds + microseconds / 1000000, microseconds % 1000000);
}

folly::dynamic toJson() const {
Expand Down
21 changes: 21 additions & 0 deletions src/common/datatypes/test/ValueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,27 @@ TEST(Value, Ctor) {
// Value v2(&tmp);
}

TEST(Value, ToString) {
{
Duration d;
d.addYears(1);
d.addMonths(2);
d.addDays(500);
d.addSeconds(10);
d.addMicroseconds(20);
EXPECT_EQ(d.toString(), "P14MT43200010.000020000S");
}
{
Duration d;
d.addYears(1);
d.addMonths(2);
d.addDays(500);
d.addSeconds(10);
d.addMicroseconds(20000000);
EXPECT_EQ(d.toString(), "P14MT43200030.000000000S");
}
}

} // namespace nebula

int main(int argc, char** argv) {
Expand Down
Loading

0 comments on commit 7aa7944

Please sign in to comment.