Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: refine non-transactional delete #34273

Merged
merged 12 commits into from
May 9, 2022
2 changes: 1 addition & 1 deletion executor/prepared.go
Original file line number Diff line number Diff line change
@@ -165,7 +165,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

switch stmt.(type) {
case *ast.LoadDataStmt, *ast.PrepareStmt, *ast.ExecuteStmt, *ast.DeallocateStmt:
case *ast.LoadDataStmt, *ast.PrepareStmt, *ast.ExecuteStmt, *ast.DeallocateStmt, *ast.NonTransactionalDeleteStmt:
return ErrUnsupportedPs
}

1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -165,6 +165,7 @@ func RegisterMetrics() {
prometheus.MustRegister(CPUProfileCounter)
prometheus.MustRegister(ReadFromTableCacheCounter)
prometheus.MustRegister(LoadTableCacheDurationHistogram)
prometheus.MustRegister(NonTransactionalDeleteCount)

tikvmetrics.InitMetrics(TiDB, TiKVClient)
tikvmetrics.RegisterMetrics()
8 changes: 8 additions & 0 deletions metrics/session.go
Original file line number Diff line number Diff line change
@@ -127,6 +127,14 @@ var (
Name: "validate_read_ts_from_pd_count",
Help: "Counter of validating read ts by getting a timestamp from PD",
})

NonTransactionalDeleteCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "session",
Name: "non_transactional_delete_count",
Help: "Counter of non-transactional delete",
})
)

// Label constants.
19 changes: 19 additions & 0 deletions metrics/telemetry.go
Original file line number Diff line number Diff line change
@@ -67,3 +67,22 @@ func GetCTECounter() CTEUsageCounter {
NonCTEUsed: readCounter(TelemetrySQLCTECnt.With(prometheus.Labels{LblCTEType: "notCTE"})),
}
}

// NonTransactionalStmtCounter records the usages of non-transactional statements.
type NonTransactionalStmtCounter struct {
DeleteCount int64 `json:"delete"`
}

// Sub returns the difference of two counters.
func (n NonTransactionalStmtCounter) Sub(rhs NonTransactionalStmtCounter) NonTransactionalStmtCounter {
return NonTransactionalStmtCounter{
DeleteCount: n.DeleteCount - rhs.DeleteCount,
}
}

// GetNonTransactionalStmtCounter gets the NonTransactionalStmtCounter.
func GetNonTransactionalStmtCounter() NonTransactionalStmtCounter {
return NonTransactionalStmtCounter{
DeleteCount: readCounter(NonTransactionalDeleteCount),
}
}
95 changes: 64 additions & 31 deletions session/nontransactional.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,9 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/parser/model"
@@ -35,6 +38,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)
@@ -45,7 +49,7 @@ type job struct {
end types.Datum
err error
jobID int
jobSize int
jobSize int // it can be inaccurate if there are concurrent writes
sql string
}

@@ -57,8 +61,11 @@ type statementBuildInfo struct {
originalCondition ast.ExprNode
}

func (j job) String() string {
return fmt.Sprintf("job id: %d, job size: %d, range: [%s, %s]", j.jobID, j.jobSize, j.start.String(), j.end.String())
func (j job) String(redacted bool) string {
if redacted {
return fmt.Sprintf("job id: %d, estimated size: %d", j.jobID, j.jobSize)
}
return fmt.Sprintf("job id: %d, estimated size: %d, sql: %s", j.jobID, j.jobSize, j.sql)
}

// HandleNonTransactionalDelete is the entry point for a non-transactional delete
@@ -70,14 +77,21 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona
if err := checkConstraint(ctx, stmt, se); err != nil {
return nil, err
}
metrics.NonTransactionalDeleteCount.Inc()
tableName, selectSQL, shardColumnInfo, err := buildSelectSQL(stmt, se)
if err != nil {
return nil, err
}
if stmt.DryRun == ast.DryRunQuery {
return buildDryRunResults(stmt.DryRun, []string{selectSQL}, se.GetSessionVars().BatchSize.MaxChunkSize)
}
jobs, err := buildShardJobs(ctx, stmt, se, selectSQL, shardColumnInfo)

// TODO: choose an appropriate quota.
// Use the mem-quota-query as a workaround. As a result, a NT-DML may consume 2x of the memory quota.
memTracker := memory.NewTracker(memory.LabelForNonTransactionalDML, se.GetSessionVars().MemQuotaQuery)
memTracker.AttachToGlobalTracker(executor.GlobalMemoryUsageTracker)
defer memTracker.DetachFromGlobalTracker()
jobs, err := buildShardJobs(ctx, stmt, se, selectSQL, shardColumnInfo, memTracker)
if err != nil {
return nil, err
}
@@ -89,7 +103,7 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona
if stmt.DryRun == ast.DryRunSplitDml {
return buildDryRunResults(stmt.DryRun, splitStmts, se.GetSessionVars().BatchSize.MaxChunkSize)
}
return buildExecuteResults(jobs, se.GetSessionVars().BatchSize.MaxChunkSize)
return buildExecuteResults(ctx, jobs, se.GetSessionVars().BatchSize.MaxChunkSize, se.GetSessionVars().EnableRedactLog)
}

