Skip to content

Commit

Permalink
Add show local sessions command (vesoft-inc#415)
Browse files Browse the repository at this point in the history
* Add session context definition

* Add local seesion command

* Add show local queries command and depracate show all queries

* Fix ut

* Fix pytest

* Add tck case

* Save multiple sessions in the cluster fixture in TCK

* Add tck case

* Format

Co-authored-by: Yichen Wang <[email protected]>
  • Loading branch information
nebula-bots and Aiee authored Jan 4, 2022
1 parent f0a572c commit 8fb64ff
Show file tree
Hide file tree
Showing 17 changed files with 205 additions and 80 deletions.
65 changes: 43 additions & 22 deletions src/graph/executor/admin/SessionExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ folly::Future<Status> ShowSessionsExecutor::execute() {
auto *showNode = asNode<ShowSessions>(node());
if (showNode->isSetSessionID()) {
return getSession(showNode->getSessionId());
} else {
return listSessions();
}
return showNode->isLocalCommand() ? listLocalSessions() : listSessions();
}

folly::Future<Status> ShowSessionsExecutor::listSessions() {
Expand All @@ -31,6 +30,7 @@ folly::Future<Status> ShowSessionsExecutor::listSessions() {
return Status::Error("Show sessions failed: %s.", resp.status().toString().c_str());
}
auto sessions = resp.value().get_sessions();
// Construct result column names
DataSet result({"SessionId",
"UserName",
"SpaceName",
Expand All @@ -40,21 +40,28 @@ folly::Future<Status> ShowSessionsExecutor::listSessions() {
"Timezone",
"ClientIp"});
for (auto &session : sessions) {
Row row;
row.emplace_back(session.get_session_id());
row.emplace_back(session.get_user_name());
row.emplace_back(session.get_space_name());
row.emplace_back(microSecToDateTime(session.get_create_time()));
row.emplace_back(microSecToDateTime(session.get_update_time()));
row.emplace_back(network::NetworkUtils::toHostsStr({session.get_graph_addr()}));
row.emplace_back(session.get_timezone());
row.emplace_back(session.get_client_ip());
result.emplace_back(std::move(row));
addSessions(session, result);
}
return finish(ResultBuilder().value(Value(std::move(result))).build());
});
}

folly::Future<Status> ShowSessionsExecutor::listLocalSessions() {
auto localSessions = qctx_->rctx()->sessionMgr()->getSessionFromLocalCache();
DataSet result({"SessionId",
"UserName",
"SpaceName",
"CreateTime",
"UpdateTime",
"GraphAddr",
"Timezone",
"ClientIp"});
for (auto &session : localSessions) {
addSessions(session, result);
}
return finish(ResultBuilder().value(Value(std::move(result))).build());
}

folly::Future<Status> ShowSessionsExecutor::getSession(SessionID sessionId) {
return qctx()->getMetaClient()->getSession(sessionId).via(runner()).thenValue(
[this, sessionId](StatusOr<meta::cpp2::GetSessionResp> resp) {
Expand All @@ -64,20 +71,34 @@ folly::Future<Status> ShowSessionsExecutor::getSession(SessionID sessionId) {
"Get session `%ld' failed: %s.", sessionId, resp.status().toString().c_str());
}
auto session = resp.value().get_session();
DataSet result({"VariableName", "Value"});
result.emplace_back(Row({"SessionID", session.get_session_id()}));
result.emplace_back(Row({"UserName", session.get_user_name()}));
result.emplace_back(Row({"SpaceName", session.get_space_name()}));
result.emplace_back(Row({"CreateTime", microSecToDateTime(session.get_create_time())}));
result.emplace_back(Row({"UpdateTime", microSecToDateTime(session.get_update_time())}));
result.emplace_back(
Row({"GraphAddr", network::NetworkUtils::toHostsStr({session.get_graph_addr()})}));
result.emplace_back(Row({"Timezone", session.get_timezone()}));
result.emplace_back(Row({"ClientIp", session.get_client_ip()}));

// Construct result column names
DataSet result({"SessionId",
"UserName",
"SpaceName",
"CreateTime",
"UpdateTime",
"GraphAddr",
"Timezone",
"ClientIp"});
addSessions(session, result);
return finish(ResultBuilder().value(Value(std::move(result))).build());
});
}

