diff --git a/pkg/frontend/show_account.go b/pkg/frontend/show_account.go index 9df31f189f2ec..c058308bb1164 100644 --- a/pkg/frontend/show_account.go +++ b/pkg/frontend/show_account.go @@ -17,6 +17,9 @@ package frontend import ( "context" "fmt" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae" + "go.uber.org/zap" "math" "strconv" "strings" @@ -321,16 +324,84 @@ func updateStorageUsageCache(usages *cmd_util.StorageUsageResp_V3) { } } +func tryGetSizeFromMTS( + ctx context.Context, + accIds [][]int64, +) (sizes map[int64]uint64, ok bool) { + + var ( + err error + accs []uint64 + vals [][]any + ) + for i := range accIds { + for j := range accIds[i] { + accs = append(accs, uint64(accIds[i][j])) + } + } + + vals, accs, err, ok = disttae.QueryTableStatsByAccounts( + ctx, + []int{disttae.TableStatsTableSize}, + accs, + false, + false, + ) + + if err != nil || !ok { + logutil.Info("show accounts", + zap.Bool("get size from mts failed", ok), + zap.Error(err)) + + return nil, false + } + + if len(vals) == 0 { + return nil, false + } + + sizes = make(map[int64]uint64) + for i := range accs { + sizes[int64(accs[i])] += uint64(vals[0][i].(float64)) + } + + return sizes, true +} + // getAccountStorageUsage calculates the storage usage of all accounts // by handling checkpoint -func getAccountsStorageUsage(ctx context.Context, ses *Session, accIds [][]int64) (map[int64][]uint64, error) { +func getAccountsStorageUsage( + ctx context.Context, + ses *Session, + accIds [][]int64, +) (ret map[int64][]uint64, err error) { + if len(accIds) == 0 { return nil, nil } + defer func() { + if err != nil || ret == nil { + return + } + + sizes, ok := tryGetSizeFromMTS(ctx, accIds) + if ok { + for k, v := range sizes { + if len(ret[k]) == 0 { + ret[k] = append(ret[k], 0, 0) + } + ret[k][0] = v + } + logutil.Info("show accounts", + zap.Int("get size from mts (acc cnt)", len(sizes))) + } + }() + // step 1: check cache if usage, succeed := checkStorageUsageCache(accIds); succeed { - return usage, nil + ret = usage + return } // step 2: query to tn @@ -346,7 +417,8 @@ func getAccountsStorageUsage(ctx context.Context, ses *Session, accIds [][]int64 } updateStorageUsageCache_V2(usage) // step 3: handling these pulled data - return handleStorageUsageResponse_V2(ctx, usage) + ret, err = handleStorageUsageResponse_V2(ctx, usage) + return } else { usage, ok := response.(*cmd_util.StorageUsageResp_V3) @@ -357,7 +429,8 @@ func getAccountsStorageUsage(ctx context.Context, ses *Session, accIds [][]int64 updateStorageUsageCache(usage) // step 3: handling these pulled data - return handleStorageUsageResponse(ctx, usage) + ret, err = handleStorageUsageResponse(ctx, usage) + return } } diff --git a/pkg/vm/engine/disttae/mo_table_stats.go b/pkg/vm/engine/disttae/mo_table_stats.go index c6e9eee7c870d..799d40f9bc01a 100644 --- a/pkg/vm/engine/disttae/mo_table_stats.go +++ b/pkg/vm/engine/disttae/mo_table_stats.go @@ -29,7 +29,6 @@ import ( "time" "github.com/matrixorigin/matrixone/pkg/catalog" - "github.com/matrixorigin/matrixone/pkg/clusterservice" "github.com/matrixorigin/matrixone/pkg/container/bytejson" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" @@ -38,7 +37,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" - "github.com/matrixorigin/matrixone/pkg/pb/metadata" "github.com/matrixorigin/matrixone/pkg/pb/task" "github.com/matrixorigin/matrixone/pkg/pb/timestamp" "github.com/matrixorigin/matrixone/pkg/sql/plan/function" @@ -164,26 +162,22 @@ const ( database_id in (%v) and table_id in (%v);` - getNewTablesSQL = ` - select - account_id, reldatabase, reldatabase_id, relname, rel_id - from - %s.%s - where - created_time >= '%s' and relkind != "v" - group by - account_id, reldatabase, reldatabase_id, relname, rel_id;` - insertNewTablesSQL = ` insert ignore into %s.%s (account_id, database_id, table_id, database_name, table_name, table_stats, update_time, takes) values %s;` - getMinTSSQL = ` - select - min(update_time) - from - %s.%s;` + findNewTableSQL = ` + select + A.account_id, A.reldatabase, A.reldatabase_id, A.relname, A.rel_id, A.relkind + from + %s.%s as A + left join + %s.%s as B + on + A.account_id = B.account_id and A.reldatabase_id = B.database_id and A.rel_id = B.table_id + where + B.table_id is NULL;` getNullStatsSQL = ` select @@ -194,11 +188,19 @@ const ( table_stats = "{}" limit %d;` + + accumulateIdsByAccSQL = ` + select + account_id, reldatabase_id, rel_id + from + %s.%s + where + account_id in (%s);` ) const ( defaultAlphaCycleDur = time.Minute - defaultGamaCycleDur = time.Minute * 10 + defaultGamaCycleDur = time.Minute defaultGetTableListLimit = options.DefaultBlockMaxRows logHeader = "MO-TABLE-STATS-TASK" @@ -217,6 +219,11 @@ const ( TableStatsCnt ) +const ( + betaTaskName = "beta" + gamaTaskName = "gama" +) + var TableStatsName = [TableStatsCnt]string{ "table_size", "table_rows", @@ -374,15 +381,19 @@ func initMoTableStatsConfig( }() } - dynamicCtx.launchTask = func() { + // beta task expect to be running on every cn. + // gama task expect to be running only on one cn. + dynamicCtx.launchTask = func(name string) { dynamicCtx.Lock() defer dynamicCtx.Unlock() - launch("beta task", &dynamicCtx.beta) - launch("gama task", &dynamicCtx.gama) + switch name { + case gamaTaskName: + launch("gama task", &dynamicCtx.gama) + case betaTaskName: + launch("beta task", &dynamicCtx.beta) + } } - - dynamicCtx.launchTask() }) return err @@ -416,10 +427,8 @@ var dynamicCtx struct { newest types.TS } - lastCheckNewTables types.TS - beta, gama taskState - launchTask func() + launchTask func(name string) alphaTaskPool *ants.Pool @@ -649,10 +658,6 @@ func forceUpdateQuery( if !resetUpdateTime { - for i := range tbls { - oldTS[i] = ×tamp.Timestamp{} - } - sql := fmt.Sprintf(getUpdateTSSQL, catalog.MO_CATALOG, catalog.MO_TABLE_STATS, intsJoin(accs, ","), @@ -683,10 +688,29 @@ func forceUpdateQuery( oldTS[idx] = ×tamp.Timestamp{PhysicalTime: stdTime.UnixNano()} } + var notExist = make([]uint64, 0, 1) + for i := range oldTS { + if oldTS[i] == nil { + oldTS[i] = ×tamp.Timestamp{} + notExist = append(notExist, tbls[i]) + } + } + if err = getChangedTableList( ctx, eng.service, eng, accs, dbs, tbls, oldTS, &pairs, &to); err != nil { return } + + // if a table not exist in mo table stats table, need update stats. + if len(notExist) != 0 { + for i := range pairs { + idx := slices.Index(notExist, uint64(pairs[i].tbl)) + if idx != -1 { + pairs[i].onlyUpdateTS = false + } + } + } + } else { to = types.BuildTS(time.Now().UnixNano(), 0) @@ -700,7 +724,8 @@ func forceUpdateQuery( } } - if err = alphaTask(ctx, eng.service, eng, pairs, "forceUpdateQuery"); err != nil { + if err = alphaTask(ctx, eng.service, eng, pairs, + fmt.Sprintf("forceUpdateQuery(reset_update=%v)", resetUpdateTime)); err != nil { return nil, err } @@ -790,6 +815,59 @@ func normalQuery( return statsVals, nil } +func QueryTableStatsByAccounts( + ctx context.Context, + wantedStatsIdxes []int, + accs []uint64, + forceUpdate bool, + resetUpdateTime bool, +) (statsVals [][]any, retAcc []uint64, err error, ok bool) { + + if len(accs) == 0 { + return + } + + newCtx := turn2SysCtx(ctx) + + sql := fmt.Sprintf(accumulateIdsByAccSQL, + catalog.MO_CATALOG, catalog.MO_TABLES, intsJoin(accs, ",")) + + sqlRet := executeSQL(newCtx, sql, "query table stats by accounts") + if err = sqlRet.Error(); err != nil { + return + } + + var ( + val any + + accs2 = make([]uint64, 0, sqlRet.RowCount()) + dbs = make([]uint64, 0, sqlRet.RowCount()) + tbls = make([]uint64, 0, sqlRet.RowCount()) + ) + + for i := range sqlRet.RowCount() { + if val, err = sqlRet.Value(newCtx, i, 0); err != nil { + return + } + accs2 = append(accs2, uint64(val.(uint32))) + + if val, err = sqlRet.Value(newCtx, i, 1); err != nil { + return + } + dbs = append(dbs, val.(uint64)) + + if val, err = sqlRet.Value(newCtx, i, 2); err != nil { + return + } + tbls = append(tbls, val.(uint64)) + } + + statsVals, err, ok = QueryTableStats( + newCtx, wantedStatsIdxes, accs2, dbs, tbls, forceUpdate, resetUpdateTime, nil) + + return statsVals, accs2, err, ok +} + func QueryTableStats( ctx context.Context, wantedStatsIdxes []int, @@ -797,7 +875,7 @@ func QueryTableStats( forceUpdate bool, resetUpdateTime bool, eng engine.Engine, -) (statsVals [][]any, err error) { +) (statsVals [][]any, err error, ok bool) { dynamicCtx.Lock() useOld := dynamicCtx.conf.StatsUsingOldImpl @@ -842,14 +920,17 @@ func QueryTableStats( de = eng.(*Engine) } - return forceUpdateQuery( + statsVals, err = forceUpdateQuery( newCtx, wantedStatsIdxes, accs, dbs, tbls, resetUpdateTime, de) + return statsVals, err, true } - return normalQuery(newCtx, wantedStatsIdxes, accs, dbs, tbls) + statsVals, err = normalQuery(newCtx, wantedStatsIdxes, accs, dbs, tbls) + + return statsVals, err, true } func MTSTableSize( @@ -860,7 +941,7 @@ func MTSTableSize( resetUpdateTime bool, ) (sizes []uint64, err error) { - statsVals, err := QueryTableStats( + statsVals, err, _ := QueryTableStats( ctx, []int{TableStatsTableSize}, accs, dbs, tbls, forceUpdate, resetUpdateTime, eng) @@ -887,7 +968,7 @@ func MTSTableRows( resetUpdateTime bool, ) (sizes []uint64, err error) { - statsVals, err := QueryTableStats( + statsVals, err, _ := QueryTableStats( ctx, []int{TableStatsTableRows}, accs, dbs, tbls, forceUpdate, resetUpdateTime, eng) @@ -1046,6 +1127,11 @@ func turn2SysCtx(ctx context.Context) context.Context { return newCtx } +func LaunchMTSTasksForUT() { + dynamicCtx.launchTask(gamaTaskName) + dynamicCtx.launchTask(betaTaskName) +} + func tableStatsExecutor( ctx context.Context, service string, @@ -1097,116 +1183,18 @@ func tableStatsExecutor( } } -func insertNewTables( - ctx context.Context, - service string, - eng engine.Engine, -) (err error) { - - var ( - val any - tm time.Time - sql string - sqlRet ie.InternalExecResult - - dbName, tblName string - accId, dbId, tblId uint64 - ) - - //if dynamicCtx.lastCheckNewTables.IsEmpty() { - sql = fmt.Sprintf(getMinTSSQL, catalog.MO_CATALOG, catalog.MO_TABLE_STATS) - sqlRet = executeSQL(ctx, sql, "insert new table-0: get min ts") - if err = sqlRet.Error(); err != nil { - return err - } - - if val, err = sqlRet.Value(ctx, 0, 0); err != nil { - return err - } - - if val != nil { - if tm, err = time.Parse("2006-01-02 15:04:05.000000", val.(string)); err != nil { - return - } - - dynamicCtx.lastCheckNewTables = types.BuildTS(tm.UnixNano(), 0) - } - - //} - - sql = fmt.Sprintf(getNewTablesSQL, - catalog.MO_CATALOG, catalog.MO_TABLES, - dynamicCtx.lastCheckNewTables. - ToTimestamp(). - ToStdTime(). - Format("2006-01-02 15:04:05"), - ) - - sqlRet = executeSQL(ctx, sql, "insert new table-1: get new tables") - if err = sqlRet.Error(); err != nil { - return err - } - - valFmt := "(%d,%d,%d,'%s','%s','{}','%s',0)" - - values := make([]string, 0, sqlRet.RowCount()) - for i := range sqlRet.RowCount() { - if val, err = sqlRet.Value(ctx, i, 0); err != nil { - return err - } - accId = uint64(val.(uint32)) - - if val, err = sqlRet.Value(ctx, i, 1); err != nil { - return err - } - dbName = string(val.([]uint8)) - - if val, err = sqlRet.Value(ctx, i, 2); err != nil { - return err - } - dbId = val.(uint64) - - if val, err = sqlRet.Value(ctx, i, 3); err != nil { - return err - } - tblName = string(val.([]uint8)) - - if val, err = sqlRet.Value(ctx, i, 4); err != nil { - return err - } - tblId = val.(uint64) - - values = append(values, fmt.Sprintf(valFmt, - accId, dbId, tblId, dbName, tblName, - timestamp.Timestamp{}.ToStdTime(). - Format("2006-01-02 15:04:05.000000"))) - } - - if len(values) == 0 { - return - } - - sql = fmt.Sprintf(insertNewTablesSQL, - catalog.MO_CATALOG, catalog.MO_TABLE_STATS, - strings.Join(values, ",")) - - sqlRet = executeSQL(ctx, sql, "insert new table-2: insert new tables") - return sqlRet.Error() -} - func prepare( ctx context.Context, service string, eng engine.Engine, ) (err error) { + // gama task running only on a specified cn + dynamicCtx.launchTask(gamaTaskName) + dynamicCtx.Lock() defer dynamicCtx.Unlock() - if err = insertNewTables(ctx, service, eng); err != nil { - return - } - offsetTS := types.TS{} for len(dynamicCtx.tableStock.tbls) == 0 { accs, dbs, tbls, ts, err := getCandidates(ctx, service, eng, dynamicCtx.conf.GetTableListLimit, offsetTS) @@ -1257,7 +1245,7 @@ func alphaTask( } // maybe the task exited, need to launch a new one - dynamicCtx.launchTask() + dynamicCtx.launchTask(betaTaskName) var ( errWaitToReceive = len(tbls) @@ -1470,105 +1458,197 @@ func NotifyUpdateForgotten() { dynamicCtx.updateForgottenQueue <- struct{}{} } -func gamaTask( +func gamaInsertNewTables( ctx context.Context, service string, eng engine.Engine, ) { var ( - cnCnt int - de = eng.(*Engine) + err error + sql string + sqlRet ie.InternalExecResult ) - clusterservice.GetMOCluster(de.service).GetCNService(clusterservice.Selector{}, func(service metadata.CNService) bool { - cnCnt++ - return true - }) - cnCnt = max(cnCnt, 1) + sql = fmt.Sprintf(findNewTableSQL, + catalog.MO_CATALOG, catalog.MO_TABLES, + catalog.MO_CATALOG, catalog.MO_TABLE_STATS, + ) - dynamicCtx.Lock() - gamaDur := dynamicCtx.conf.CorrectionDuration - gamaLimit := max(dynamicCtx.conf.GetTableListLimit/100, 100) - dynamicCtx.Unlock() + sqlRet = executeSQL(ctx, sql, "insert new table-0: get new tables") + if err = sqlRet.Error(); err != nil { + return + } - decodeIdsFromSqlRet := func( - sqlRet ie.InternalExecResult, - ) (accIds, dbIds, tblIds []uint64, err error) { - var val any - for i := range sqlRet.RowCount() { - if val, err = sqlRet.Value(ctx, i, 0); err != nil { - continue - } - accIds = append(accIds, uint64(val.(int64))) + var ( + val any - if sqlRet.ColumnCount() == 1 { - continue - } + values = make([]string, 0, sqlRet.RowCount()) - if val, err = sqlRet.Value(ctx, i, 1); err != nil { - continue - } - dbIds = append(dbIds, uint64(val.(int64))) + dbName, tblName string + accId, dbId, tblId uint64 + ) - if sqlRet.ColumnCount() == 2 { - continue - } + defer func() { + logutil.Error(logHeader, + zap.String("source", "gama insert new table"), + zap.Int("cnt", len(values)), + zap.Error(err)) + }() - if val, err = sqlRet.Value(ctx, i, 2); err != nil { - continue - } + valFmt := "(%d,%d,%d,'%s','%s','{}','%s',0)" - tblIds = append(tblIds, uint64(val.(int64))) + for i := range sqlRet.RowCount() { + if val, err = sqlRet.Value(ctx, i, 5); err != nil { + return } - return - } + if strings.ToLower(string(val.([]byte))) == "v" { + continue + } - // incremental update tables with heartbeat update_time - // may leave some tables never been updated. - // this opA does such correction. - opA := func() { - now := time.Now() + if val, err = sqlRet.Value(ctx, i, 0); err != nil { + return + } + accId = uint64(val.(uint32)) - sql := fmt.Sprintf(getNullStatsSQL, catalog.MO_CATALOG, catalog.MO_TABLE_STATS, gamaLimit) - sqlRet := executeSQL(ctx, sql, "gama task: get null stats list") - if sqlRet.Error() != nil { + if val, err = sqlRet.Value(ctx, i, 1); err != nil { return } + dbName = string(val.([]uint8)) - to := types.BuildTS(time.Now().UnixNano(), 0) + if val, err = sqlRet.Value(ctx, i, 2); err != nil { + return + } + dbId = val.(uint64) - accIds, dbIds, tblIds, err := decodeIdsFromSqlRet(sqlRet) - if err != nil { + if val, err = sqlRet.Value(ctx, i, 3); err != nil { return } + tblName = string(val.([]uint8)) - var tbls []tablePair + if val, err = sqlRet.Value(ctx, i, 4); err != nil { + return + } + tblId = val.(uint64) - for i := range tblIds { - tbl, ok := buildTablePairFromCache(de, accIds[i], dbIds[i], tblIds[i], to, false) - if !ok { - continue - } + values = append(values, fmt.Sprintf(valFmt, + accId, dbId, tblId, dbName, tblName, + timestamp.Timestamp{}.ToStdTime(). + Format("2006-01-02 15:04:05.000000"))) + } + + if len(values) == 0 { + return + } + + sql = fmt.Sprintf(insertNewTablesSQL, + catalog.MO_CATALOG, catalog.MO_TABLE_STATS, + strings.Join(values, ",")) + + sqlRet = executeSQL(ctx, sql, "insert new table-1: insert new tables") + err = sqlRet.Error() +} + +func decodeIdsFromMoTableStatsSqlRet( + ctx context.Context, + sqlRet ie.InternalExecResult, +) (ids1, ids2, ids3 []uint64, err error) { - tbls = append(tbls, tbl) + var val any + + for i := range sqlRet.RowCount() { + if val, err = sqlRet.Value(ctx, i, 0); err != nil { + continue } + ids1 = append(ids1, uint64(val.(int64))) - if err = alphaTask( - ctx, service, eng, tbls, "gama opA"); err != nil { - return + if sqlRet.ColumnCount() == 1 { + continue } + if val, err = sqlRet.Value(ctx, i, 1); err != nil { + continue + } + ids2 = append(ids2, uint64(val.(int64))) + + if sqlRet.ColumnCount() == 2 { + continue + } + + if val, err = sqlRet.Value(ctx, i, 2); err != nil { + continue + } + + ids3 = append(ids3, uint64(val.(int64))) + } + + return +} + +func gamaUpdateForgotten( + ctx context.Context, + service string, + de *Engine, + limit int, +) { + // incremental update tables with heartbeat update_time + // may leave some tables never been updated. + // this opA does such correction. + + var ( + err error + now = time.Now() + + tbls = make([]tablePair, 0, 1) + accIds []uint64 + dbIds []uint64 + tblIds []uint64 + ) + + defer func() { logutil.Info(logHeader, zap.String("source", "gama task"), - zap.Int("force update table", len(tbls)), - zap.Duration("takes", time.Since(now))) + zap.Int("update forgotten", len(tbls)), + zap.Duration("takes", time.Since(now)), + zap.Error(err)) + }() + + sql := fmt.Sprintf(getNullStatsSQL, catalog.MO_CATALOG, catalog.MO_TABLE_STATS, limit) + sqlRet := executeSQL(ctx, sql, "gama task: get null stats list") + if err = sqlRet.Error(); err != nil { + return + } + + to := types.BuildTS(time.Now().UnixNano(), 0) + + accIds, dbIds, tblIds, err = decodeIdsFromMoTableStatsSqlRet(ctx, sqlRet) + if err != nil { + return + } + + for i := range tblIds { + tbl, ok := buildTablePairFromCache(de, accIds[i], dbIds[i], tblIds[i], to, false) + if !ok { + continue + } - v2.GamaTaskCountingHistogram.Observe(float64(len(tbls))) - v2.GamaTaskDurationHistogram.Observe(time.Since(now).Seconds()) + tbls = append(tbls, tbl) } + if err = alphaTask( + ctx, service, de, tbls, "gama opA"); err != nil { + return + } + + v2.GamaTaskCountingHistogram.Observe(float64(len(tbls))) + v2.GamaTaskDurationHistogram.Observe(time.Since(now).Seconds()) +} + +func gamaCleanDeletes( + ctx context.Context, + de *Engine, +) { //mo_account, mo_database, mo_tables col name colName1 := []string{ "account_id", "dat_id", "rel_id", @@ -1583,7 +1663,20 @@ func gamaTask( } deleteByStep := func(step int) { - now := time.Now() + var ( + err error + now = time.Now() + + ids []uint64 + ) + + defer func() { + logutil.Info(logHeader, + zap.String("source", "gama task"), + zap.Int(fmt.Sprintf("deleted %s", colName2[step]), len(ids)), + zap.Duration("takes", time.Since(now)), + zap.String("detail", intsJoin(ids, ","))) + }() sql := fmt.Sprintf(getNextCheckAliveListSQL, colName2[step], colName2[step], catalog.MO_CATALOG, catalog.MO_TABLE_STATS, @@ -1594,7 +1687,7 @@ func gamaTask( return } - ids, _, _, err := decodeIdsFromSqlRet(sqlRet) + ids, _, _, err = decodeIdsFromMoTableStatsSqlRet(ctx, sqlRet) if len(ids) == 0 || err != nil { return } @@ -1603,7 +1696,7 @@ func gamaTask( colName1[step], catalog.MO_CATALOG, tblName[step], colName1[step], intsJoin(ids, ",")) sqlRet = executeSQL(ctx, sql, fmt.Sprintf("gama task-%d-1", step)) - if sqlRet.Error() != nil { + if err = sqlRet.Error(); err != nil { return } @@ -1629,36 +1722,47 @@ func gamaTask( sql = fmt.Sprintf(getDeleteFromStatsSQL, catalog.MO_CATALOG, catalog.MO_TABLE_STATS, colName2[step], intsJoin(ids, ",")) sqlRet = executeSQL(ctx, sql, fmt.Sprintf("gama task-%d-2", step)) - if sqlRet.Error() != nil { + if err = sqlRet.Error(); err != nil { return } } - - logutil.Info(logHeader, - zap.String("source", "gama task"), - zap.Int(fmt.Sprintf("deleted %s", colName2[step]), len(ids)), - zap.Duration("takes", time.Since(now)), - zap.String("detail", intsJoin(ids, ","))) } // clear deleted tbl, db, account - opB := func() { - // the stats belong to any deleted accounts/databases/tables have unchanged update time - // since they have been deleted. - // so opB collects tables ascending their update time and then check if they deleted. - deleteByStep(0) // clean account - deleteByStep(1) // clean database - deleteByStep(2) // clean tables - } + // the stats belong to any deleted accounts/databases/tables have unchanged update time + // since they have been deleted. + // so opB collects tables ascending their update time and then check if they deleted. + + deleteByStep(0) // clean account + deleteByStep(1) // clean database + deleteByStep(2) // clean tables +} + +func gamaTask( + ctx context.Context, + service string, + eng engine.Engine, +) { + + var ( + de = eng.(*Engine) + ) + + dynamicCtx.Lock() + gamaDur := dynamicCtx.conf.CorrectionDuration + gamaLimit := max(dynamicCtx.conf.GetTableListLimit/100, 100) + dynamicCtx.Unlock() - randDuration := func() time.Duration { + randDuration := func(n int) time.Duration { rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - return gamaDur + time.Duration(rnd.Intn(60*cnCnt))*time.Minute + return gamaDur + time.Duration(rnd.Intn(1*n))*time.Minute } - tickerA := time.NewTicker(randDuration()) - tickerB := time.NewTicker(randDuration()) + const baseFactory = 30 + tickerA := time.NewTicker(randDuration(baseFactory)) + tickerB := time.NewTicker(randDuration(baseFactory)) + tickerC := time.NewTicker(time.Millisecond) for { select { @@ -1669,21 +1773,36 @@ func gamaTask( return case <-tickerA.C: - dynamicCtx.gama.taskPool.Submit(opA) - tickerA.Reset(randDuration()) + dynamicCtx.gama.taskPool.Submit(func() { + gamaUpdateForgotten(ctx, service, de, gamaLimit) + }) + tickerA.Reset(randDuration(baseFactory)) case <-dynamicCtx.updateForgottenQueue: - dynamicCtx.gama.taskPool.Submit(opA) - tickerA.Reset(randDuration()) + dynamicCtx.gama.taskPool.Submit(func() { + gamaUpdateForgotten(ctx, service, de, gamaLimit) + }) + tickerA.Reset(randDuration(baseFactory)) case <-tickerB.C: - dynamicCtx.gama.taskPool.Submit(opB) - tickerB.Reset(randDuration()) + dynamicCtx.gama.taskPool.Submit(func() { + gamaCleanDeletes(ctx, de) + }) + tickerB.Reset(randDuration(baseFactory)) case <-dynamicCtx.cleanDeletesQueue: // emergence, do clean now - dynamicCtx.gama.taskPool.Submit(opB) - tickerB.Reset(randDuration()) + dynamicCtx.gama.taskPool.Submit(func() { + gamaCleanDeletes(ctx, de) + }) + tickerB.Reset(randDuration(baseFactory)) + + case <-tickerC.C: + dynamicCtx.gama.taskPool.Submit(func() { + gamaInsertNewTables(ctx, service, de) + }) + // try insert table at [1, 5] min + tickerC.Reset(randDuration(5)) } } } diff --git a/pkg/vm/engine/test/mo_table_stats_test.go b/pkg/vm/engine/test/mo_table_stats_test.go index 6bd070b389334..af10f1c311dea 100644 --- a/pkg/vm/engine/test/mo_table_stats_test.go +++ b/pkg/vm/engine/test/mo_table_stats_test.go @@ -130,6 +130,8 @@ func TestMoTableStatsMoCtl2(t *testing.T) { p := testutil.InitEnginePack(opts, t) defer p.Close() + disttae.LaunchMTSTasksForUT() + disttae.NotifyCleanDeletes() disttae.NotifyUpdateForgotten() @@ -146,7 +148,7 @@ func TestMoTableStatsMoCtl2(t *testing.T) { dbId := rel.GetDBID(p.Ctx) tblId := rel.GetTableID(p.Ctx) - _, err := disttae.QueryTableStats(context.Background(), + _, err, _ := disttae.QueryTableStats(context.Background(), []int{disttae.TableStatsTableRows}, []uint64{0}, []uint64{dbId}, []uint64{tblId}, false, false, nil) @@ -155,7 +157,7 @@ func TestMoTableStatsMoCtl2(t *testing.T) { ret = disttae.HandleMoTableStatsCtl("use_old_impl:true") require.Equal(t, "use old impl: false to true", ret) - _, err = disttae.QueryTableStats(context.Background(), + _, err, _ = disttae.QueryTableStats(context.Background(), []int{disttae.TableStatsTableRows}, []uint64{0}, []uint64{dbId}, []uint64{tblId}, false, false, nil) @@ -167,7 +169,7 @@ func TestMoTableStatsMoCtl2(t *testing.T) { ret = disttae.HandleMoTableStatsCtl("force_update:true") require.Equal(t, "force update: false to true", ret) - _, err = disttae.QueryTableStats(context.Background(), + _, err, _ = disttae.QueryTableStats(context.Background(), []int{disttae.TableStatsTableRows}, []uint64{0}, []uint64{dbId}, []uint64{tblId}, true, false, nil) @@ -176,7 +178,7 @@ func TestMoTableStatsMoCtl2(t *testing.T) { ret = disttae.HandleMoTableStatsCtl("move_on: false") require.Equal(t, "move on: true to false", ret) - _, err = disttae.QueryTableStats(context.Background(), + _, err, _ = disttae.QueryTableStats(context.Background(), []int{disttae.TableStatsTableRows}, []uint64{0}, []uint64{dbId}, []uint64{tblId}, false, true, nil)