diff --git a/executor/prepared.go b/executor/prepared.go
index 66dd5c38ee073..4bebabb6aff52 100644
--- a/executor/prepared.go
+++ b/executor/prepared.go
@@ -165,7 +165,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
 	}
 
 	switch stmt.(type) {
-	case *ast.LoadDataStmt, *ast.PrepareStmt, *ast.ExecuteStmt, *ast.DeallocateStmt:
+	case *ast.LoadDataStmt, *ast.PrepareStmt, *ast.ExecuteStmt, *ast.DeallocateStmt, *ast.NonTransactionalDeleteStmt:
 		return ErrUnsupportedPs
 	}
 
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 1803ebe127cdb..0f653e2801db8 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -165,6 +165,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(CPUProfileCounter)
 	prometheus.MustRegister(ReadFromTableCacheCounter)
 	prometheus.MustRegister(LoadTableCacheDurationHistogram)
+	prometheus.MustRegister(NonTransactionalDeleteCount)
 
 	tikvmetrics.InitMetrics(TiDB, TiKVClient)
 	tikvmetrics.RegisterMetrics()
diff --git a/metrics/session.go b/metrics/session.go
index 83df91439d311..4073644342e22 100644
--- a/metrics/session.go
+++ b/metrics/session.go
@@ -127,6 +127,14 @@ var (
 			Name:      "validate_read_ts_from_pd_count",
 			Help:      "Counter of validating read ts by getting a timestamp from PD",
 		})
+
+	NonTransactionalDeleteCount = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "tidb",
+			Subsystem: "session",
+			Name:      "non_transactional_delete_count",
+			Help:      "Counter of non-transactional delete",
+		})
 )
 
 // Label constants.
diff --git a/metrics/telemetry.go b/metrics/telemetry.go
index e98f5a9f5d6d0..10bf1ac8b624e 100644
--- a/metrics/telemetry.go
+++ b/metrics/telemetry.go
@@ -67,3 +67,22 @@ func GetCTECounter() CTEUsageCounter {
 		NonCTEUsed: readCounter(TelemetrySQLCTECnt.With(prometheus.Labels{LblCTEType: "notCTE"})),
 	}
 }
+
+// NonTransactionalStmtCounter records the usages of non-transactional statements.
+type NonTransactionalStmtCounter struct {
+	DeleteCount int64 `json:"delete"`
+}
+
+// Sub returns the difference of two counters.
+func (n NonTransactionalStmtCounter) Sub(rhs NonTransactionalStmtCounter) NonTransactionalStmtCounter {
+	return NonTransactionalStmtCounter{
+		DeleteCount: n.DeleteCount - rhs.DeleteCount,
+	}
+}
+
+// GetNonTransactionalStmtCounter gets the NonTransactionalStmtCounter.
+func GetNonTransactionalStmtCounter() NonTransactionalStmtCounter {
+	return NonTransactionalStmtCounter{
+		DeleteCount: readCounter(NonTransactionalDeleteCount),
+	}
+}
diff --git a/session/nontransactional.go b/session/nontransactional.go
index f68ad34b76efd..3c0f44f7a2d2e 100644
--- a/session/nontransactional.go
+++ b/session/nontransactional.go
@@ -24,6 +24,9 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
+	"github.com/pingcap/tidb/executor"
+	"github.com/pingcap/tidb/metrics"
+	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/format"
 	"github.com/pingcap/tidb/parser/model"
@@ -35,6 +38,7 @@ import (
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/collate"
 	"github.com/pingcap/tidb/util/logutil"
+	"github.com/pingcap/tidb/util/memory"
 	"github.com/pingcap/tidb/util/sqlexec"
 	"go.uber.org/zap"
 )
@@ -45,7 +49,7 @@ type job struct {
 	end     types.Datum
 	err     error
 	jobID   int
-	jobSize int
+	jobSize int // it can be inaccurate if there are concurrent writes
 	sql     string
 }
 
@@ -57,8 +61,11 @@ type statementBuildInfo struct {
 	originalCondition ast.ExprNode
 }
 
