Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mo_sessions table function and view #11748

Merged
merged 16 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion pkg/cnservice/upgrader/newAddTable.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,63 @@ var STATISTICSView = &table.Table{
"join `mo_catalog`.`mo_columns` `tcl` on (`idx`.`table_id` = `tcl`.`att_relname_id` and `idx`.`column_name` = `tcl`.`attname`)",
}

var needUpgradNewView = []*table.Table{PARTITIONSView, STATISTICSView}
var processlistView = &table.Table{
Account: table.AccountAll,
Database: sysview.InformationDBConst,
Table: "processlist",
Columns: []table.Column{
table.StringColumn("account", "the account name"),
table.StringColumn("client_host", "the ip:port of the client"),
table.StringColumn("command", "the COMMAND send by client"),
table.UInt64Column("conn_id", "the connection id of the tcp between client"),
table.StringColumn("db", "the database be used"),
table.StringColumn("host", "the ip:port of the mo-server"),
table.StringColumn("info", "the sql"),
table.StringColumn("node_id", "the id of the cn"),
table.StringColumn("query_start", "the start time of the statement"),
table.StringColumn("query_type", "the kind of the statement. DQL,TCL,etc"),
table.StringColumn("role", "the role of the user"),
table.StringColumn("session_id", "the id of the session"),
table.StringColumn("session_start", "the start time of the session"),
table.StringColumn("sql_source_type", "where does the sql come from. internal,external, etc"),
table.StringColumn("statement_id", "the id of the statement"),
table.StringColumn("statement_type", "the type of the statement.Select,Delete,Insert,etc"),
table.StringColumn("txn_id", "the id of the transaction"),
table.StringColumn("user", "the user name"),
},
CreateViewSql: "CREATE VIEW IF NOT EXISTS `information_schema`.`PROCESSLIST` AS SELECT * FROM PROCESSLIST() A;",
//actually drop view here
CreateTableSql: "drop view if exists `information_schema`.`PROCESSLIST`;",
}

var MoSessionsView = &table.Table{
Account: table.AccountAll,
Database: catalog.MO_CATALOG,
Table: "mo_sessions",
Columns: []table.Column{
table.StringColumn("account", "the account name"),
table.StringColumn("client_host", "the ip:port of the client"),
table.StringColumn("command", "the COMMAND send by client"),
table.UInt64Column("conn_id", "the connection id of the tcp between client"),
table.StringColumn("db", "the database be used"),
table.StringColumn("host", "the ip:port of the mo-server"),
table.StringColumn("info", "the sql"),
table.StringColumn("node_id", "the id of the cn"),
table.StringColumn("query_start", "the start time of the statement"),
table.StringColumn("query_type", "the kind of the statement. DQL,TCL,etc"),
table.StringColumn("role", "the role of the user"),
table.StringColumn("session_id", "the id of the session"),
table.StringColumn("session_start", "the start time of the session"),
table.StringColumn("sql_source_type", "where does the sql come from. internal,external, etc"),
table.StringColumn("statement_id", "the id of the statement"),
table.StringColumn("statement_type", "the type of the statement.Select,Delete,Insert,etc"),
table.StringColumn("txn_id", "the id of the transaction"),
table.StringColumn("user", "the user name"),
},
CreateViewSql: "CREATE VIEW IF NOT EXISTS `mo_catalog`.`mo_sessions` AS SELECT * FROM mo_sessions() AS mo_sessions_tmp;",
//actually drop view here
CreateTableSql: "drop view `mo_catalog`.`mo_sessions`;",
}

var needUpgradNewView = []*table.Table{PARTITIONSView, STATISTICSView, MoSessionsView}
var registeredViews = []*table.Table{processlistView}
59 changes: 56 additions & 3 deletions pkg/cnservice/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func ParseDataTypeToColType(dataType string) (table.ColType, error) {
return table.TBytes, nil
case strings.Contains(strings.ToLower(dataType), "uuid"):
return table.TUuid, nil
case strings.Contains(strings.ToLower(dataType), "int unsigned"):
return table.TUint64, nil
default:
return table.TSkip, moerr.NewInternalError(context.Background(), "unknown data type")
}
Expand Down Expand Up @@ -185,6 +187,11 @@ func (u *Upgrader) Upgrade(ctx context.Context) error {
return err
}

if err = u.UpgradeNewViewColumn(ctx); err != nil {
logutil.Errorf("upgrade new view column failed: %s", err.Error())
return err
}

