Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: produce warnings when fallback from bulk mode #51697

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ func getLockWaitTime(ctx PlanContext, lockInfo *ast.SelectLockInfo) (lock bool,
// autocommit to 0. If autocommit is enabled, the rows matching the specification are not locked.
// See https://dev.mysql.com/doc/refman/5.7/en/innodb-locking-reads.html
sessVars := ctx.GetSessionVars()
if !sessVars.IsAutocommit() || sessVars.InTxn() || config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load() {
if !sessVars.IsAutocommit() || sessVars.InTxn() || (config.GetGlobalConfig().
PessimisticTxn.PessimisticAutoCommit.Load() && !sessVars.BulkDMLEnabled) {
lock = true
waitTime = sessVars.LockWaitTimeout
if lockInfo.LockType == ast.SelectLockForUpdateWaitN {
Expand Down
44 changes: 36 additions & 8 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3782,7 +3782,8 @@ func (s *session) PrepareTxnCtx(ctx context.Context) error {
}

txnMode := ast.Optimistic
if !s.sessionVars.IsAutocommit() || config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load() {
if !s.sessionVars.IsAutocommit() || (config.GetGlobalConfig().PessimisticTxn.
PessimisticAutoCommit.Load() && !s.GetSessionVars().BulkDMLEnabled) {
if s.sessionVars.TxnMode == ast.Pessimistic {
txnMode = ast.Pessimistic
}
Expand Down Expand Up @@ -3818,7 +3819,7 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco
future: future,
store: s.store,
txnScope: scope,
pipelined: s.isPipelinedDML(),
pipelined: s.usePipelinedDmlOrWarn(),
})
return nil
}
Expand Down Expand Up @@ -4291,24 +4292,51 @@ func (s *session) NewStmtIndexUsageCollector() *indexusage.StmtIndexUsageCollect
return indexusage.NewStmtIndexUsageCollector(s.idxUsageCollector)
}

// isPipelinedDML returns the current statement can be executed as a pipelined DML.
func (s *session) isPipelinedDML() bool {
// usePipelinedDmlOrWarn returns the current statement can be executed as a pipelined DML.
func (s *session) usePipelinedDmlOrWarn() bool {
if !s.sessionVars.BulkDMLEnabled {
return false
}
stmtCtx := s.sessionVars.StmtCtx
if stmtCtx == nil {
return false
}
if !stmtCtx.InInsertStmt && !stmtCtx.InDeleteStmt && !stmtCtx.InUpdateStmt {
// not a DML
vars := s.GetSessionVars()
if (vars.BatchCommit || vars.BatchInsert || vars.BatchDelete) && vars.DMLBatchSize > 0 && variable.EnableBatchDML.Load() {
stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used with the deprecated Batch DML. Fallback to standard mode"))
return false
}
if vars.BinlogClient != nil {
stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used with Binlog: BinlogClient != nil. Fallback to standard mode"))
return false
}
if !(stmtCtx.InInsertStmt || stmtCtx.InDeleteStmt || stmtCtx.InUpdateStmt) {
stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used for auto-commit INSERT, REPLACE, UPDATE or DELETE. Fallback to standard mode"))
return false
}
if s.isInternal() {
stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used for internal SQL. Fallback to standard mode"))
return false
}
if vars.InTxn() {
stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used in transaction. Fallback to standard mode"))
return false
}
if !vars.IsAutocommit() {
stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used in autocommit mode. Fallback to standard mode"))
return false
}
return s.sessionVars.IsAutocommit() && !s.sessionVars.InTxn() &&
!config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load() && s.sessionVars.BinlogClient == nil

// tidb_dml_type=bulk will invalidate the config pessimistic-auto-commit.
// The behavior is as if the config is set to false. But we generate a warning for it.
if config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load() {
ekexium marked this conversation as resolved.
Show resolved Hide resolved
stmtCtx.AppendWarning(
errors.New(
"pessimistic-auto-commit config is ignored in favor of Pipelined DML",
),
)
}
return true
ekexium marked this conversation as resolved.
Show resolved Hide resolved
}

// RemoveLockDDLJobs removes the DDL jobs which doesn't get the metadata lock from job2ver.
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/pipelineddmltest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_test(
"//pkg/config",
"//pkg/kv",
"//pkg/sessionctx/binloginfo",
"//pkg/sessionctx/variable",
"//pkg/testkit",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
59 changes: 45 additions & 14 deletions tests/realtikvtest/pipelineddmltest/pipelineddml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestVariable(t *testing.T) {
func TestPipelinedDMLPositive(t *testing.T) {
// the test is a little tricky, only when pipelined dml is enabled, the failpoint panics and the panic message will be returned as error
// TODO: maybe save the pipelined DML usage into TxnInfo, so we can check from it.
require.NoError(t, failpoint.Enable("tikvclient/pipelinedCommitFail", `panic("pipelined memdb is be enabled")`))
require.NoError(t, failpoint.Enable("tikvclient/pipelinedCommitFail", `panic("pipelined memdb is enabled")`))
defer func() {
require.NoError(t, failpoint.Disable("tikvclient/pipelinedCommitFail"))
}()
Expand Down Expand Up @@ -93,7 +94,7 @@ func TestPipelinedDMLPositive(t *testing.T) {
return err
})
require.Error(t, err, stmt)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is be enabled"), err.Error(), stmt)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is enabled"), err.Error(), stmt)
// binary protocol
ctx := context.Background()
parsedStmts, err := tk.Session().Parse(ctx, stmt)
Expand All @@ -103,8 +104,25 @@ func TestPipelinedDMLPositive(t *testing.T) {
return err
})
require.Error(t, err, stmt)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is be enabled"), err.Error(), stmt)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is enabled"), err.Error(), stmt)
}