func checkConstraint(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, se Session) error {
@@ -108,13 +122,19 @@ func checkConstraint(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt,
if sessVars.SnapshotTS != 0 {
return errors.New("can't do non-transactional DML when tidb_snapshot is set")
}
// TODO: return error if there are multiple tables
if stmt.DeleteStmt.TableRefs == nil || stmt.DeleteStmt.TableRefs.TableRefs == nil {

if stmt.DeleteStmt.TableRefs == nil || stmt.DeleteStmt.TableRefs.TableRefs == nil || stmt.DeleteStmt.TableRefs.TableRefs.Left == nil {
return errors.New("table reference is nil")
}
if stmt.DeleteStmt.TableRefs.TableRefs.Right != nil {
return errors.New("Non-transactional delete doesn't support multiple tables")
}
if stmt.DeleteStmt.Limit != nil {
return errors.New("Non-transactional delete doesn't support limit")
}
if stmt.DeleteStmt.Order != nil {
return errors.New("Non-transactional delete doesn't support order by")
}
return nil
}

@@ -145,7 +165,7 @@ func splitDeleteWorker(ctx context.Context, jobs []job, stmt *ast.NonTransaction
failedJobs := make([]string, 0)
for _, job := range jobs {
if job.err != nil {
failedJobs = append(failedJobs, fmt.Sprintf("job:%s, error: %s", job.String(), job.err.Error()))
failedJobs = append(failedJobs, fmt.Sprintf("job:%s, error: %s", job.String(se.GetSessionVars().EnableRedactLog), job.err.Error()))
}
}
if len(failedJobs) == 0 {
@@ -177,17 +197,13 @@ func splitDeleteWorker(ctx context.Context, jobs []job, stmt *ast.NonTransaction

// if the first job failed, there is a large chance that all jobs will fail. So return early.
if i == 0 && jobs[i].err != nil {
jobs[i].err = errors.Wrap(jobs[i].err, "Early return: error occurred in the first job. All jobs are canceled")
logutil.Logger(ctx).Error("Non-transactional delete, early return", zap.Error(jobs[i].err))
break
return nil, errors.Annotate(jobs[i].err, "Early return: error occurred in the first job. All jobs are canceled")
}
}
return splitStmts, nil
}

func doOneJob(ctx context.Context, job *job, totalJobCount int, options statementBuildInfo, se Session, dryRun bool) string {
logutil.Logger(ctx).Info("start a Non-transactional delete", zap.String("job", job.String()), zap.Int("totalJobCount", totalJobCount))

var whereCondition ast.ExprNode

if job.start.IsNull() {
@@ -255,7 +271,7 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
format.RestoreBracketAroundBinaryOperation|
format.RestoreStringWithoutCharset, &sb))
if err != nil {
job.err = err
job.err = errors.Annotate(err, "Failed to restore delete statement")
return ""
}
deleteSQL := sb.String()
@@ -265,6 +281,15 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
}

job.sql = deleteSQL
logutil.Logger(ctx).Info("start a Non-transactional delete",
zap.String("job", job.String(se.GetSessionVars().EnableRedactLog)), zap.Int("totalJobCount", totalJobCount))
var deleteSQLInLog string
if se.GetSessionVars().EnableRedactLog {
deleteSQLInLog = parser.Normalize(deleteSQL)
} else {
deleteSQLInLog = deleteSQL
}

options.stmt.DeleteStmt.SetText(nil, fmt.Sprintf("/* job %v/%v */ %s", job.jobID, totalJobCount, deleteSQL))
rs, err := se.ExecuteStmt(ctx, options.stmt.DeleteStmt)

@@ -273,13 +298,11 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
err = errors.New("injected split delete error")
})
if err != nil {
errStr := fmt.Sprintf("Non-transactional delete SQL failed, sql: %s, error: %s, jobID: %d, jobSize: %d. ",
deleteSQL, err.Error(), job.jobID, job.jobSize)
logutil.Logger(ctx).Error(errStr)
logutil.Logger(ctx).Error("Non-transactional delete SQL failed", zap.String("job", deleteSQLInLog), zap.Error(err), zap.Int("jobID", job.jobID), zap.Int("jobSize", job.jobSize))
job.err = err
} else {
logutil.Logger(ctx).Info("Non-transactional delete SQL finished successfully", zap.Int("jobID", job.jobID),
zap.Int("jobSize", job.jobSize), zap.String("deleteSQL", deleteSQL))
zap.Int("jobSize", job.jobSize), zap.String("deleteSQL", deleteSQLInLog))
}
if rs != nil {
rs.Close()
@@ -288,8 +311,7 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
}

