From 3161f0109246c121d5f39caef0094648dfb8aa07 Mon Sep 17 00:00:00 2001 From: Liang Feng <516072575@qq.com> Date: Wed, 16 Mar 2022 11:20:53 +0800 Subject: [PATCH] Limit number of sessions that are created by same client ip and same user for each graphd (#3729) * add session_tool command to manage and analysis sessions * add session_tool command to manage and analysis sessions * add max_sessions_per_ip_per_user * odify judgment conditions * remove other pr content * modify code * fix the format reported * fix the format reported * fix the format reported * fix the format for all files * change python test case * change python test case * change python test case * delete spaces at last row of file * Fix Python Test Case session limit problem Co-authored-by: Yichen Wang <18348405+Aiee@users.noreply.github.com> Co-authored-by: cpw <13495049+CPWstatic@users.noreply.github.com> Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- conf/nebula-graphd.conf.production | 4 ++ resources/gflags.json | 3 +- src/common/session/SessionManager.h | 20 +++++++ src/graph/service/GraphFlags.cpp | 3 + src/graph/service/GraphFlags.h | 1 + src/graph/session/GraphSessionManager.cpp | 69 ++++++++++++++++++++++- src/graph/session/GraphSessionManager.h | 6 ++ tests/admin/test_configs.py | 1 + tests/job/test_session.py | 60 +++++++++++++++++++- 9 files changed, 163 insertions(+), 4 deletions(-) diff --git a/conf/nebula-graphd.conf.production b/conf/nebula-graphd.conf.production index 3fb6f70d81c..4e03aa23925 100644 --- a/conf/nebula-graphd.conf.production +++ b/conf/nebula-graphd.conf.production @@ -87,3 +87,7 @@ ########## experimental feature ########## # if use experimental features --enable_experimental_feature=false + +########## session ########## +# Maximum number of sessions that can be created per IP and per user +--max_sessions_per_ip_per_user=300 diff --git a/resources/gflags.json b/resources/gflags.json index 52fd60b2786..6a0bcd8fb69 100644 --- a/resources/gflags.json +++ b/resources/gflags.json @@ -14,7 +14,8 @@ "session_idle_timeout_secs", "session_reclaim_interval_secs", "max_allowed_connections", - "disable_octal_escape_char" + "disable_octal_escape_char", + "max_sessions_per_ip_per_user" ], "NESTED": [ "rocksdb_db_options", diff --git a/src/common/session/SessionManager.h b/src/common/session/SessionManager.h index 4b5c06b50b9..00ef5fe63ce 100644 --- a/src/common/session/SessionManager.h +++ b/src/common/session/SessionManager.h @@ -21,6 +21,24 @@ namespace nebula { +class SessionCount { + private: + std::atomic count_ = 1; + + public: + int fetch_add(int step) { + count_.fetch_add(step, std::memory_order_release); + return count_; + } + int fetch_sub(int step) { + count_.fetch_sub(step, std::memory_order_release); + return count_; + } + int get() { + return count_; + } +}; + template class SessionManager { public: @@ -56,7 +74,9 @@ class SessionManager { protected: using SessionPtr = std::shared_ptr; + using SessionCountPtr = std::shared_ptr; folly::ConcurrentHashMap activeSessions_; + folly::ConcurrentHashMap userIpSessionCount_; std::unique_ptr scavenger_; meta::MetaClient* metaClient_{nullptr}; HostAddr myAddr_; diff --git a/src/graph/service/GraphFlags.cpp b/src/graph/service/GraphFlags.cpp index d805e2dc7be..6532a480700 100644 --- a/src/graph/service/GraphFlags.cpp +++ b/src/graph/service/GraphFlags.cpp @@ -71,6 +71,9 @@ DEFINE_bool(disable_octal_escape_char, DEFINE_bool(enable_experimental_feature, false, "Whether to enable experimental feature"); DEFINE_int32(num_rows_to_check_memory, 1024, "number rows to check memory"); +DEFINE_int32(max_sessions_per_ip_per_user, + 300, + "Maximum number of sessions that can be created per IP and per user"); // Sanity-checking Flag Values static bool ValidateSessIdleTimeout(const char* flagname, int32_t value) { diff --git a/src/graph/service/GraphFlags.h b/src/graph/service/GraphFlags.h index 22d8e433ddd..2f7bad152be 100644 --- a/src/graph/service/GraphFlags.h +++ b/src/graph/service/GraphFlags.h @@ -36,6 +36,7 @@ DECLARE_bool(enable_authorize); DECLARE_string(auth_type); DECLARE_string(cloud_http_url); DECLARE_uint32(max_allowed_statements); +DECLARE_int32(max_sessions_per_ip_per_user); // Failed login attempt // value of failed_login_attempts is in the range from 0 to 32767. diff --git a/src/graph/session/GraphSessionManager.cpp b/src/graph/session/GraphSessionManager.cpp index 86038175b90..8ae85840c27 100644 --- a/src/graph/session/GraphSessionManager.cpp +++ b/src/graph/session/GraphSessionManager.cpp @@ -82,6 +82,11 @@ folly::Future>> GraphSessionManager::fin if (!ret.second) { return Status::Error("Insert session to local cache failed."); } + std::string key = session.get_user_name() + session.get_client_ip(); + bool addResp = addSessionCount(key); + if (!addResp) { + return Status::Error("Insert userIpSessionCount to local cache failed."); + } // update the space info to sessionPtr if (!spaceName.empty()) { @@ -108,8 +113,22 @@ std::vector GraphSessionManager::getSessionFromLocalCache() folly::Future>> GraphSessionManager::createSession( const std::string userName, const std::string clientIp, folly::Executor* runner) { - auto createCB = [this, - userName = userName](auto&& resp) -> StatusOr> { + // check the number of sessions per user per ip + std::string key = userName + clientIp; + auto maxSessions = FLAGS_max_sessions_per_ip_per_user; + auto uiscFindPtr = userIpSessionCount_.find(key); + if (uiscFindPtr != userIpSessionCount_.end() && maxSessions > 0 && + uiscFindPtr->second.get()->get() > maxSessions - 1) { + return Status::Error( + "Create Session failed: Too many sessions created from %s by user %s. " + "the threshold is %d. You can change it by modifying '%s' in nebula-graphd.conf", + clientIp.c_str(), + userName.c_str(), + maxSessions, + "max_sessions_per_ip_per_user"); + } + auto createCB = [this, userName = userName, clientIp = clientIp]( + auto&& resp) -> StatusOr> { if (!resp.ok()) { LOG(ERROR) << "Create session failed:" << resp.status(); return Status::Error("Create session failed: %s", resp.status().toString().c_str()); @@ -127,6 +146,11 @@ folly::Future>> GraphSessionManager::cre if (!ret.second) { return Status::Error("Insert session to local cache failed."); } + std::string sessionKey = userName + clientIp; + bool addResp = addSessionCount(sessionKey); + if (!addResp) { + return Status::Error("Insert userIpSessionCount to local cache failed."); + } updateSessionInfo(sessionPtr.get()); return sessionPtr; } @@ -153,7 +177,11 @@ void GraphSessionManager::removeSession(SessionID id) { LOG(ERROR) << "Remove session `" << id << "' failed: " << resp.status(); return; } + auto sessionCopy = iter->second->getSession(); + std::string key = sessionCopy.get_user_name() + sessionCopy.get_client_ip(); activeSessions_.erase(iter); + // delete session count from cache + subSessionCount(key); } void GraphSessionManager::threadFunc() { @@ -195,10 +223,14 @@ void GraphSessionManager::reclaimExpiredSessions() { // TODO: Handle cases where the delete client failed LOG(ERROR) << "Remove session `" << iter->first << "' failed: " << resp.status(); } + auto sessionCopy = iter->second->getSession(); + std::string key = sessionCopy.get_user_name() + sessionCopy.get_client_ip(); iter = activeSessions_.erase(iter); stats::StatsManager::decValue(kNumActiveSessions); stats::StatsManager::addValue(kNumReclaimedExpiredSessions); // TODO: Disconnect the connection of the session + // delete session count from cache + subSessionCount(key); } } @@ -265,6 +297,7 @@ Status GraphSessionManager::init() { if (!listSessionsRet.ok()) { return Status::Error("Load sessions from meta failed."); } + int64_t loadSessionCount = 0; auto& sessions = *listSessionsRet.value().sessions_ref(); for (auto& session : sessions) { if (session.get_graph_addr() != myAddr_) { @@ -281,14 +314,46 @@ Status GraphSessionManager::init() { continue; } session.queries_ref()->clear(); + std::string key = session.get_user_name() + session.get_client_ip(); auto sessionPtr = ClientSession::create(std::move(session), metaClient_); auto ret = activeSessions_.emplace(sessionId, sessionPtr); if (!ret.second) { return Status::Error("Insert session to local cache failed."); } + bool addResp = addSessionCount(key); + if (!addResp) { + return Status::Error("Insert userIpSessionCount to local cache failed."); + } updateSessionInfo(sessionPtr.get()); + loadSessionCount++; } + LOG(INFO) << "Total of " << loadSessionCount << " sessions are loaded"; return Status::OK(); } + +bool GraphSessionManager::addSessionCount(std::string& key) { + auto countFindPtr = userIpSessionCount_.find(key); + if (countFindPtr != userIpSessionCount_.end()) { + countFindPtr->second.get()->fetch_add(1); + } else { + auto ret1 = userIpSessionCount_.emplace(key, std::make_shared()); + if (!ret1.second) { + return false; + } + } + return true; +} + +bool GraphSessionManager::subSessionCount(std::string& key) { + auto countFindPtr = userIpSessionCount_.find(key); + if (countFindPtr == userIpSessionCount_.end()) { + return false; + } + auto count = countFindPtr->second.get()->fetch_sub(1); + if (count <= 0) { + userIpSessionCount_.erase(countFindPtr); + } + return true; +} } // namespace graph } // namespace nebula diff --git a/src/graph/session/GraphSessionManager.h b/src/graph/session/GraphSessionManager.h index e3ea108f4bf..3c20dd48ab4 100644 --- a/src/graph/session/GraphSessionManager.h +++ b/src/graph/session/GraphSessionManager.h @@ -101,6 +101,12 @@ class GraphSessionManager final : public SessionManager { // Updates session info locally. // session: ClientSession which will be updated. void updateSessionInfo(ClientSession* session); + + // add sessionCount + bool addSessionCount(std::string& key); + + // sub sessionCount + bool subSessionCount(std::string& key); }; } // namespace graph diff --git a/tests/admin/test_configs.py b/tests/admin/test_configs.py index b9c1e4129f6..82fd1824c16 100644 --- a/tests/admin/test_configs.py +++ b/tests/admin/test_configs.py @@ -75,6 +75,7 @@ def test_configs(self): ['GRAPH', 'session_reclaim_interval_secs', 'int', 'MUTABLE', 2], ['GRAPH', 'max_allowed_connections', 'int', 'MUTABLE', 9223372036854775807], ['GRAPH', 'disable_octal_escape_char', 'bool', 'MUTABLE', False], + ['GRAPH', 'max_sessions_per_ip_per_user', 'int', 'MUTABLE', 300], ] self.check_out_of_order_result(resp, expected_result) diff --git a/tests/job/test_session.py b/tests/job/test_session.py index 5da0687c5bd..8aedbd418ce 100644 --- a/tests/job/test_session.py +++ b/tests/job/test_session.py @@ -31,6 +31,20 @@ def prepare(self): resp = self.execute(query) self.check_resp_succeeded(resp) + resp = self.execute('CREATE USER IF NOT EXISTS session_user1 WITH PASSWORD "123456"') + self.check_resp_succeeded(resp) + + query = 'GRANT ROLE ADMIN ON nba TO session_user1' + resp = self.execute(query) + self.check_resp_succeeded(resp) + + resp = self.execute('CREATE USER IF NOT EXISTS session_user2 WITH PASSWORD "123456"') + self.check_resp_succeeded(resp) + + query = 'GRANT ROLE ADMIN ON nba TO session_user2' + resp = self.execute(query) + self.check_resp_succeeded(resp) + resp = self.execute('SHOW HOSTS GRAPH') self.check_resp_succeeded(resp) assert not resp.is_empty() @@ -53,7 +67,10 @@ def cleanup(self): session = self.client_pool.get_session('root', 'nebula') resp = session.execute('DROP USER session_user') self.check_resp_succeeded(resp) - + resp = session.execute('DROP USER session_user1') + self.check_resp_succeeded(resp) + resp = session.execute('DROP USER session_user2') + self.check_resp_succeeded(resp) def test_sessions(self): # 1: test add session with right username try: @@ -225,3 +242,44 @@ def test_signout_and_execute(self): resp = conn.execute(session_id, 'SHOW HOSTS') assert resp.error_code == ttypes.ErrorCode.E_SESSION_INVALID, resp.error_msg assert resp.error_msg.find(b'Session not existed!') > 0 + + def test_out_of_max_sessions_per_ip_per_user(self): + resp = self.execute('SHOW SESSIONS') + self.check_resp_succeeded(resp) + sessions = len(resp.rows()) + + i = 0 + session_user1_count = 0 + while (i < sessions): + row = resp.rows()[i] + if (row.values[1].get_sVal() == b'session_user1'): + session_user1_count += 1 + i += 1 + + session_limit = max(3, sessions) + resp = self.execute('UPDATE CONFIGS graph:max_sessions_per_ip_per_user = {}'.format(session_limit)) + self.check_resp_succeeded(resp) + time.sleep(3) + + # get new session failed for user session_user1 + try: + i = 0 + while (i < session_limit - session_user1_count): + self.client_pool.get_session("session_user1", "123456") + i += 1 + self.client_pool.get_session("session_user1", "123456") + assert False + except Exception as e: + assert True + + # get new session success from session_user2 + try: + self.client_pool.get_session("session_user2", "123456") + self.check_resp_succeeded(resp) + assert True + except Exception as e: + assert False, e.message + + resp = self.execute('UPDATE CONFIGS graph:max_sessions_per_ip_per_user = 300') + self.check_resp_succeeded(resp) + time.sleep(3)