Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mo_sessions table function and view #11748

Merged
merged 16 commits into from
Sep 14, 2023
Merged
9 changes: 8 additions & 1 deletion pkg/cnservice/upgrader/newAddTable.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,11 @@ var STATISTICSView = &table.Table{
"join `mo_catalog`.`mo_columns` `tcl` on (`idx`.`table_id` = `tcl`.`att_relname_id` and `idx`.`column_name` = `tcl`.`attname`)",
}

var needUpgradNewView = []*table.Table{PARTITIONSView, STATISTICSView}
var MoSessionsView = &table.Table{
Account: table.AccountAll,
Database: sysview.InformationDBConst,
Table: "mo_sessions",
CreateViewSql: "CREATE VIEW IF NOT EXISTS `mo_catalog`.`mo_sessions` AS SELECT * FROM mo_sessions() AS mo_sessions_tmp;",
}

var needUpgradNewView = []*table.Table{PARTITIONSView, STATISTICSView, MoSessionsView}
4 changes: 4 additions & 0 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ var (
"mo_mysql_compatibility_mode": 0,
"mo_stages": 0,
catalog.MOAutoIncrTable: 0,
"mo_sessions": 0,
}
configInitVariables = map[string]int8{
"save_query_result": 0,
Expand All @@ -842,6 +843,7 @@ var (
"mo_table_partitions": 0,
"mo_pubs": 0,
"mo_stages": 0,
"mo_sessions": 0,
}
createDbInformationSchemaSql = "create database information_schema;"
createAutoTableSql = fmt.Sprintf(`create table if not exists %s (
Expand Down Expand Up @@ -1016,6 +1018,7 @@ var (
comment text,
primary key(stage_id)
);`,
`CREATE VIEW IF NOT EXISTS mo_sessions AS SELECT * FROM mo_sessions() AS mo_sessions_tmp;`,
}

//drop tables for the tenant
Expand All @@ -1029,6 +1032,7 @@ var (
`drop table if exists mo_catalog.mo_stored_procedure;`,
`drop table if exists mo_catalog.mo_mysql_compatibility_mode;`,
`drop table if exists mo_catalog.mo_stages;`,
`drop view if exists mo_catalog.mo_sessions;`,
}
dropMoPubsSql = `drop table if exists mo_catalog.mo_pubs;`
deleteMoPubsSql = `delete from mo_catalog.mo_pubs;`
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/computation_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (cwft *TxnComputationWrapper) Compile(requestCtx context.Context, u interfa
// statement ID and updating snapshot TS.
// See `func (exec *txnExecutor) Exec(sql string)` for details.
txnOp := cwft.proc.TxnOperator
cwft.ses.SetTxnId(txnOp.Txn().ID)
if txnOp != nil && !cwft.ses.IsDerivedStmt() {
ok, _ := cwft.ses.GetTxnHandler().calledStartStmt()
if !ok {
Expand Down
12 changes: 10 additions & 2 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,13 @@ var RecordStatement = func(ctx context.Context, ses *Session, proc *process.Proc
stmID = uuid.New()
text = SubStringFromBegin(envStmt, int(ses.GetParameterUnit().SV.LengthOfQueryPrinted))
}
ses.sqlType.Store(sqlType)
ses.SetStmtId(stmID)
ses.SetStmtType(getStatementType(statement).GetStatementType())
ses.SetQueryType(getStatementType(statement).GetQueryType())
ses.SetSqlSourceType(sqlType)
ses.SetSqlOfStmt(text)

//note: txn id here may be empty
if sqlType != constant.InternalSql {
ses.pushQueryId(types.Uuid(stmID).ToString())
}
Expand Down Expand Up @@ -336,7 +342,7 @@ var RecordStatementTxnID = func(ctx context.Context, ses *Session) {
} else {
stm.SetTxnID(txn.Txn().ID)
}

ses.SetTxnId(txn.Txn().ID)
}
stm.Report(ctx)
}
Expand Down Expand Up @@ -2554,6 +2560,8 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context,
var loadLocalErrGroup *errgroup.Group
var loadLocalWriter *io.PipeWriter

ses.SetQueryStart(time.Now())

// per statement profiler
requestCtx, endStmtProfile := fileservice.NewStatementProfiler(requestCtx)
if endStmtProfile != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/mysql_cmd_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func Test_mce_selfhandle(t *testing.T) {
).AnyTimes()

txnOperator := mock_frontend.NewMockTxnOperator(ctrl)
txnOperator.EXPECT().Txn().Return(txn.TxnMeta{}).AnyTimes()
txnOperator.EXPECT().Commit(ctx).Return(nil).AnyTimes()
txnOperator.EXPECT().Rollback(ctx).Return(nil).AnyTimes()

Expand Down
164 changes: 132 additions & 32 deletions pkg/frontend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package frontend
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"runtime"
"strings"
Expand Down Expand Up @@ -217,7 +216,6 @@ type Session struct {
// requestLabel is the CN label info requested from client.
requestLabel map[string]string

sqlType atomic.Value
// startedAt is the session start time.
startedAt time.Time

Expand All @@ -230,6 +228,120 @@ type Session struct {
// nextval internally will derive two sql (a select and an update). the two sql are executed
// in the same transaction.
derivedStmt bool

//clear this part for every statement
stmtProfile struct {
// sqlSourceType denotes where the sql
sqlSourceType string
txnId uuid.UUID
stmtId uuid.UUID
// stmtType
stmtType string
// queryType
queryType string
// queryStart is the time when the query starts.
queryStart time.Time
//the sql from user may have multiple statements
//sqlOfStmt is the text part of one statement in the sql
sqlOfStmt string
}
}

func (ses *Session) ClearStmtProfile() {
ses.mu.Lock()
defer ses.mu.Unlock()
ses.stmtProfile.sqlSourceType = ""
ses.stmtProfile.txnId = uuid.UUID{}
ses.stmtProfile.stmtId = uuid.UUID{}
ses.stmtProfile.stmtType = ""
ses.stmtProfile.queryType = ""
ses.stmtProfile.sqlOfStmt = ""
}

func (ses *Session) GetSessionStart() time.Time {
ses.mu.Lock()
defer ses.mu.Unlock()
return ses.startedAt
}

func (ses *Session) SetTxnId(id []byte) {
ses.mu.Lock()
defer ses.mu.Unlock()
copy(ses.stmtProfile.txnId[:], id)
}

func (ses *Session) GetTxnId() uuid.UUID {
ses.mu.Lock()
defer ses.mu.Unlock()
return ses.stmtProfile.txnId
}

func (ses *Session) SetStmtId(id uuid.UUID) {
ses.mu.Lock()
defer ses.mu.Unlock()
copy(ses.stmtProfile.stmtId[:], id[:])
}

func (ses *Session) GetStmtId() uuid.UUID {
ses.mu.Lock()
defer ses.mu.Unlock()
return ses.stmtProfile.stmtId
}

func (ses *Session) SetStmtType(st string) {
ses.mu.Lock()
defer ses.mu.Unlock()
ses.stmtProfile.stmtType = st
}

func (ses *Session) GetStmtType() string {
ses.mu.Lock()
defer ses.mu.Unlock()
return ses.stmtProfile.stmtType
}

func (ses *Session) SetQueryType(qt string) {
ses.mu.Lock()
defer ses.mu.Unlock()
ses.stmtProfile.queryType = qt
}

func (ses *Session) GetQueryType() string {
ses.mu.Lock()
defer ses.mu.Unlock()
return ses.stmtProfile.queryType
}

func (ses *Session) SetSqlSourceType(st string) {
ses.stmtProfile.sqlSourceType = st
}

func (ses *Session) GetSqlSourceType() string {
return ses.stmtProfile.sqlSourceType
}

func (ses *Session) SetQueryStart(t time.Time) {
ses.mu.Lock()
defer ses.mu.Unlock()
ses.stmtProfile.queryStart = t
}

func (ses *Session) GetQueryStart() time.Time {
ses.mu.Lock()
defer ses.mu.Unlock()
return ses.stmtProfile.queryStart
}

func (ses *Session) SetSqlOfStmt(sot string) {
ses.mu.Lock()
defer ses.mu.Unlock()
ses.stmtProfile.sqlOfStmt = sot
}

func (ses *Session) GetSqlOfStmt() string {
ses.mu.Lock()
defer ses.mu.Unlock()
return ses.stmtProfile.sqlOfStmt
}

func (ses *Session) IsDerivedStmt() bool {
Expand Down Expand Up @@ -498,6 +610,7 @@ func (ses *Session) Close() {
ses.seqCurValues = nil
ses.seqLastValue = nil
ses.sqlHelper = nil
ses.ClearStmtProfile()
// The mpool cleanup must be placed at the end,
// and you must wait for all resources to be cleaned up before you can delete the mpool
if ses.proc != nil {
Expand Down Expand Up @@ -2037,44 +2150,31 @@ func (ses *Session) SetNewResponse(category int, affectedRows uint64, cmd int, d
// StatusSession implements the queryservice.Session interface.
func (ses *Session) StatusSession() *status.Session {
var (
txnID string
statementID string
statementType string
queryType string
sqlSourceType string
queryStart time.Time
accountName string
userName string
roleName string
)
if ses.txnHandler != nil && ses.txnHandler.txnOperator != nil {
txn := ses.txnHandler.txnOperator.Txn()
txnID = hex.EncodeToString(txn.GetID())
}
stmtInfo := ses.tStmt
if stmtInfo != nil {
statementID = uuid.UUID(stmtInfo.StatementID).String()
statementType = stmtInfo.StatementType
queryType = stmtInfo.QueryType
queryStart = stmtInfo.RequestAt
}
if v := ses.sqlType.Load(); v != nil {
sqlSourceType = v.(string)
}

accountName, userName, roleName = getUserProfile(ses.GetTenantInfo())
return &status.Session{
NodeID: ses.getRoutineManager().baseService.ID(),
ConnID: ses.GetConnectionID(),
SessionID: ses.GetUUIDString(),
Account: ses.GetTenantName(),
User: ses.GetUserName(),
Account: accountName,
User: userName,
Host: ses.getRoutineManager().baseService.SQLAddress(),
DB: ses.GetDatabaseName(),
SessionStart: ses.startedAt,
SessionStart: ses.GetSessionStart(),
Command: ses.GetCmd().String(),
Info: ses.GetSql(),
TxnID: txnID,
StatementID: statementID,
StatementType: statementType,
QueryType: queryType,
SQLSourceType: sqlSourceType,
QueryStart: queryStart,
Info: ses.GetSqlOfStmt(),
TxnID: ses.GetTxnId().String(),
StatementID: ses.GetStmtId().String(),
StatementType: ses.GetStmtType(),
QueryType: ses.GetQueryType(),
SQLSourceType: ses.GetSqlSourceType(),
QueryStart: ses.GetQueryStart(),
ClientHost: ses.GetMysqlProtocol().Peer(),
Role: roleName,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func TestSession_TxnCompilerContext(t *testing.T) {
testutil.SetupAutoIncrService()
ctx := context.TODO()
txnOperator := mock_frontend.NewMockTxnOperator(ctrl)
txnOperator.EXPECT().Txn().Return(txn.TxnMeta{}).AnyTimes()
txnOperator.EXPECT().Commit(ctx).Return(nil).AnyTimes()
txnOperator.EXPECT().Rollback(ctx).Return(nil).AnyTimes()
txnClient := mock_frontend.NewMockTxnClient(ctrl)
Expand Down
3 changes: 3 additions & 0 deletions pkg/frontend/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (th *TxnHandler) NewTxn() (context.Context, TxnOperator, error) {
// txnOp.GetWorkspace().StartStatement()
// th.enableStartStmt()
//}
th.ses.SetTxnId(txnOp.Txn().ID)
return txnCtx, txnOp, err
}

Expand Down Expand Up @@ -316,6 +317,7 @@ func (th *TxnHandler) CommitTxn() error {
}()
}
if txnOp != nil {
th.ses.SetTxnId(txnOp.Txn().ID)
err = txnOp.Commit(ctx2)
if err != nil {
txnId := txnOp.Txn().DebugString()
Expand Down Expand Up @@ -376,6 +378,7 @@ func (th *TxnHandler) RollbackTxn() error {
}()
}
if txnOp != nil {
th.ses.SetTxnId(txnOp.Txn().ID)
err = txnOp.Rollback(ctx2)
if err != nil {
txnId := txnOp.Txn().DebugString()
Expand Down
20 changes: 20 additions & 0 deletions pkg/frontend/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,3 +639,23 @@ func copyBytes(src []byte, needCopy bool) []byte {
}
return src
}

// getUserProfile returns the account, user, role of the account
func getUserProfile(account *TenantInfo) (string, string, string) {
var (
accountName string
userName string
roleName string
)

if account != nil {
accountName = account.GetTenant()
userName = account.GetUser()
roleName = account.GetDefaultRole()
} else {
accountName = sysAccountName
userName = rootName
roleName = moAdminRoleName
}
return accountName, userName, roleName
}
Loading