Skip to content

Commit

Permalink
Limit number of sessions that are created by same client ip and same …
Browse files Browse the repository at this point in the history
…user for each graphd (#3729)

* add session_tool command to manage and analysis sessions

* add session_tool command to manage and analysis sessions

* add max_sessions_per_ip_per_user

* odify judgment conditions

* remove other pr content

* modify code

* fix the format reported

* fix the format reported

* fix the format reported

* fix the format for all files

* change python test case

* change python test case

* change python test case

* delete spaces at last row of file

* Fix Python Test Case session limit problem

Co-authored-by: Yichen Wang <[email protected]>
Co-authored-by: cpw <[email protected]>
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
4 people authored Mar 16, 2022
1 parent 9a401f7 commit 3161f01
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 4 deletions.
4 changes: 4 additions & 0 deletions conf/nebula-graphd.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,7 @@
########## experimental feature ##########
# if use experimental features
--enable_experimental_feature=false

########## session ##########
# Maximum number of sessions that can be created per IP and per user
--max_sessions_per_ip_per_user=300
3 changes: 2 additions & 1 deletion resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"session_idle_timeout_secs",
"session_reclaim_interval_secs",
"max_allowed_connections",
"disable_octal_escape_char"
"disable_octal_escape_char",
"max_sessions_per_ip_per_user"
],
"NESTED": [
"rocksdb_db_options",
Expand Down
20 changes: 20 additions & 0 deletions src/common/session/SessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@

namespace nebula {

class SessionCount {
private:
std::atomic<int32_t> count_ = 1;

public:
int fetch_add(int step) {
count_.fetch_add(step, std::memory_order_release);
return count_;
}
int fetch_sub(int step) {
count_.fetch_sub(step, std::memory_order_release);
return count_;
}
int get() {
return count_;
}
};

template <class SessionType>
class SessionManager {
public:
Expand Down Expand Up @@ -56,7 +74,9 @@ class SessionManager {

protected:
using SessionPtr = std::shared_ptr<SessionType>;
using SessionCountPtr = std::shared_ptr<SessionCount>;
folly::ConcurrentHashMap<SessionID, SessionPtr> activeSessions_;
folly::ConcurrentHashMap<std::string, SessionCountPtr> userIpSessionCount_;
std::unique_ptr<thread::GenericWorker> scavenger_;
meta::MetaClient* metaClient_{nullptr};
HostAddr myAddr_;
Expand Down
3 changes: 3 additions & 0 deletions src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ DEFINE_bool(disable_octal_escape_char,
DEFINE_bool(enable_experimental_feature, false, "Whether to enable experimental feature");

DEFINE_int32(num_rows_to_check_memory, 1024, "number rows to check memory");
DEFINE_int32(max_sessions_per_ip_per_user,
300,
"Maximum number of sessions that can be created per IP and per user");

// Sanity-checking Flag Values
static bool ValidateSessIdleTimeout(const char* flagname, int32_t value) {
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ DECLARE_bool(enable_authorize);
DECLARE_string(auth_type);
DECLARE_string(cloud_http_url);
DECLARE_uint32(max_allowed_statements);
DECLARE_int32(max_sessions_per_ip_per_user);

// Failed login attempt
// value of failed_login_attempts is in the range from 0 to 32767.
Expand Down
69 changes: 67 additions & 2 deletions src/graph/session/GraphSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::fin
if (!ret.second) {
return Status::Error("Insert session to local cache failed.");
}
std::string key = session.get_user_name() + session.get_client_ip();
bool addResp = addSessionCount(key);
if (!addResp) {
return Status::Error("Insert userIpSessionCount to local cache failed.");
}

// update the space info to sessionPtr
if (!spaceName.empty()) {
Expand All @@ -108,8 +113,22 @@ std::vector<meta::cpp2::Session> GraphSessionManager::getSessionFromLocalCache()

folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::createSession(
const std::string userName, const std::string clientIp, folly::Executor* runner) {
auto createCB = [this,
userName = userName](auto&& resp) -> StatusOr<std::shared_ptr<ClientSession>> {
// check the number of sessions per user per ip
std::string key = userName + clientIp;
auto maxSessions = FLAGS_max_sessions_per_ip_per_user;
auto uiscFindPtr = userIpSessionCount_.find(key);
if (uiscFindPtr != userIpSessionCount_.end() && maxSessions > 0 &&
uiscFindPtr->second.get()->get() > maxSessions - 1) {
return Status::Error(
"Create Session failed: Too many sessions created from %s by user %s. "
"the threshold is %d. You can change it by modifying '%s' in nebula-graphd.conf",
clientIp.c_str(),
userName.c_str(),
maxSessions,
"max_sessions_per_ip_per_user");
}
auto createCB = [this, userName = userName, clientIp = clientIp](
auto&& resp) -> StatusOr<std::shared_ptr<ClientSession>> {
if (!resp.ok()) {
LOG(ERROR) << "Create session failed:" << resp.status();
return Status::Error("Create session failed: %s", resp.status().toString().c_str());
Expand All @@ -127,6 +146,11 @@ folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::cre
if (!ret.second) {
return Status::Error("Insert session to local cache failed.");
}
std::string sessionKey = userName + clientIp;
bool addResp = addSessionCount(sessionKey);
if (!addResp) {
return Status::Error("Insert userIpSessionCount to local cache failed.");
}
updateSessionInfo(sessionPtr.get());
return sessionPtr;
}
Expand All @@ -153,7 +177,11 @@ void GraphSessionManager::removeSession(SessionID id) {
LOG(ERROR) << "Remove session `" << id << "' failed: " << resp.status();
return;
}
auto sessionCopy = iter->second->getSession();
std::string key = sessionCopy.get_user_name() + sessionCopy.get_client_ip();
activeSessions_.erase(iter);
// delete session count from cache
subSessionCount(key);
}

void GraphSessionManager::threadFunc() {
Expand Down Expand Up @@ -195,10 +223,14 @@ void GraphSessionManager::reclaimExpiredSessions() {
// TODO: Handle cases where the delete client failed
LOG(ERROR) << "Remove session `" << iter->first << "' failed: " << resp.status();
}
auto sessionCopy = iter->second->getSession();
std::string key = sessionCopy.get_user_name() + sessionCopy.get_client_ip();
iter = activeSessions_.erase(iter);
stats::StatsManager::decValue(kNumActiveSessions);
stats::StatsManager::addValue(kNumReclaimedExpiredSessions);
// TODO: Disconnect the connection of the session
// delete session count from cache
subSessionCount(key);
}
}

Expand Down Expand Up @@ -265,6 +297,7 @@ Status GraphSessionManager::init() {
if (!listSessionsRet.ok()) {
return Status::Error("Load sessions from meta failed.");
}
int64_t loadSessionCount = 0;
auto& sessions = *listSessionsRet.value().sessions_ref();
for (auto& session : sessions) {
if (session.get_graph_addr() != myAddr_) {
Expand All @@ -281,14 +314,46 @@ Status GraphSessionManager::init() {
continue;
}
session.queries_ref()->clear();
std::string key = session.get_user_name() + session.get_client_ip();
auto sessionPtr = ClientSession::create(std::move(session), metaClient_);
auto ret = activeSessions_.emplace(sessionId, sessionPtr);
if (!ret.second) {
return Status::Error("Insert session to local cache failed.");
}
bool addResp = addSessionCount(key);
if (!addResp) {
return Status::Error("Insert userIpSessionCount to local cache failed.");
}
updateSessionInfo(sessionPtr.get());
loadSessionCount++;
}
LOG(INFO) << "Total of " << loadSessionCount << " sessions are loaded";
return Status::OK();
}

bool GraphSessionManager::addSessionCount(std::string& key) {
auto countFindPtr = userIpSessionCount_.find(key);
if (countFindPtr != userIpSessionCount_.end()) {
countFindPtr->second.get()->fetch_add(1);
} else {
auto ret1 = userIpSessionCount_.emplace(key, std::make_shared<SessionCount>());
if (!ret1.second) {
return false;
}
}
return true;
}

bool GraphSessionManager::subSessionCount(std::string& key) {
auto countFindPtr = userIpSessionCount_.find(key);
if (countFindPtr == userIpSessionCount_.end()) {
return false;
}
auto count = countFindPtr->second.get()->fetch_sub(1);
if (count <= 0) {
userIpSessionCount_.erase(countFindPtr);
}
return true;
}
} // namespace graph
} // namespace nebula
6 changes: 6 additions & 0 deletions src/graph/session/GraphSessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ class GraphSessionManager final : public SessionManager<ClientSession> {
// Updates session info locally.
// session: ClientSession which will be updated.
void updateSessionInfo(ClientSession* session);

// add sessionCount
bool addSessionCount(std::string& key);

// sub sessionCount
bool subSessionCount(std::string& key);
};

} // namespace graph
Expand Down
1 change: 1 addition & 0 deletions tests/admin/test_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def test_configs(self):
['GRAPH', 'session_reclaim_interval_secs', 'int', 'MUTABLE', 2],
['GRAPH', 'max_allowed_connections', 'int', 'MUTABLE', 9223372036854775807],
['GRAPH', 'disable_octal_escape_char', 'bool', 'MUTABLE', False],
['GRAPH', 'max_sessions_per_ip_per_user', 'int', 'MUTABLE', 300],
]
self.check_out_of_order_result(resp, expected_result)

Expand Down
60 changes: 59 additions & 1 deletion tests/job/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ def prepare(self):
resp = self.execute(query)
self.check_resp_succeeded(resp)

resp = self.execute('CREATE USER IF NOT EXISTS session_user1 WITH PASSWORD "123456"')
self.check_resp_succeeded(resp)

query = 'GRANT ROLE ADMIN ON nba TO session_user1'
resp = self.execute(query)
self.check_resp_succeeded(resp)

resp = self.execute('CREATE USER IF NOT EXISTS session_user2 WITH PASSWORD "123456"')
self.check_resp_succeeded(resp)

query = 'GRANT ROLE ADMIN ON nba TO session_user2'
resp = self.execute(query)
self.check_resp_succeeded(resp)

resp = self.execute('SHOW HOSTS GRAPH')
self.check_resp_succeeded(resp)
assert not resp.is_empty()
Expand All @@ -53,7 +67,10 @@ def cleanup(self):
session = self.client_pool.get_session('root', 'nebula')
resp = session.execute('DROP USER session_user')
self.check_resp_succeeded(resp)

resp = session.execute('DROP USER session_user1')
self.check_resp_succeeded(resp)
resp = session.execute('DROP USER session_user2')
self.check_resp_succeeded(resp)
def test_sessions(self):
# 1: test add session with right username
try:
Expand Down Expand Up @@ -225,3 +242,44 @@ def test_signout_and_execute(self):
resp = conn.execute(session_id, 'SHOW HOSTS')
assert resp.error_code == ttypes.ErrorCode.E_SESSION_INVALID, resp.error_msg
assert resp.error_msg.find(b'Session not existed!') > 0

def test_out_of_max_sessions_per_ip_per_user(self):
resp = self.execute('SHOW SESSIONS')
self.check_resp_succeeded(resp)
sessions = len(resp.rows())

i = 0
session_user1_count = 0
while (i < sessions):
row = resp.rows()[i]
if (row.values[1].get_sVal() == b'session_user1'):
session_user1_count += 1
i += 1

session_limit = max(3, sessions)
resp = self.execute('UPDATE CONFIGS graph:max_sessions_per_ip_per_user = {}'.format(session_limit))
self.check_resp_succeeded(resp)
time.sleep(3)

# get new session failed for user session_user1
try:
i = 0
while (i < session_limit - session_user1_count):
self.client_pool.get_session("session_user1", "123456")
i += 1
self.client_pool.get_session("session_user1", "123456")
assert False
except Exception as e:
assert True

# get new session success from session_user2
try:
self.client_pool.get_session("session_user2", "123456")
self.check_resp_succeeded(resp)
assert True
except Exception as e:
assert False, e.message

resp = self.execute('UPDATE CONFIGS graph:max_sessions_per_ip_per_user = 300')
self.check_resp_succeeded(resp)
time.sleep(3)

0 comments on commit 3161f01

Please sign in to comment.