-func (j job) String() string {
-	return fmt.Sprintf("job id: %d, job size: %d, range: [%s, %s]", j.jobID, j.jobSize, j.start.String(), j.end.String())
+func (j job) String(redacted bool) string {
+	if redacted {
+		return fmt.Sprintf("job id: %d, estimated size: %d", j.jobID, j.jobSize)
+	}
+	return fmt.Sprintf("job id: %d, estimated size: %d, sql: %s", j.jobID, j.jobSize, j.sql)
 }
 
 // HandleNonTransactionalDelete is the entry point for a non-transactional delete
@@ -70,6 +77,7 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona
 	if err := checkConstraint(ctx, stmt, se); err != nil {
 		return nil, err
 	}
+	metrics.NonTransactionalDeleteCount.Inc()
 	tableName, selectSQL, shardColumnInfo, err := buildSelectSQL(stmt, se)
 	if err != nil {
 		return nil, err
@@ -77,7 +85,13 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona
 	if stmt.DryRun == ast.DryRunQuery {
 		return buildDryRunResults(stmt.DryRun, []string{selectSQL}, se.GetSessionVars().BatchSize.MaxChunkSize)
 	}
-	jobs, err := buildShardJobs(ctx, stmt, se, selectSQL, shardColumnInfo)
+
+	// TODO: choose an appropriate quota.
+	// Use the mem-quota-query as a workaround. As a result, a NT-DML may consume 2x of the memory quota.
+	memTracker := memory.NewTracker(memory.LabelForNonTransactionalDML, se.GetSessionVars().MemQuotaQuery)
+	memTracker.AttachToGlobalTracker(executor.GlobalMemoryUsageTracker)
+	defer memTracker.DetachFromGlobalTracker()
+	jobs, err := buildShardJobs(ctx, stmt, se, selectSQL, shardColumnInfo, memTracker)
 	if err != nil {
 		return nil, err
 	}
@@ -89,7 +103,7 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona
 	if stmt.DryRun == ast.DryRunSplitDml {
 		return buildDryRunResults(stmt.DryRun, splitStmts, se.GetSessionVars().BatchSize.MaxChunkSize)
 	}
-	return buildExecuteResults(jobs, se.GetSessionVars().BatchSize.MaxChunkSize)
+	return buildExecuteResults(ctx, jobs, se.GetSessionVars().BatchSize.MaxChunkSize, se.GetSessionVars().EnableRedactLog)
 }
 
 func checkConstraint(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, se Session) error {
@@ -108,13 +122,19 @@ func checkConstraint(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt,
 	if sessVars.SnapshotTS != 0 {
 		return errors.New("can't do non-transactional DML when tidb_snapshot is set")
 	}
-	// TODO: return error if there are multiple tables
-	if stmt.DeleteStmt.TableRefs == nil || stmt.DeleteStmt.TableRefs.TableRefs == nil {
+
+	if stmt.DeleteStmt.TableRefs == nil || stmt.DeleteStmt.TableRefs.TableRefs == nil || stmt.DeleteStmt.TableRefs.TableRefs.Left == nil {
 		return errors.New("table reference is nil")
 	}
 	if stmt.DeleteStmt.TableRefs.TableRefs.Right != nil {
 		return errors.New("Non-transactional delete doesn't support multiple tables")
 	}
+	if stmt.DeleteStmt.Limit != nil {
+		return errors.New("Non-transactional delete doesn't support limit")
+	}
+	if stmt.DeleteStmt.Order != nil {
+		return errors.New("Non-transactional delete doesn't support order by")
+	}
 	return nil
 }
 
@@ -145,7 +165,7 @@ func splitDeleteWorker(ctx context.Context, jobs []job, stmt *ast.NonTransaction
 	failedJobs := make([]string, 0)
 	for _, job := range jobs {
 		if job.err != nil {
-			failedJobs = append(failedJobs, fmt.Sprintf("job:%s, error: %s", job.String(), job.err.Error()))
+			failedJobs = append(failedJobs, fmt.Sprintf("job:%s, error: %s", job.String(se.GetSessionVars().EnableRedactLog), job.err.Error()))
 		}
 	}
 	if len(failedJobs) == 0 {
@@ -177,17 +197,13 @@ func splitDeleteWorker(ctx context.Context, jobs []job, stmt *ast.NonTransaction
 
 		// if the first job failed, there is a large chance that all jobs will fail. So return early.
 		if i == 0 && jobs[i].err != nil {
-			jobs[i].err = errors.Wrap(jobs[i].err, "Early return: error occurred in the first job. All jobs are canceled")
-			logutil.Logger(ctx).Error("Non-transactional delete, early return", zap.Error(jobs[i].err))
-			break
+			return nil, errors.Annotate(jobs[i].err, "Early return: error occurred in the first job. All jobs are canceled")
 		}
 	}
 	return splitStmts, nil
 }
 
 func doOneJob(ctx context.Context, job *job, totalJobCount int, options statementBuildInfo, se Session, dryRun bool) string {
-	logutil.Logger(ctx).Info("start a Non-transactional delete", zap.String("job", job.String()), zap.Int("totalJobCount", totalJobCount))
-
 	var whereCondition ast.ExprNode
 
 	if job.start.IsNull() {
@@ -255,7 +271,7 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
 		format.RestoreBracketAroundBinaryOperation|
 		format.RestoreStringWithoutCharset, &sb))
 	if err != nil {
-		job.err = err
+		job.err = errors.Annotate(err, "Failed to restore delete statement")
 		return ""
 	}
 	deleteSQL := sb.String()
@@ -265,6 +281,15 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
 	}
 	job.sql = deleteSQL
 
+	logutil.Logger(ctx).Info("start a Non-transactional delete",
+		zap.String("job", job.String(se.GetSessionVars().EnableRedactLog)), zap.Int("totalJobCount", totalJobCount))
+	var deleteSQLInLog string
+	if se.GetSessionVars().EnableRedactLog {
+		deleteSQLInLog = parser.Normalize(deleteSQL)
+	} else {
+		deleteSQLInLog = deleteSQL
+	}
+
 	options.stmt.DeleteStmt.SetText(nil, fmt.Sprintf("/* job %v/%v */ %s", job.jobID, totalJobCount, deleteSQL))
 	rs, err := se.ExecuteStmt(ctx, options.stmt.DeleteStmt)
 
@@ -273,13 +298,11 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
 		err = errors.New("injected split delete error")
 	})
 	if err != nil {
-		errStr := fmt.Sprintf("Non-transactional delete SQL failed, sql: %s, error: %s, jobID: %d, jobSize: %d. ",
-			deleteSQL, err.Error(), job.jobID, job.jobSize)
-		logutil.Logger(ctx).Error(errStr)
+		logutil.Logger(ctx).Error("Non-transactional delete SQL failed", zap.String("job", deleteSQLInLog), zap.Error(err), zap.Int("jobID", job.jobID), zap.Int("jobSize", job.jobSize))
 		job.err = err
 	} else {
 		logutil.Logger(ctx).Info("Non-transactional delete SQL finished successfully", zap.Int("jobID", job.jobID),
-			zap.Int("jobSize", job.jobSize), zap.String("deleteSQL", deleteSQL))
+			zap.Int("jobSize", job.jobSize), zap.String("deleteSQL", deleteSQLInLog))
 	}
 	if rs != nil {
 		rs.Close()
@@ -288,8 +311,7 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
 }
 
 func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, se Session,