if err = u.UpgradeNewTableColumn(ctx); err != nil {
logutil.Errorf("upgrade new table column failed: %s", err.Error())
return err
Expand All @@ -202,7 +209,46 @@ func (u *Upgrader) Upgrade(ctx context.Context) error {
return nil
}

// Upgrade the newly added columns in the system table
// UpgradeNewViewColumn the newly added columns in the system table
func (u *Upgrader) UpgradeNewViewColumn(ctx context.Context) error {
exec := u.IEFactory()
if exec == nil {
return nil
}

for _, tbl := range registeredViews {
currentSchema, err := u.GetCurrentSchema(ctx, exec, tbl.Database, tbl.Table)
if err != nil {
return err
}

diff, err := u.GenerateDiff(currentSchema, tbl)
if err != nil {
return err
} else if len(diff.AddedColumns) == 0 {
continue
}

//
stmt := []string{
"begin;",
appendSemicolon(tbl.CreateTableSql), //drop view
appendSemicolon(tbl.CreateViewSql), //create view
"commit;",
}

//alter view
upgradeSQL := strings.Join(stmt, "\n")

// Execute upgrade SQL
if err = exec.Exec(ctx, upgradeSQL, ie.NewOptsBuilder().Finish()); err != nil {
return err
}
}
return nil
}

// UpgradeNewTableColumn the newly added columns in the system table
func (u *Upgrader) UpgradeNewTableColumn(ctx context.Context) error {
exec := u.IEFactory()
if exec == nil {
Expand Down Expand Up @@ -235,7 +281,7 @@ func (u *Upgrader) UpgradeNewTableColumn(ctx context.Context) error {
return nil
}

// Upgrade system tables, add system tables
// UpgradeNewTable system tables, add system tables
func (u *Upgrader) UpgradeNewTable(ctx context.Context, tenants []*frontend.TenantInfo) error {
exec := u.IEFactory()
if exec == nil {
Expand Down Expand Up @@ -266,7 +312,7 @@ func (u *Upgrader) UpgradeNewTable(ctx context.Context, tenants []*frontend.Tena
return nil
}

// Upgrade system tables, add system views
// UpgradeNewView system tables, add system views
func (u *Upgrader) UpgradeNewView(ctx context.Context, tenants []*frontend.TenantInfo) error {
exec := u.IEFactory()
if exec == nil {
Expand Down Expand Up @@ -421,3 +467,10 @@ func attachAccount(ctx context.Context, tenant *frontend.TenantInfo) context.Con
func makeOptions(tenant *frontend.TenantInfo) *ie.OptsBuilder {
return ie.NewOptsBuilder().AccountId(tenant.GetTenantID()).UserId(tenant.GetUserID()).DefaultRoleId(tenant.GetDefaultRoleID())
}

func appendSemicolon(s string) string {
if !strings.HasSuffix(s, ";") {
return s + ";"
}
return s
}
4 changes: 4 additions & 0 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ var (
"mo_mysql_compatibility_mode": 0,
"mo_stages": 0,
catalog.MOAutoIncrTable: 0,
"mo_sessions": 0,
}
configInitVariables = map[string]int8{
"save_query_result": 0,
Expand All @@ -842,6 +843,7 @@ var (
"mo_table_partitions": 0,
"mo_pubs": 0,
"mo_stages": 0,
"mo_sessions": 0,
}
createDbInformationSchemaSql = "create database information_schema;"
createAutoTableSql = fmt.Sprintf(`create table if not exists %s (
Expand Down Expand Up @@ -1016,6 +1018,7 @@ var (
comment text,
primary key(stage_id)
);`,
`CREATE VIEW IF NOT EXISTS mo_sessions AS SELECT * FROM mo_sessions() AS mo_sessions_tmp;`,
}

//drop tables for the tenant
Expand All @@ -1029,6 +1032,7 @@ var (
`drop table if exists mo_catalog.mo_stored_procedure;`,
`drop table if exists mo_catalog.mo_mysql_compatibility_mode;`,
`drop table if exists mo_catalog.mo_stages;`,
`drop view if exists mo_catalog.mo_sessions;`,
}
dropMoPubsSql = `drop table if exists mo_catalog.mo_pubs;`
deleteMoPubsSql = `delete from mo_catalog.mo_pubs;`
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/computation_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (cwft *TxnComputationWrapper) Compile(requestCtx context.Context, u interfa
// statement ID and updating snapshot TS.
// See `func (exec *txnExecutor) Exec(sql string)` for details.
txnOp := cwft.proc.TxnOperator
cwft.ses.SetTxnId(txnOp.Txn().ID)
if txnOp != nil && !cwft.ses.IsDerivedStmt() {
ok, _ := cwft.ses.GetTxnHandler().calledStartStmt()
if !ok {
Expand Down
12 changes: 10 additions & 2 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,13 @@ var RecordStatement = func(ctx context.Context, ses *Session, proc *process.Proc
stmID = uuid.New()
text = SubStringFromBegin(envStmt, int(ses.GetParameterUnit().SV.LengthOfQueryPrinted))
}
ses.sqlType.Store(sqlType)
ses.SetStmtId(stmID)
ses.SetStmtType(getStatementType(statement).GetStatementType())
ses.SetQueryType(getStatementType(statement).GetQueryType())
ses.SetSqlSourceType(sqlType)
ses.SetSqlOfStmt(text)

//note: txn id here may be empty
if sqlType != constant.InternalSql {
ses.pushQueryId(types.Uuid(stmID).ToString())
}
Expand Down Expand Up @@ -336,7 +342,7 @@ var RecordStatementTxnID = func(ctx context.Context, ses *Session) {
} else {
stm.SetTxnID(txn.Txn().ID)
}

ses.SetTxnId(txn.Txn().ID)
}
stm.Report(ctx)
}
Expand Down Expand Up @@ -2554,6 +2560,8 @@ func (mce *MysqlCmdExecutor) executeStmt(requestCtx context.Context,
var loadLocalErrGroup *errgroup.Group
var loadLocalWriter *io.PipeWriter

ses.SetQueryStart(time.Now())

// per statement profiler
requestCtx, endStmtProfile := fileservice.NewStatementProfiler(requestCtx)
if endStmtProfile != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/mysql_cmd_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func Test_mce_selfhandle(t *testing.T) {
).AnyTimes()

txnOperator := mock_frontend.NewMockTxnOperator(ctrl)
txnOperator.EXPECT().Txn().Return(txn.TxnMeta{}).AnyTimes()
txnOperator.EXPECT().Commit(ctx).Return(nil).AnyTimes()
txnOperator.EXPECT().Rollback(ctx).Return(nil).AnyTimes()

Expand Down
Loading