void ShowSessionsExecutor::addSessions(const meta::cpp2::Session &session, DataSet &dataSet) const {
Row row;
row.emplace_back(session.get_session_id());
row.emplace_back(session.get_user_name());
row.emplace_back(session.get_space_name());
row.emplace_back(microSecToDateTime(session.get_create_time()));
row.emplace_back(microSecToDateTime(session.get_update_time()));
row.emplace_back(network::NetworkUtils::toHostsStr({session.get_graph_addr()}));
row.emplace_back(session.get_timezone());
row.emplace_back(session.get_client_ip());
dataSet.emplace_back(std::move(row));
}

folly::Future<Status> UpdateSessionExecutor::execute() {
VLOG(1) << "Update sessions to metad";
SCOPED_TIMER(&execTime_);
Expand Down
7 changes: 6 additions & 1 deletion src/graph/executor/admin/SessionExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ class ShowSessionsExecutor final : public Executor {
folly::Future<Status> execute() override;

private:
// List sessions in the cluster
folly::Future<Status> listSessions();
// List sessions in the current graph node
folly::Future<Status> listLocalSessions();

folly::Future<Status> getSession(SessionID sessionId);
// Add session info into dataset
void addSessions(const meta::cpp2::Session &session, DataSet &dataSet) const;

DateTime microSecToDateTime(int64_t microSec) {
DateTime microSecToDateTime(const int64_t microSec) const {
auto dateTime = time::TimeConversion::unixSecondsToDateTime(microSec / 1000000);
dateTime.microsec = microSec % 1000000;
return dateTime;
Expand Down
19 changes: 13 additions & 6 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -1325,14 +1325,18 @@ class ShowSessions final : public SingleInputNode {
static ShowSessions* make(QueryContext* qctx,
PlanNode* input,
bool isSetSessionID,
SessionID sessionId) {
return qctx->objPool()->add(new ShowSessions(qctx, input, isSetSessionID, sessionId));
SessionID sessionId,
bool isLocalCommand) {
return qctx->objPool()->add(
new ShowSessions(qctx, input, isSetSessionID, sessionId, isLocalCommand));
}

bool isSetSessionID() const {
return isSetSessionID_;
}

bool isLocalCommand() const {
return isLocalCommand_;
}
SessionID getSessionId() const {
return sessionId_;
}
Expand All @@ -1341,15 +1345,18 @@ class ShowSessions final : public SingleInputNode {
explicit ShowSessions(QueryContext* qctx,
PlanNode* input,
bool isSetSessionID,
SessionID sessionId)
SessionID sessionId,
bool isLocalCommand)
: SingleInputNode(qctx, Kind::kShowSessions, input) {
isSetSessionID_ = isSetSessionID;
sessionId_ = sessionId;
isSetSessionID_ = isSetSessionID;
isLocalCommand_ = isLocalCommand;
}

private:
bool isSetSessionID_{false};
SessionID sessionId_{-1};
bool isSetSessionID_{false};
bool isLocalCommand_{false};
};

class UpdateSession final : public SingleInputNode {
Expand Down
9 changes: 9 additions & 0 deletions src/graph/session/GraphSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::fin
return metaClient_->getSession(id).via(runner).thenValue(addSession);
}

std::vector<meta::cpp2::Session> GraphSessionManager::getSessionFromLocalCache() const {
std::vector<meta::cpp2::Session> sessions;
sessions.reserve(activeSessions_.size());
for (auto& it : activeSessions_) {
sessions.emplace_back(it.second->getSession());
}
return sessions;
}

folly::Future<StatusOr<std::shared_ptr<ClientSession>>> GraphSessionManager::createSession(
const std::string userName, const std::string clientIp, folly::Executor* runner) {
auto createCB = [this,
Expand Down
4 changes: 4 additions & 0 deletions src/graph/session/GraphSessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "common/thrift/ThriftTypes.h"
#include "graph/session/ClientSession.h"
#include "interface/gen-cpp2/GraphService.h"
#include "interface/gen-cpp2/meta_types.h"

/**
* GraphSessionManager manages the client sessions, e.g. create new, find
Expand Down Expand Up @@ -60,6 +61,9 @@ class GraphSessionManager final : public SessionManager<ClientSession> {
*/
std::shared_ptr<ClientSession> findSessionFromCache(SessionID id);

// get all seesions in the local cache
std::vector<meta::cpp2::Session> getSessionFromLocalCache() const;

private:
folly::Future<StatusOr<std::shared_ptr<ClientSession>>> findSessionFromMetad(
SessionID id, folly::Executor* runner);
Expand Down
7 changes: 5 additions & 2 deletions src/graph/validator/AdminValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,11 @@ Status ShowSpaceServiceClientsValidator::toPlan() {

Status ShowSessionsValidator::toPlan() {
auto sentence = static_cast<ShowSessionsSentence *>(sentence_);
auto *node =
ShowSessions::make(qctx_, nullptr, sentence->isSetSessionID(), sentence->getSessionID());
auto *node = ShowSessions::make(qctx_,
nullptr,
sentence->isSetSessionID(),
sentence->getSessionID(),
sentence->isLocalCommand());
root_ = node;
tail_ = root_;
return Status::OK();
Expand Down
5 changes: 3 additions & 2 deletions src/parser/AdminSentences.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,13 +510,14 @@ std::string ShowSessionsSentence::toString() const {
if (isSetSessionID()) {
return folly::stringPrintf("SHOW SESSION %ld", sessionId_);
}
if (isLocalCommand()) return "SHOW LOCAL SESSIONS";
return "SHOW SESSIONS";
}

std::string ShowQueriesSentence::toString() const {
std::string buf = "SHOW";
if (isAll()) {
buf += " ALL";
if (!isAll()) {
buf += " LOCAL";
}
buf += " QUERIES";
return buf;
Expand Down
10 changes: 10 additions & 0 deletions src/parser/AdminSentences.h
Original file line number Diff line number Diff line change
Expand Up @@ -822,10 +822,19 @@ class ShowSessionsSentence final : public Sentence {
setSessionId_ = true;
}

explicit ShowSessionsSentence(bool isLocalCommand) {
kind_ = Kind::kShowSessions;
isLocalCommand_ = isLocalCommand;
}

bool isSetSessionID() const {
return setSessionId_;
}

bool isLocalCommand() const {
return isLocalCommand_;
}

SessionID getSessionID() const {
return sessionId_;
}
Expand All @@ -835,6 +844,7 @@ class ShowSessionsSentence final : public Sentence {
private:
SessionID sessionId_{0};
bool setSessionId_{false};
bool isLocalCommand_{false};
};

class ShowQueriesSentence final : public Sentence {
Expand Down
11 changes: 9 additions & 2 deletions src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ static constexpr size_t kCommentLengthLimit = 256;
%token KW_TEXT KW_SEARCH KW_CLIENTS KW_SIGN KW_SERVICE KW_TEXT_SEARCH
%token KW_ANY KW_SINGLE KW_NONE
%token KW_REDUCE
%token KW_LOCAL
%token KW_SESSIONS KW_SESSION
%token KW_KILL KW_QUERY KW_QUERIES KW_TOP
%token KW_GEOGRAPHY KW_POINT KW_LINESTRING KW_POLYGON
Expand Down Expand Up @@ -546,6 +547,7 @@ unreserved_keyword
| KW_S2_MAX_CELLS { $$ = new std::string("s2_max_cells"); }
| KW_SESSION { $$ = new std::string("session"); }
| KW_SESSIONS { $$ = new std::string("sessions"); }
| KW_LOCAL { $$ = new std::string("local"); }
| KW_SAMPLE { $$ = new std::string("sample"); }
| KW_QUERIES { $$ = new std::string("queries"); }
| KW_QUERY { $$ = new std::string("query"); }
Expand Down Expand Up @@ -3317,10 +3319,10 @@ job_concurrency
;

show_queries_sentence
: KW_SHOW KW_QUERIES {
: KW_SHOW KW_LOCAL KW_QUERIES {
$$ = new ShowQueriesSentence();
}
| KW_SHOW KW_ALL KW_QUERIES {
| KW_SHOW KW_QUERIES {
$$ = new ShowQueriesSentence(true);
}
;
Expand Down Expand Up @@ -3418,9 +3420,14 @@ show_sentence
| KW_SHOW KW_FULLTEXT KW_INDEXES {
$$ = new ShowFTIndexesSentence();
}
// List sessions in the cluster
| KW_SHOW KW_SESSIONS {
$$ = new ShowSessionsSentence();
}
// List sessions in the current graph node
| KW_SHOW KW_LOCAL KW_SESSIONS {
$$ = new ShowSessionsSentence(true);
}
| KW_SHOW KW_SESSION legal_integer {
$$ = new ShowSessionsSentence($3);
}
Expand Down
3 changes: 2 additions & 1 deletion src/parser/scanner.lex
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}*
"COMMENT" { return TokenType::KW_COMMENT; }
"S2_MAX_LEVEL" { return TokenType::KW_S2_MAX_LEVEL; }
"S2_MAX_CELLS" { return TokenType::KW_S2_MAX_CELLS; }
"LOCAL" { return TokenType::KW_LOCAL; }
"SESSIONS" { return TokenType::KW_SESSIONS; }
"SESSION" { return TokenType::KW_SESSION; }
"SAMPLE" { return TokenType::KW_SAMPLE; }
Expand Down Expand Up @@ -518,7 +519,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}*
* including the non-ascii ones, which are negative
* in terms of type of `signed char'. At the same time, because
* Bison translates all negative tokens to EOF(i.e. YY_NULL),
* so we have to cast illegal characters to type of `unsinged char'
* so we have to cast illegal characters to type of `unsigned char'
* This will make Bison receive an unknown token, which leads to
* a syntax error.
*
Expand Down
10 changes: 8 additions & 2 deletions src/parser/test/ParserTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3253,6 +3253,12 @@ TEST_F(ParserTest, SessionTest) {
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), "SHOW SESSIONS");
}
{
std::string query = "SHOW LOCAL SESSIONS";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), "SHOW LOCAL SESSIONS");
}
{
std::string query = "SHOW SESSION 123";
auto result = parse(query);
Expand Down Expand Up @@ -3301,10 +3307,10 @@ TEST_F(ParserTest, ShowAndKillQueryTest) {
ASSERT_EQ(result.value()->toString(), "SHOW QUERIES");
}
{
std::string query = "SHOW ALL QUERIES";
std::string query = "SHOW LOCAL QUERIES";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
ASSERT_EQ(result.value()->toString(), "SHOW ALL QUERIES");
ASSERT_EQ(result.value()->toString(), "SHOW LOCAL QUERIES");
}
{
std::string query = "KILL QUERY (plan=123)";
Expand Down
14 changes: 9 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,18 @@ def class_fixture_variables():
"""save class scope fixture, used for session update.
"""
# cluster is the instance of NebulaService
# current_session is the session currently using
# sessions is a list of all sessions in the cluster
res = dict(
pool=None,
session=None,
current_session=None,
cluster=None,
sessions=[],
)
yield res
if res["session"] is not None:
res["session"].release()
for sess in res["sessions"]:
if sess is not None:
sess.release()
if res["pool"] is not None:
res["pool"].close()
if res["cluster"] is not None:
Expand Down Expand Up @@ -176,8 +180,8 @@ def session_from_second_conn_pool(conn_pool_to_second_graph_service, pytestconfi

@pytest.fixture(scope="class")
def session(session_from_first_conn_pool, class_fixture_variables):
if class_fixture_variables.get('session', None) is not None:
return class_fixture_variables.get('session')
if class_fixture_variables.get('current_session', None) is not None:
return class_fixture_variables.get('current_session')
return session_from_first_conn_pool


Expand Down
Loading

0 comments on commit 8fb64ff

Please sign in to comment.