-	selectSQL string, shardColumnInfo *model.ColumnInfo) ([]job, error) {
-	logutil.Logger(ctx).Info("Non-transactional delete, select SQL", zap.String("selectSQL", selectSQL))
+	selectSQL string, shardColumnInfo *model.ColumnInfo, memTracker *memory.Tracker) ([]job, error) {
 	var shardColumnCollate string
 	if shardColumnInfo != nil {
 		shardColumnCollate = shardColumnInfo.GetCollate()
@@ -336,7 +358,7 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, s
 		if chk.NumRows() == 0 {
 			if currentSize > 0 {
 				// there's remaining work
-				jobs = append(jobs, job{jobID: jobCount, start: currentStart, end: currentEnd, jobSize: currentSize})
+				jobs = appendNewJob(jobs, jobCount+1, currentStart, currentEnd, currentSize, memTracker)
 			}
 			break
 		}
@@ -362,8 +384,8 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, s
 				return nil, err
 			}
 			if cmp != 0 {
-				jobs = append(jobs, job{jobID: jobCount, start: *currentStart.Clone(), end: *currentEnd.Clone(), jobSize: currentSize})
 				jobCount++
+				jobs = appendNewJob(jobs, jobCount, *currentStart.Clone(), *currentEnd.Clone(), currentSize, memTracker)
 				currentSize = 0
 				currentStart = newEnd
 			}
@@ -378,6 +400,12 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, s
 	return jobs, nil
 }
 