func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, se Session,
selectSQL string, shardColumnInfo *model.ColumnInfo) ([]job, error) {
logutil.Logger(ctx).Info("Non-transactional delete, select SQL", zap.String("selectSQL", selectSQL))
selectSQL string, shardColumnInfo *model.ColumnInfo, memTracker *memory.Tracker) ([]job, error) {
var shardColumnCollate string
if shardColumnInfo != nil {
shardColumnCollate = shardColumnInfo.GetCollate()
@@ -336,7 +358,7 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, s
if chk.NumRows() == 0 {
if currentSize > 0 {
// there's remaining work
jobs = append(jobs, job{jobID: jobCount, start: currentStart, end: currentEnd, jobSize: currentSize})
jobs = appendNewJob(jobs, jobCount+1, currentStart, currentEnd, currentSize, memTracker)
ekexium marked this conversation as resolved.
Show resolved Hide resolved
}
break
}
@@ -362,8 +384,8 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, s
return nil, err
}
if cmp != 0 {
jobs = append(jobs, job{jobID: jobCount, start: *currentStart.Clone(), end: *currentEnd.Clone(), jobSize: currentSize})
jobCount++
jobs = appendNewJob(jobs, jobCount, *currentStart.Clone(), *currentEnd.Clone(), currentSize, memTracker)
currentSize = 0
currentStart = newEnd
}
@@ -378,6 +400,12 @@ func buildShardJobs(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, s
return jobs, nil
}

func appendNewJob(jobs []job, id int, start types.Datum, end types.Datum, size int, tracker *memory.Tracker) []job {
jobs = append(jobs, job{jobID: id, start: start, end: end, jobSize: size})
tracker.Consume(start.EstimatedMemUsage() + end.EstimatedMemUsage() + 64)
return jobs
}

func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.TableName, string, *model.ColumnInfo, error) {
// only use the first table
tableSource, ok := stmt.DeleteStmt.TableRefs.TableRefs.Left.(*ast.TableSource)
@@ -390,7 +418,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.Tabl
}

// the shard column must be indexed
indexed, shardColumnInfo, err := selectShardColumn(stmt, se, tableName)
indexed, shardColumnInfo, err := selectShardColumn(stmt, se, tableName, tableSource.AsName)
if err != nil {
return nil, "", nil, err
}
@@ -406,7 +434,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.Tabl
format.RestoreBracketAroundBinaryOperation|
format.RestoreStringWithoutCharset, &sb))
if err != nil {
return nil, "", nil, errors.Trace(err)
return nil, "", nil, errors.Annotate(err, "Failed to restore where clause in non-transactional delete")
}
} else {
sb.WriteString("TRUE")
@@ -419,7 +447,7 @@ func buildSelectSQL(stmt *ast.NonTransactionalDeleteStmt, se Session) (*ast.Tabl

// it attempts to auto-select a shard column from handle if not specified, and fills back the corresponding info in the stmt,
// making it transparent to following steps
func selectShardColumn(stmt *ast.NonTransactionalDeleteStmt, se Session, tableName *ast.TableName) (indexed bool, shardColumnInfo *model.ColumnInfo, err error) {
func selectShardColumn(stmt *ast.NonTransactionalDeleteStmt, se Session, tableName *ast.TableName, tableAsName model.CIStr) (indexed bool, shardColumnInfo *model.ColumnInfo, err error) {
tbl, err := domain.GetDomain(se).InfoSchema().TableByName(tableName.Schema, tableName.Name)
if err != nil {
return false, nil, err
@@ -453,7 +481,7 @@ func selectShardColumn(stmt *ast.NonTransactionalDeleteStmt, se Session, tableNa
}
stmt.ShardColumn = &ast.ColumnName{
Schema: tableName.Schema,
Table: tableName.Name,
Table: tableAsName, // so that table alias works
Name: model.NewCIStr(shardColumnName),
}
return true, shardColumnInfo, nil
@@ -519,7 +547,7 @@ func buildDryRunResults(dryRunOption int, results []string, maxChunkSize int) (s
}, nil
}

func buildExecuteResults(jobs []job, maxChunkSize int) (sqlexec.RecordSet, error) {
func buildExecuteResults(ctx context.Context, jobs []job, maxChunkSize int, redactLog bool) (sqlexec.RecordSet, error) {
failedJobs := make([]job, 0)
for _, job := range jobs {
if job.err != nil {
@@ -574,14 +602,19 @@ func buildExecuteResults(jobs []job, maxChunkSize int) (sqlexec.RecordSet, error
}

rows := make([][]interface{}, 0, len(failedJobs))
var sb strings.Builder
for _, job := range failedJobs {
row := make([]interface{}, 3)
row[0] = job.String()
row[1] = job.sql
row[2] = job.err.Error()
row := make([]interface{}, 2)
row[0] = job.String(false)
row[1] = job.err.Error()
rows = append(rows, row)
sb.WriteString(fmt.Sprintf("%s, %s;\n", job.String(redactLog), job.err.Error()))
}

// log errors here in case the output is too long. There can be thousands of errors.
logutil.Logger(ctx).Warn("Non-transactional delete failed",
zap.Int("num_failed_jobs", len(failedJobs)), zap.String("failed_jobs", sb.String()))

return &sqlexec.SimpleRecordSet{
ResultFields: resultFields,
Rows: rows,
Loading