// pessimistic-auto-commit is on
origPessimisticAutoCommit := config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load()
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(true)
defer func() {
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(origPessimisticAutoCommit)
}()
err := panicToErr(
func() error {
_, err := tk.Exec("insert into t values(3, 3)")
return err
},
)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is enabled"), err.Error())
tk.MustQuery("show warnings").CheckContain("pessimistic-auto-commit config is ignored in favor of Pipelined DML")
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(false)
}

func TestPipelinedDMLNegative(t *testing.T) {
Expand All @@ -127,37 +145,50 @@ func TestPipelinedDMLNegative(t *testing.T) {
// not in auto-commit txn
tk.MustExec("set session tidb_dml_type = bulk")
tk.MustExec("begin")
tk.MustQuery("show warnings").CheckContain("Pipelined DML can only be used for auto-commit INSERT, REPLACE, UPDATE or DELETE. Fallback to standard mode")
you06 marked this conversation as resolved.
Show resolved Hide resolved
tk.MustExec("insert into t values(2, 2)")
tk.MustExec("commit")

// pessimistic-auto-commit is on
origPessimisticAutoCommit := config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load()
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(true)
defer func() {
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(origPessimisticAutoCommit)
}()
tk.MustExec("insert into t values(3, 3)")
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(false)

// binlog is enabled
tk.Session().GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&testkit.MockPumpClient{})
tk.MustExec("insert into t values(4, 4)")
tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used with Binlog: BinlogClient != nil. Fallback to standard mode")
tk.Session().GetSessionVars().BinlogClient = nil

// in a running txn
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("begin")
tk.MustExec("set session tidb_dml_type = bulk") // turn on bulk dml in a txn doesn't effect the current txn.
// turn on bulk dml in a txn doesn't affect the current txn.
tk.MustExec("set session tidb_dml_type = bulk")
tk.MustExec("insert into t values(5, 5)")
tk.MustExec("commit")

// in an internal txn
tk.Session().GetSessionVars().InRestrictedSQL = true
tk.MustExec("insert into t values(6, 6)")
tk.Session().GetSessionVars().InRestrictedSQL = false
tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used for internal SQL. Fallback to standard mode")

// it's a read statement
tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1 1", "2 2", "3 3", "4 4", "5 5", "6 6"))
tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1 1", "2 2", "4 4", "5 5", "6 6"))

// for deprecated batch-dml
tk.Session().GetSessionVars().BatchDelete = true
tk.Session().GetSessionVars().DMLBatchSize = 1
variable.EnableBatchDML.Store(true)
tk.MustExec("insert into t values(7, 7)")
tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used with the deprecated Batch DML. Fallback to standard mode")
tk.Session().GetSessionVars().BatchDelete = false
tk.Session().GetSessionVars().DMLBatchSize = 0
variable.EnableBatchDML.Store(false)

// for explain and explain analyze
tk.Session().GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&testkit.MockPumpClient{})
tk.MustExec("explain insert into t values(8, 8)")
tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used with Binlog: BinlogClient != nil. Fallback to standard mode")
tk.MustExec("explain analyze insert into t values(9, 9)")
tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used with Binlog: BinlogClient != nil. Fallback to standard mode")
tk.Session().GetSessionVars().BinlogClient = nil
}

func compareTables(t *testing.T, tk *testkit.TestKit, t1, t2 string) {
Expand Down