Skip to content

Commit

Permalink
*: move dup key error handling to errctx (#50239)
Browse files Browse the repository at this point in the history
close #50238
  • Loading branch information
lcwangchao authored Jan 10, 2024
1 parent 3da5e78 commit 6e10826
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 23 deletions.
3 changes: 3 additions & 0 deletions pkg/errctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ func init() {
errno.ErrNoPartitionForGivenValue,
errno.ErrRowDoesNotMatchGivenPartitionSet,
},
ErrGroupDupKey: {
errno.ErrDupEntry,
},
}

for group, codes := range group2Errors {
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2121,8 +2121,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
// For insert statement (not for update statement), disabling the StrictSQLMode
// should make TruncateAsWarning and DividedByZeroAsWarning,
// but should not make DupKeyAsWarning.
sc.DupKeyAsWarning = stmt.IgnoreErr
if stmt.IgnoreErr {
errLevels[errctx.ErrGroupDupKey] = errctx.LevelWarn
errLevels[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelWarn
errLevels[errctx.ErrGroupNoMatchedPartition] = errctx.LevelWarn
}
Expand Down Expand Up @@ -2259,7 +2259,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars *variable.SessionVars) {
sc.InUpdateStmt = true
errLevels := sc.ErrLevels()
sc.DupKeyAsWarning = stmt.IgnoreErr
errLevels[errctx.ErrGroupDupKey] = errctx.ResolveErrLevel(false, stmt.IgnoreErr)
errLevels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !vars.StrictSQLMode || stmt.IgnoreErr)
errLevels[errctx.ErrGroupDividedByZero] = errctx.ResolveErrLevel(
!vars.SQLMode.HasErrorForDivisionByZeroMode(),
Expand All @@ -2279,7 +2279,7 @@ func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars
func ResetDeleteStmtCtx(sc *stmtctx.StatementContext, stmt *ast.DeleteStmt, vars *variable.SessionVars) {
sc.InDeleteStmt = true
errLevels := sc.ErrLevels()
sc.DupKeyAsWarning = stmt.IgnoreErr
errLevels[errctx.ErrGroupDupKey] = errctx.ResolveErrLevel(false, stmt.IgnoreErr)
errLevels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !vars.StrictSQLMode || stmt.IgnoreErr)
errLevels[errctx.ErrGroupDividedByZero] = errctx.ResolveErrLevel(
!vars.SQLMode.HasErrorForDivisionByZeroMode(),
Expand Down
10 changes: 10 additions & 0 deletions pkg/executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.InsertStmt{}, &ast.UpdateStmt{}, &ast.DeleteStmt{}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelError
l[errctx.ErrGroupDupKey] = errctx.LevelError
l[errctx.ErrGroupBadNull] = errctx.LevelError
l[errctx.ErrGroupDividedByZero] = errctx.LevelError
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelError
Expand All @@ -298,6 +299,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.InsertStmt{}, &ast.UpdateStmt{}, &ast.DeleteStmt{}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelWarn
l[errctx.ErrGroupDupKey] = errctx.LevelError
l[errctx.ErrGroupBadNull] = errctx.LevelWarn
l[errctx.ErrGroupDividedByZero] = errctx.LevelWarn
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelError
Expand All @@ -311,6 +313,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.InsertStmt{IgnoreErr: true}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelWarn
l[errctx.ErrGroupDupKey] = errctx.LevelWarn
l[errctx.ErrGroupBadNull] = errctx.LevelWarn
l[errctx.ErrGroupDividedByZero] = errctx.LevelWarn
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelWarn
Expand All @@ -324,6 +327,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.UpdateStmt{IgnoreErr: true}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelWarn
l[errctx.ErrGroupDupKey] = errctx.LevelWarn
l[errctx.ErrGroupBadNull] = errctx.LevelWarn
l[errctx.ErrGroupDividedByZero] = errctx.LevelWarn
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelError
Expand All @@ -337,6 +341,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.DeleteStmt{IgnoreErr: true}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelWarn
l[errctx.ErrGroupDupKey] = errctx.LevelWarn
l[errctx.ErrGroupBadNull] = errctx.LevelWarn
l[errctx.ErrGroupDividedByZero] = errctx.LevelWarn
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelError
Expand All @@ -350,6 +355,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.InsertStmt{}, &ast.UpdateStmt{}, &ast.DeleteStmt{}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelError
l[errctx.ErrGroupDupKey] = errctx.LevelError
l[errctx.ErrGroupBadNull] = errctx.LevelError
l[errctx.ErrGroupDividedByZero] = errctx.LevelIgnore
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelError
Expand All @@ -363,6 +369,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.SelectStmt{}, &ast.SetOprStmt{}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelWarn
l[errctx.ErrGroupDupKey] = errctx.LevelError
l[errctx.ErrGroupBadNull] = errctx.LevelError
l[errctx.ErrGroupDividedByZero] = errctx.LevelWarn
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelError
Expand All @@ -376,6 +383,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.SelectStmt{}, &ast.SetOprStmt{}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelWarn
l[errctx.ErrGroupDupKey] = errctx.LevelError
l[errctx.ErrGroupBadNull] = errctx.LevelError
l[errctx.ErrGroupDividedByZero] = errctx.LevelWarn
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelError
Expand All @@ -389,6 +397,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.LoadDataStmt{}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelError
l[errctx.ErrGroupDupKey] = errctx.LevelError
l[errctx.ErrGroupBadNull] = errctx.LevelError
l[errctx.ErrGroupDividedByZero] = errctx.LevelWarn
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelError
Expand All @@ -402,6 +411,7 @@ func TestErrLevelsForResetStmtContext(t *testing.T) {
stmt: []ast.StmtNode{&ast.LoadDataStmt{}},
levels: func() (l errctx.LevelMap) {
l[errctx.ErrGroupTruncate] = errctx.LevelError
l[errctx.ErrGroupDupKey] = errctx.LevelError
l[errctx.ErrGroupBadNull] = errctx.LevelError
l[errctx.ErrGroupDividedByZero] = errctx.LevelWarn
l[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelError
Expand Down
12 changes: 6 additions & 6 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
Expand Down Expand Up @@ -65,7 +66,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
sessVars := e.Ctx().GetSessionVars()
defer sessVars.CleanBuffers()
ignoreErr := sessVars.StmtCtx.DupKeyAsWarning
ignoreErr := sessVars.StmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError

txn, err := e.Ctx().Txn(true)
if err != nil {
Expand Down Expand Up @@ -199,10 +200,9 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr
}

err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch)
if e.Ctx().GetSessionVars().StmtCtx.DupKeyAsWarning && (kv.ErrKeyExists.Equal(err) ||
table.ErrCheckConstraintViolated.Equal(err)) {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(err)
return nil
if kv.ErrKeyExists.Equal(err) || table.ErrCheckConstraintViolated.Equal(err) {
ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx()
return ec.HandleErrorWithAlias(kv.ErrKeyExists, err, err)
}
return err
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func (e *InsertExec) setMessage() {
if e.SelectExec != nil || numRecords > 1 {
numWarnings := stmtCtx.WarningCount()
var numDuplicates uint64
if stmtCtx.DupKeyAsWarning {
if stmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError {
// if ignoreErr
numDuplicates = numRecords - stmtCtx.CopiedRows()
} else {
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,11 @@ func (e *InsertValues) handleErr(col *table.Column, val *types.Datum, rowIdx int
err = completeInsertErr(c, val, rowIdx, err)
}

if !e.Ctx().GetSessionVars().StmtCtx.DupKeyAsWarning {
return err
}
// TODO: should not filter all types of errors here.
e.handleWarning(err)
if err != nil {
ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx()
return ec.HandleErrorWithAlias(kv.ErrKeyExists, err, err)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ type LoadDataWorker struct {
func setNonRestrictiveFlags(stmtCtx *stmtctx.StatementContext) {
// TODO: DupKeyAsWarning represents too many "ignore error" paths, the
// meaning of this flag is not clear. I can only reuse it here.
stmtCtx.DupKeyAsWarning = true
levels := stmtCtx.ErrLevels()
levels[errctx.ErrGroupDupKey] = errctx.LevelWarn
levels[errctx.ErrGroupBadNull] = errctx.LevelWarn
stmtCtx.SetErrLevels(levels)
stmtCtx.SetTypeFlags(stmtCtx.TypeFlags().WithTruncateAsWarning(true))
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/test/writetest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestIssue18681(t *testing.T) {
deleteSQL := "delete from load_data_test"
selectSQL := "select bin(a), bin(b), bin(c), bin(d) from load_data_test;"
levels := ctx.GetSessionVars().StmtCtx.ErrLevels()
ctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true
levels[errctx.ErrGroupDupKey] = errctx.LevelWarn
levels[errctx.ErrGroupBadNull] = errctx.LevelWarn

sc := ctx.GetSessionVars().StmtCtx
Expand Down
8 changes: 5 additions & 3 deletions pkg/executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newDat
continue
}

sc := e.Ctx().GetSessionVars().StmtCtx
if (kv.ErrKeyExists.Equal(err1) || table.ErrCheckConstraintViolated.Equal(err1)) && sc.DupKeyAsWarning {
sc.AppendWarning(err1)
if kv.ErrKeyExists.Equal(err1) || table.ErrCheckConstraintViolated.Equal(err1) {
ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx()
if err1 = ec.HandleErrorWithAlias(kv.ErrKeyExists, err1, err1); err1 != nil {
return err1
}
continue
}
return err1
Expand Down
9 changes: 6 additions & 3 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ type StatementContext struct {
InCreateOrAlterStmt bool
InSetSessionStatesStmt bool
InPreparedPlanBuilding bool
DupKeyAsWarning bool
InShowWarning bool
UseCache bool
ForcePlanCache bool // force the optimizer to use plan cache even if there is risky optimization, see #49736.
Expand Down Expand Up @@ -497,8 +496,12 @@ func (sc *StatementContext) SetErrLevels(otherLevels errctx.LevelMap) {

// ErrLevels returns the current `errctx.LevelMap`
func (sc *StatementContext) ErrLevels() errctx.LevelMap {
ec := sc.ErrCtx()
return ec.LevelMap()
return sc.errCtx.LevelMap()
}

// ErrGroupLevel returns the error level for the given error group
func (sc *StatementContext) ErrGroupLevel(group errctx.ErrGroup) errctx.Level {
return sc.errCtx.LevelForGroup(group)
}

// TypeFlags returns the type flags
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
deps = [
"//pkg/config",
"//pkg/domain/resourcegroup",
"//pkg/errctx",
"//pkg/errno",
"//pkg/keyspace",
"//pkg/kv",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/metrics"
Expand Down Expand Up @@ -2606,7 +2607,7 @@ func (s *SessionVars) GetPrevStmtDigest() string {

// LazyCheckKeyNotExists returns if we can lazy check key not exists.
func (s *SessionVars) LazyCheckKeyNotExists() bool {
return s.PresumeKeyNotExists || (s.TxnCtx != nil && s.TxnCtx.IsPessimistic && !s.StmtCtx.DupKeyAsWarning)
return s.PresumeKeyNotExists || (s.TxnCtx != nil && s.TxnCtx.IsPessimistic && s.StmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) == errctx.LevelError)
}

// GetTemporaryTable returns a TempTable by tableInfo.
Expand Down
2 changes: 1 addition & 1 deletion pkg/table/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func handleZeroDatetime(ctx sessionctx.Context, col *model.ColumnInfo, casted ty
// if NO_ZERO_IN_DATE is enabled, dates with zero parts are inserted as '0000-00-00' and produce a warning
// If NO_ZERO_IN_DATE mode and strict mode are enabled, dates with zero parts are not permitted and inserts produce an error, unless IGNORE is given as well. For INSERT IGNORE and UPDATE IGNORE, dates with zero parts are inserted as '0000-00-00' and produce a warning.

ignoreErr := sc.DupKeyAsWarning
ignoreErr := sc.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError

// Timestamp in MySQL is since EPOCH 1970-01-01 00:00:00 UTC and can by definition not have invalid dates!
// Zero date is special for MySQL timestamp and *NOT* 1970-01-01 00:00:00, but 0000-00-00 00:00:00!
Expand Down

0 comments on commit 6e10826

Please sign in to comment.