+func appendNewJob(jobs []job, id int, start types.Datum, end types.Datum, size int, tracker *memory.Tracker) []job {
+	jobs = append(jobs, job{jobID: id, start: start, end: end, jobSize: size})
+	tracker.Consume(start.EstimatedMemUsage() + end.EstimatedMemUsage() + 64)
+	return jobs
+}
+
 func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.TableName, string, *model.ColumnInfo, error) {
 	// only use the first table
 	tableSource, ok := stmt.DeleteStmt.TableRefs.TableRefs.Left.(*ast.TableSource)
@@ -390,7 +418,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.Tabl
 	}
 
 	// the shard column must be indexed
-	indexed, shardColumnInfo, err := selectShardColumn(stmt, se, tableName)
+	indexed, shardColumnInfo, err := selectShardColumn(stmt, se, tableName, tableSource.AsName)
 	if err != nil {
 		return nil, "", nil, err
 	}
@@ -406,7 +434,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.Tabl
 			format.RestoreBracketAroundBinaryOperation|
 			format.RestoreStringWithoutCharset, &sb))
 		if err != nil {
-			return nil, "", nil, errors.Trace(err)
+			return nil, "", nil, errors.Annotate(err, "Failed to restore where clause in non-transactional delete")
 		}
 	} else {
 		sb.WriteString("TRUE")
 	}
@@ -419,7 +447,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.Tabl
 // it attempts to auto-select a shard column from handle if not specified, and fills back the corresponding info in the stmt,
 // making it transparent to following steps
