Skip to content

Commit

Permalink
add mo_sessions table function and view (#11748)
Browse files Browse the repository at this point in the history
讨论:https://github.com/matrixorigin/docs/blob/main/design/system_view.md#%E4%BC%9A%E8%AF%9D


1,增加table function `mo_sessions`.
2,增加系统视图`mo_catalog.mo_sessions`.
3,实现方案:
在table function `processlist`基础上,增加了字段`client_host`,`role`。并调整了部分字段的数据来源。
3,增加view的升级代码。

`mo_sessions`字段定义.

```

node_id: uuid。cn_id cn启动后,不会变
conn_id: client的tcp链接在mo中的编号。由hakeeper定义了编号的生成方式。
session_id: uuid。新session创建时产生
account: 租户名。
user: 用户名。
host: cn接受client请求的ip:port。
db: sql使用的database
session_start: 新session创建时间戳。
command: mysql命令类型。COM_QUERY,COM_STMT_PREPARE,COM_STMT_EXECUTE等
info: 执行的sql。一个sql里面可能有多个语句
txn_id: sql涉及的事务id
statement_id: sql中一个语句的uuid
statement_type: sql中一个语句的类型。select,insert,update等
query_type: sql中一个语句的种类。 DQL,TCL等
sql_source_type: sql中一个语句的来源。external,internal等
query_start: sql中一个语句的开始执行时间。
client_host: client的ip端口。
role: 角色
```

Approved by: @reusee, @m-schen, @nnsgmsone, @zhangxu19830126, @ouyuanning, @qingxinhome, @badboynt1, @aunjgr, @aressu1985
  • Loading branch information
daviszhen authored and sukki37 committed Sep 18, 2023
1 parent 86fba31 commit 0c749e4
Show file tree
Hide file tree
Showing 21 changed files with 472 additions and 85 deletions.
61 changes: 60 additions & 1 deletion pkg/cnservice/upgrader/newAddTable.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,63 @@ var STATISTICSView = &table.Table{
"join `mo_catalog`.`mo_columns` `tcl` on (`idx`.`table_id` = `tcl`.`att_relname_id` and `idx`.`column_name` = `tcl`.`attname`)",
}

var needUpgradNewView = []*table.Table{PARTITIONSView, STATISTICSView}
var processlistView = &table.Table{
Account: table.AccountAll,
Database: sysview.InformationDBConst,
Table: "processlist",
Columns: []table.Column{
table.StringColumn("account", "the account name"),
table.StringColumn("client_host", "the ip:port of the client"),
table.StringColumn("command", "the COMMAND send by client"),
table.UInt64Column("conn_id", "the connection id of the tcp between client"),
table.StringColumn("db", "the database be used"),
table.StringColumn("host", "the ip:port of the mo-server"),
table.StringColumn("info", "the sql"),
table.StringColumn("node_id", "the id of the cn"),
table.StringColumn("query_start", "the start time of the statement"),
table.StringColumn("query_type", "the kind of the statement. DQL,TCL,etc"),
table.StringColumn("role", "the role of the user"),
table.StringColumn("session_id", "the id of the session"),
table.StringColumn("session_start", "the start time of the session"),
table.StringColumn("sql_source_type", "where does the sql come from. internal,external, etc"),
table.StringColumn("statement_id", "the id of the statement"),
table.StringColumn("statement_type", "the type of the statement.Select,Delete,Insert,etc"),
table.StringColumn("txn_id", "the id of the transaction"),
table.StringColumn("user", "the user name"),
},
CreateViewSql: "CREATE VIEW IF NOT EXISTS `information_schema`.`PROCESSLIST` AS SELECT * FROM PROCESSLIST() A;",
//actually drop view here
CreateTableSql: "drop view if exists `information_schema`.`PROCESSLIST`;",
}

var MoSessionsView = &table.Table{
Account: table.AccountAll,
Database: catalog.MO_CATALOG,
Table: "mo_sessions",
Columns: []table.Column{
table.StringColumn("account", "the account name"),
table.StringColumn("client_host", "the ip:port of the client"),
table.StringColumn("command", "the COMMAND send by client"),
table.UInt64Column("conn_id", "the connection id of the tcp between client"),
table.StringColumn("db", "the database be used"),
table.StringColumn("host", "the ip:port of the mo-server"),
table.StringColumn("info", "the sql"),
table.StringColumn("node_id", "the id of the cn"),
table.StringColumn("query_start", "the start time of the statement"),
table.StringColumn("query_type", "the kind of the statement. DQL,TCL,etc"),
table.StringColumn("role", "the role of the user"),
table.StringColumn("session_id", "the id of the session"),
table.StringColumn("session_start", "the start time of the session"),
table.StringColumn("sql_source_type", "where does the sql come from. internal,external, etc"),
table.StringColumn("statement_id", "the id of the statement"),
table.StringColumn("statement_type", "the type of the statement.Select,Delete,Insert,etc"),
table.StringColumn("txn_id", "the id of the transaction"),
table.StringColumn("user", "the user name"),
},
CreateViewSql: "CREATE VIEW IF NOT EXISTS `mo_catalog`.`mo_sessions` AS SELECT * FROM mo_sessions() AS mo_sessions_tmp;",
//actually drop view here
CreateTableSql: "drop view `mo_catalog`.`mo_sessions`;",
}

var needUpgradNewView = []*table.Table{PARTITIONSView, STATISTICSView, MoSessionsView}
var registeredViews = []*table.Table{processlistView}
59 changes: 56 additions & 3 deletions pkg/cnservice/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func ParseDataTypeToColType(dataType string) (table.ColType, error) {
return table.TBytes, nil
case strings.Contains(strings.ToLower(dataType), "uuid"):
return table.TUuid, nil
case strings.Contains(strings.ToLower(dataType), "int unsigned"):
return table.TUint64, nil
default:
return table.TSkip, moerr.NewInternalError(context.Background(), "unknown data type")
}
Expand Down Expand Up @@ -185,6 +187,11 @@ func (u *Upgrader) Upgrade(ctx context.Context) error {
return err
}

if err = u.UpgradeNewViewColumn(ctx); err != nil {
logutil.Errorf("upgrade new view column failed: %s", err.Error())
return err
}

if err = u.UpgradeNewTableColumn(ctx); err != nil {
logutil.Errorf("upgrade new table column failed: %s", err.Error())
return err
Expand All @@ -202,7 +209,46 @@ func (u *Upgrader) Upgrade(ctx context.Context) error {
return nil
}

// Upgrade the newly added columns in the system table
// UpgradeNewViewColumn the newly added columns in the system table
func (u *Upgrader) UpgradeNewViewColumn(ctx context.Context) error {
exec := u.IEFactory()
if exec == nil {
return nil
}

for _, tbl := range registeredViews {
currentSchema, err := u.GetCurrentSchema(ctx, exec, tbl.Database, tbl.Table)
if err != nil {
return err
}

diff, err := u.GenerateDiff(currentSchema, tbl)
if err != nil {
return err
} else if len(diff.AddedColumns) == 0 {
continue
}

//
stmt := []string{
"begin;",
appendSemicolon(tbl.CreateTableSql), //drop view
appendSemicolon(tbl.CreateViewSql), //create view
"commit;",
}

//alter view
upgradeSQL := strings.Join(stmt, "\n")

// Execute upgrade SQL
if err = exec.Exec(ctx, upgradeSQL, ie.NewOptsBuilder().Finish()); err != nil {
return err
}
}
return nil
}

// UpgradeNewTableColumn the newly added columns in the system table
func (u *Upgrader) UpgradeNewTableColumn(ctx context.Context) error {
exec := u.IEFactory()
if exec == nil {
Expand Down Expand Up @@ -235,7 +281,7 @@ func (u *Upgrader) UpgradeNewTableColumn(ctx context.Context) error {
return nil
}

// Upgrade system tables, add system tables
// UpgradeNewTable system tables, add system tables
func (u *Upgrader) UpgradeNewTable(ctx context.Context, tenants []*frontend.TenantInfo) error {
exec := u.IEFactory()
if exec == nil {
Expand Down Expand Up @@ -266,7 +312,7 @@ func (u *Upgrader) UpgradeNewTable(ctx context.Context, tenants []*frontend.Tena
return nil
}

// Upgrade system tables, add system views
// UpgradeNewView system tables, add system views
func (u *Upgrader) UpgradeNewView(ctx context.Context, tenants []*frontend.TenantInfo) error {
exec := u.IEFactory()
if exec == nil {
Expand Down Expand Up @@ -421,3 +467,10 @@ func attachAccount(ctx context.Context, tenant *frontend.TenantInfo) context.Con
func makeOptions(tenant *frontend.TenantInfo) *ie.OptsBuilder {
return ie.NewOptsBuilder().AccountId(tenant.GetTenantID()).UserId(tenant.GetUserID()).DefaultRoleId(tenant.GetDefaultRoleID())
}

func appendSemicolon(s string) string {
if !strings.HasSuffix(s, ";") {
return s + ";"
}
return s
}
4 changes: 4 additions & 0 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ var (
"mo_mysql_compatibility_mode": 0,
"mo_stages": 0,
catalog.MOAutoIncrTable: 0,
"mo_sessions": 0,
}
configInitVariables = map[string]int8{
"save_query_result": 0,
Expand All @@ -842,6 +843,7 @@ var (
"mo_table_partitions": 0,
"mo_pubs": 0,
"mo_stages": 0,
"mo_sessions": 0,
}
createDbInformationSchemaSql = "create database information_schema;"
createAutoTableSql = fmt.Sprintf(`create table if not exists %s (
Expand Down Expand Up @@ -1016,6 +1018,7 @@ var (
comment text,
primary key(stage_id)
);`,
`CREATE VIEW IF NOT EXISTS mo_sessions AS SELECT * FROM mo_sessions() AS mo_sessions_tmp;`,
}

//drop tables for the tenant
Expand All @@ -1029,6 +1032,7 @@ var (
`drop table if exists mo_catalog.mo_stored_procedure;`,
`drop table if exists mo_catalog.mo_mysql_compatibility_mode;`,
`drop table if exists mo_catalog.mo_stages;`,
`drop view if exists mo_catalog.mo_sessions;`,
}
dropMoPubsSql = `drop table if exists mo_catalog.mo_pubs;`
deleteMoPubsSql = `delete from mo_catalog.mo_pubs;`
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/computation_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (cwft *TxnComputationWrapper) Compile(requestCtx context.Context, u interfa
// statement ID and updating snapshot TS.
// See `func (exec *txnExecutor) Exec(sql string)` for details.
txnOp := cwft.proc.TxnOperator
cwft.ses.SetTxnId(txnOp.Txn().ID)
if txnOp != nil && !cwft.ses.IsDerivedStmt() {
ok, _ := cwft.ses.GetTxnHandler().calledStartStmt()
if !ok {
Expand Down
12 changes: 10 additions & 2 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,13 @@ var RecordStatement = func(ctx context.Context, ses *Session, proc *process.Proc
stmID = uuid.New()
text = SubStringFromBegin(envStmt, int(ses.GetParameterUnit().SV.LengthOfQueryPrinted))
}
ses.sqlType.Store(sqlType)
ses.SetStmtId(stmID)
ses.SetStmtType(getStatementType(statement).GetStatementType())
ses.SetQueryType(getStatementType(statement).GetQueryType())
ses.SetSqlSourceType(sqlType)
ses.SetSqlOfStmt(text)

//note: txn id here may be empty
if sqlType != constant.InternalSql {
ses.pushQueryId(types.Uuid(stmID).ToString())
}
Expand Down Expand Up @@ -336,7 +342,7 @@ var RecordStatementTxnID = func(ctx context.Context, ses *Session) {
} else {
stm.SetTxnID(txn.Txn().ID)
}

ses.SetTxnId(txn.Txn().ID)
}
stm.Report(ctx)
}
Expand Down Expand Up @@ -2544,6 +2550,8 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context,
var loadLocalErrGroup *errgroup.Group
var loadLocalWriter *io.PipeWriter

ses.SetQueryStart(time.Now())

// per statement profiler
requestCtx, endStmtProfile := fileservice.NewStatementProfiler(requestCtx)
if endStmtProfile != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/mysql_cmd_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func Test_mce_selfhandle(t *testing.T) {
).AnyTimes()

txnOperator := mock_frontend.NewMockTxnOperator(ctrl)
txnOperator.EXPECT().Txn().Return(txn.TxnMeta{}).AnyTimes()
txnOperator.EXPECT().Commit(ctx).Return(nil).AnyTimes()
txnOperator.EXPECT().Rollback(ctx).Return(nil).AnyTimes()

Expand Down
Loading

0 comments on commit 0c749e4

Please sign in to comment.