-func selectShardColumn(stmt *ast.NonTransactionalDeleteStmt, se Session, tableName *ast.TableName) (indexed bool, shardColumnInfo *model.ColumnInfo, err error) {
+func selectShardColumn(stmt *ast.NonTransactionalDeleteStmt, se Session, tableName *ast.TableName, tableAsName model.CIStr) (indexed bool, shardColumnInfo *model.ColumnInfo, err error) {
 	tbl, err := domain.GetDomain(se).InfoSchema().TableByName(tableName.Schema, tableName.Name)
 	if err != nil {
 		return false, nil, err
 	}
@@ -453,7 +481,7 @@ func selectShardColumn(stmt *ast.NonTransactionalDeleteStmt, se Session, tableNa
 		}
 		stmt.ShardColumn = &ast.ColumnName{
 			Schema: tableName.Schema,
-			Table:  tableName.Name,
+			Table:  tableAsName, // so that table alias works
 			Name:   model.NewCIStr(shardColumnName),
 		}
 		return true, shardColumnInfo, nil
@@ -519,7 +547,7 @@ func buildDryRunResults(dryRunOption int, results []string, maxChunkSize int) (s
 	}, nil
 }
 
-func buildExecuteResults(jobs []job, maxChunkSize int) (sqlexec.RecordSet, error) {
+func buildExecuteResults(ctx context.Context, jobs []job, maxChunkSize int, redactLog bool) (sqlexec.RecordSet, error) {
 	failedJobs := make([]job, 0)
 	for _, job := range jobs {
 		if job.err != nil {
@@ -574,14 +602,19 @@ func buildExecuteResults(jobs []job, maxChunkSize int) (sqlexec.RecordSet, error
 	}
 
 	rows := make([][]interface{}, 0, len(failedJobs))
+	var sb strings.Builder
 	for _, job := range failedJobs {
-		row := make([]interface{}, 3)
-		row[0] = job.String()
-		row[1] = job.sql
-		row[2] = job.err.Error()
+		row := make([]interface{}, 2)
+		row[0] = job.String(false)
+		row[1] = job.err.Error()
 		rows = append(rows, row)
+		sb.WriteString(fmt.Sprintf("%s, %s;\n", job.String(redactLog), job.err.Error()))
 	}
+	// log errors here in case the output is too long. There can be thousands of errors.
+	logutil.Logger(ctx).Warn("Non-transactional delete failed",
+		zap.Int("num_failed_jobs", len(failedJobs)), zap.String("failed_jobs", sb.String()))
+
 	return &sqlexec.SimpleRecordSet{
 		ResultFields: resultFields,
 		Rows:         rows,
diff --git a/session/nontransactional_test.go b/session/nontransactional_test.go
index cc83d85594e2a..c588be4e11344 100644
--- a/session/nontransactional_test.go
+++ b/session/nontransactional_test.go
@@ -100,9 +100,8 @@ func TestNonTransactionalDeleteErrorMessage(t *testing.T) {
 	}
 	failpoint.Enable("github.com/pingcap/tidb/session/splitDeleteError", `return`)
 	defer failpoint.Disable("github.com/pingcap/tidb/session/splitDeleteError")
-	rows := tk.MustQuery("split on a limit 3 delete from t").Rows()
-	require.Equal(t, 1, len(rows))
-	require.Equal(t, rows[0][2].(string), "Early return: error occurred in the first job. All jobs are canceled: injected split delete error")
+	err := tk.ExecToErr("split on a limit 3 delete from t")
+	require.EqualError(t, err, "Early return: error occurred in the first job. All jobs are canceled: injected split delete error")
 }
 
 func TestNonTransactionalDeleteSplitOnTiDBRowID(t *testing.T) {
@@ -317,12 +316,16 @@ func TestNonTransactionalDeleteCheckConstraint(t *testing.T) {
 	tk.Session().GetSessionVars().BatchInsert = false
 	tk.Session().GetSessionVars().DMLBatchSize = 0
 
-	tk.MustExec("create table t1(a int, b int, key(a))")
-	tk.MustExec("insert into t1 values (1, 1)")
-	err = tk.ExecToErr("split limit 1 delete t, t1 from t, t1 where t.a = t1.a")
-	require.Error(t, err)
+	err = tk.ExecToErr("split on a limit 10 delete from t limit 10")
+	require.EqualError(t, err, "Non-transactional delete doesn't support limit")
 	tk.MustQuery("select count(*) from t").Check(testkit.Rows("100"))
-	tk.MustQuery("select count(*) from t1").Check(testkit.Rows("1"))
+
+	err = tk.ExecToErr("split on a limit 10 delete from t order by a")
+	require.EqualError(t, err, "Non-transactional delete doesn't support order by")
+	tk.MustQuery("select count(*) from t").Check(testkit.Rows("100"))
+
+	err = tk.ExecToErr("prepare nt FROM 'split limit 1 delete from t'")
+	require.EqualError(t, err, "[executor:1295]This command is not supported in the prepared statement protocol yet")
 }
 
 func TestNonTransactionalDeleteOptimizerHints(t *testing.T) {
@@ -337,3 +340,83 @@ func TestNonTransactionalDeleteOptimizerHints(t *testing.T) {
 	result := tk.MustQuery("split on a limit 10 dry run delete /*+ USE_INDEX(t) */ from t").Rows()[0][0].(string)
 	require.Equal(t, result, "DELETE /*+ USE_INDEX(`t` )*/ FROM `test`.`t` WHERE `a` BETWEEN 0 AND 9")
 }
+
+func TestNonTransactionalDeleteMultiTables(t *testing.T) {
+	store, clean := createStorage(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+
+	tk.MustExec("use test")
+	tk.MustExec("create table t(a int, b int, key(a))")
+	for i := 0; i < 100; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i, i*2))
+	}
+
+	tk.MustExec("create table t1(a int, b int, key(a))")
+	tk.MustExec("insert into t1 values (1, 1)")
+	err := tk.ExecToErr("split limit 1 delete t, t1 from t, t1 where t.a = t1.a")
+	require.Error(t, err)
+	tk.MustQuery("select count(*) from t").Check(testkit.Rows("100"))
+	tk.MustQuery("select count(*) from t1").Check(testkit.Rows("1"))
+}
+
+func TestNonTransactionalDeleteAlias(t *testing.T) {
+	store, clean := createStorage(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+
+	goodSplitStmts := []string{
+		"split on test.t1.a limit 5 delete t1.* from test.t as t1",
+		"split on a limit 5 delete t1.* from test.t as t1",
+		"split on _tidb_rowid limit 5 delete from test.t as t1",
+		"split on t1._tidb_rowid limit 5 delete from test.t as t1",
+		"split on test.t1._tidb_rowid limit 5 delete from test.t as t1",
+		"split limit 5 delete from test.t as t1", // auto assigns table name to be the alias
+	}
+
+	badSplitStmts := []string{
+		"split on test.t.a limit 5 delete t1.* from test.t as t1",
+		"split on t.a limit 5 delete t1.* from test.t as t1",
+		"split on t._tidb_rowid limit 5 delete from test.t as t1",
+		"split on test.t._tidb_rowid limit 5 delete from test.t as t1",
+	}
+
+	tk.MustExec("create table test.t(a int, b int, key(a))")
+	tk.MustExec("create table test.t2(a int, b int, key(a))")
+
+	for _, sql := range goodSplitStmts {
+		for i := 0; i < 5; i++ {
+			tk.MustExec(fmt.Sprintf("insert into test.t values (%d, %d)", i, i*2))
+		}
+		tk.MustExec(sql)
+		tk.MustQuery("select count(*) from test.t").Check(testkit.Rows("0"))
+	}
+
+	for i := 0; i < 5; i++ {
+		tk.MustExec(fmt.Sprintf("insert into test.t values (%d, %d)", i, i*2))
+	}
+	for _, sql := range badSplitStmts {
+		err := tk.ExecToErr(sql)
+		require.Error(t, err)
+		tk.MustQuery("select count(*) from test.t").Check(testkit.Rows("5"))
+	}
+}
+
+func TestNonTransactionalDeleteShardOnUnsupportedTypes(t *testing.T) {
+	// When some day the test fail because such types are supported, we can update related docs and consider remove the test.
+	store, clean := createStorage(t)
+	defer clean()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("create table t(a set('e0', 'e1', 'e2'), b int, key(a))")
+	tk.MustExec("insert into t values ('e2,e0', 3)")
+	err := tk.ExecToErr("split on a limit 1 delete from t")
+	require.Error(t, err)
+	tk.MustQuery("select count(*) from t").Check(testkit.Rows("1"))
+
+	tk.MustExec("create table t2(a enum('e0', 'e1', 'e2'), b int, key(a))")
+	tk.MustExec("insert into t2 values ('e0', 1)")
+	err = tk.ExecToErr("split on a limit 1 delete from t2")
+	require.Error(t, err)
+	tk.MustQuery("select count(*) from t2").Check(testkit.Rows("1"))
+}
diff --git a/telemetry/data.go b/telemetry/data.go
index 374551df7dd30..ddf574e3bf1ed 100644
--- a/telemetry/data.go
+++ b/telemetry/data.go
@@ -58,4 +58,5 @@ func postReportTelemetryData() {
 	postReportTxnUsage()
 	postReportCTEUsage()
 	postReportSlowQueryStats()
+	postReportNonTransactionalCounter()
 }
diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go
index eba780bfd77aa..96fd9b589dbbd 100644
--- a/telemetry/data_feature_usage.go
+++ b/telemetry/data_feature_usage.go
@@ -36,13 +36,14 @@ type featureUsage struct {
 	Txn *TxnUsage `json:"txn"`
 	// cluster index usage information
 	// key is the first 6 characters of sha2(TABLE_NAME, 256)
-	ClusterIndex         *ClusterIndexUsage    `json:"clusterIndex"`
-	NewClusterIndex      *NewClusterIndexUsage `json:"newClusterIndex"`
-	TemporaryTable       bool                  `json:"temporaryTable"`
-	CTE                  *m.CTEUsageCounter    `json:"cte"`
-	CachedTable          bool                  `json:"cachedTable"`
-	AutoCapture          bool                  `json:"autoCapture"`
-	PlacementPolicyUsage *placementPolicyUsage `json:"placementPolicy"`
+	ClusterIndex          *ClusterIndexUsage             `json:"clusterIndex"`
+	NewClusterIndex       *NewClusterIndexUsage          `json:"newClusterIndex"`
+	TemporaryTable        bool                           `json:"temporaryTable"`
+	CTE                   *m.CTEUsageCounter             `json:"cte"`
+	CachedTable           bool                           `json:"cachedTable"`
+	AutoCapture           bool                           `json:"autoCapture"`
+	PlacementPolicyUsage  *placementPolicyUsage          `json:"placementPolicy"`
+	NonTransactionalUsage *m.NonTransactionalStmtCounter `json:"nonTransactional"`
 }
 
 type placementPolicyUsage struct {
@@ -70,6 +71,9 @@ func getFeatureUsage(ctx sessionctx.Context) (*featureUsage, error) {
 	usage.AutoCapture = getAutoCaptureUsageInfo(ctx)
 
 	collectFeatureUsageFromInfoschema(ctx, &usage)
+
+	usage.NonTransactionalUsage = getNonTransactionalUsage()
+
 	return &usage, nil
 }
 
@@ -192,6 +196,7 @@ type TxnUsage struct {
 
 var initialTxnCommitCounter metrics.TxnCommitCounter
 var initialCTECounter m.CTEUsageCounter
+var initialNonTransactionalCounter m.NonTransactionalStmtCounter
 
 // getTxnUsageInfo gets the usage info of transaction related features. It's exported for tests.
 func getTxnUsageInfo(ctx sessionctx.Context) *TxnUsage {
@@ -243,3 +248,13 @@ func getAutoCaptureUsageInfo(ctx sessionctx.Context) bool {
 	}
 	return false
 }
+
+func getNonTransactionalUsage() *m.NonTransactionalStmtCounter {
+	curr := m.GetNonTransactionalStmtCounter()
+	diff := curr.Sub(initialNonTransactionalCounter)
+	return &diff
+}
+
+func postReportNonTransactionalCounter() {
+	initialNonTransactionalCounter = m.GetNonTransactionalStmtCounter()
+}
diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go
index 7ef9f4ce10b41..d6baf2db6e4ae 100644
--- a/telemetry/data_feature_usage_test.go
+++ b/telemetry/data_feature_usage_test.go
@@ -204,3 +204,21 @@ func TestClusterIndexUsageInfo(t *testing.T) {
 	require.Equal(t, uint64(1), usage.NewClusterIndex.NumClusteredTables)
 	require.Equal(t, uint64(2), usage.NewClusterIndex.NumTotalTables)
 }
+
+func TestNonTransactionalUsage(t *testing.T) {
+	store, clean := testkit.CreateMockStore(t)
+	defer clean()
+
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+
+	usage, err := telemetry.GetFeatureUsage(tk.Session())
+	require.NoError(t, err)
+	require.Equal(t, int64(0), usage.NonTransactionalUsage.DeleteCount)
+
+	tk.MustExec("create table t(a int);")
+	tk.MustExec("split limit 1 delete from t")
+	usage, err = telemetry.GetFeatureUsage(tk.Session())
+	require.NoError(t, err)
+	require.Equal(t, int64(1), usage.NonTransactionalUsage.DeleteCount)
+}
diff --git a/types/datum.go b/types/datum.go
index 76a6d4115e1b6..9016774bc58a9 100644
--- a/types/datum.go
+++ b/types/datum.go
@@ -2402,17 +2402,23 @@ func EstimatedMemUsage(array []Datum, numOfRows int) int64 {
 	if numOfRows == 0 {
 		return 0
 	}
-	var bytesConsumed int
+	var bytesConsumed int64
 	for _, d := range array {
-		switch d.Kind() {
-		case KindMysqlDecimal:
-			bytesConsumed += sizeOfMyDecimal
-		case KindMysqlTime:
-			bytesConsumed += sizeOfMysqlTime
-		default:
-			bytesConsumed += len(d.b)
-		}
+		bytesConsumed += d.EstimatedMemUsage()
+	}
+	return bytesConsumed * int64(numOfRows)
+}
+
+// EstimatedMemUsage returns the estimated bytes consumed of a Datum.
+func (d Datum) EstimatedMemUsage() int64 {
+	bytesConsumed := sizeOfEmptyDatum
+	switch d.Kind() {
+	case KindMysqlDecimal:
+		bytesConsumed += sizeOfMyDecimal
+	case KindMysqlTime:
+		bytesConsumed += sizeOfMysqlTime
+	default:
+		bytesConsumed += len(d.b)
 	}
-	bytesConsumed += len(array) * sizeOfEmptyDatum
-	return int64(bytesConsumed * numOfRows)
+	return int64(bytesConsumed)
 }
diff --git a/util/memory/tracker.go b/util/memory/tracker.go
index 39c7d4d5d2fe7..72194243e0803 100644
--- a/util/memory/tracker.go
+++ b/util/memory/tracker.go
@@ -563,4 +563,6 @@ const (
 	LabelForIndexJoinOuterWorker int = -21
 	// LabelForBindCache represents the label of the bind cache
 	LabelForBindCache int = -22
+	// LabelForNonTransactionalDML represents the label of the non-transactional DML
+	LabelForNonTransactionalDML = -23
 )