Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: produce warnings when fallback from bulk mode #51697

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3818,7 +3818,7 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco
future: future,
store: s.store,
txnScope: scope,
pipelined: s.isPipelinedDML(),
pipelined: s.usePipeilnedDMLorWarn(),
})
return nil
}
Expand Down Expand Up @@ -4291,24 +4291,45 @@ func (s *session) NewStmtIndexUsageCollector() *indexusage.StmtIndexUsageCollect
return indexusage.NewStmtIndexUsageCollector(s.idxUsageCollector)
}

// isPipelinedDML returns the current statement can be executed as a pipelined DML.
func (s *session) isPipelinedDML() bool {
// usePipeilnedDMLorWarn returns the current statement can be executed as a pipelined DML.
func (s *session) usePipeilnedDMLorWarn() bool {
if !s.sessionVars.BulkDMLEnabled {
return false
}
stmtCtx := s.sessionVars.StmtCtx
if stmtCtx == nil {
return false
}
if !stmtCtx.InInsertStmt && !stmtCtx.InDeleteStmt && !stmtCtx.InUpdateStmt {
// not a DML
vars := s.GetSessionVars()
if (vars.BatchCommit || vars.BatchInsert || vars.BatchDelete) && vars.DMLBatchSize > 0 && variable.EnableBatchDML.Load() {
stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used with the deprecated Batch DML. Fallback to standard mode."))
return false
}
if vars.BinlogClient != nil {
stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used with Binlog: BinlogClient != nil. Fallback to standard mode."))
return false
}
if !(stmtCtx.InInsertStmt || stmtCtx.InDeleteStmt || stmtCtx.InUpdateStmt) {
stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used for auto-commit INSERT, REPLACE, UPDATE or DELETE. Fallback to standard mode."))
return false
}
if s.isInternal() {
stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used for internal SQL. Fallback to standard mode."))
return false
}
if vars.InTxn() {
stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used in transaction. Fallback to standard mode."))
return false
}
if !vars.IsAutocommit() {
stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used in autocommit mode. Fallback to standard mode."))
return false
}
return s.sessionVars.IsAutocommit() && !s.sessionVars.InTxn() &&
!config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load() && s.sessionVars.BinlogClient == nil
if config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load() {
ekexium marked this conversation as resolved.
Show resolved Hide resolved
stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used in pessimistic autocommit mode. Fallback to standard mode."))
return false
}
return true
ekexium marked this conversation as resolved.
Show resolved Hide resolved
}

// RemoveLockDDLJobs removes the DDL jobs which doesn't get the metadata lock from job2ver.
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/pipelineddmltest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_test(
"//pkg/config",
"//pkg/kv",
"//pkg/sessionctx/binloginfo",
"//pkg/sessionctx/variable",
"//pkg/testkit",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
22 changes: 19 additions & 3 deletions tests/realtikvtest/pipelineddmltest/pipelineddml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestVariable(t *testing.T) {
func TestPipelinedDMLPositive(t *testing.T) {
// the test is a little tricky, only when pipelined dml is enabled, the failpoint panics and the panic message will be returned as error
// TODO: maybe save the pipelined DML usage into TxnInfo, so we can check from it.
require.NoError(t, failpoint.Enable("tikvclient/pipelinedCommitFail", `panic("pipelined memdb is be enabled")`))
require.NoError(t, failpoint.Enable("tikvclient/pipelinedCommitFail", `panic("pipelined memdb is enabled")`))
defer func() {
require.NoError(t, failpoint.Disable("tikvclient/pipelinedCommitFail"))
}()
Expand Down Expand Up @@ -93,7 +94,7 @@ func TestPipelinedDMLPositive(t *testing.T) {
return err
})
require.Error(t, err, stmt)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is be enabled"), err.Error(), stmt)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is enabled"), err.Error(), stmt)
// binary protocol
ctx := context.Background()
parsedStmts, err := tk.Session().Parse(ctx, stmt)
Expand All @@ -103,7 +104,7 @@ func TestPipelinedDMLPositive(t *testing.T) {
return err
})
require.Error(t, err, stmt)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is be enabled"), err.Error(), stmt)
require.True(t, strings.Contains(err.Error(), "pipelined memdb is enabled"), err.Error(), stmt)
}
}

Expand All @@ -127,6 +128,7 @@ func TestPipelinedDMLNegative(t *testing.T) {
// not in auto-commit txn
tk.MustExec("set session tidb_dml_type = bulk")
tk.MustExec("begin")
tk.MustQuery("show warnings").CheckContain("Pipelined DML can only be used for auto-commit INSERT, REPLACE, UPDATE or DELETE. Fallback to standard mode.")
tk.MustExec("insert into t values(2, 2)")
tk.MustExec("commit")

Expand All @@ -137,16 +139,19 @@ func TestPipelinedDMLNegative(t *testing.T) {
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(origPessimisticAutoCommit)
}()
tk.MustExec("insert into t values(3, 3)")
tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used in pessimistic autocommit mode. Fallback to standard mode.")
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(false)

// binlog is enabled
tk.Session().GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&testkit.MockPumpClient{})
tk.MustExec("insert into t values(4, 4)")
tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used with Binlog: BinlogClient != nil. Fallback to standard mode.")
tk.Session().GetSessionVars().BinlogClient = nil

// in a running txn
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("begin")
// the warning is produced for the begin stmt.
tk.MustExec("set session tidb_dml_type = bulk") // turn on bulk dml in a txn doesn't effect the current txn.
tk.MustExec("insert into t values(5, 5)")
tk.MustExec("commit")
Expand All @@ -155,9 +160,20 @@ func TestPipelinedDMLNegative(t *testing.T) {
tk.Session().GetSessionVars().InRestrictedSQL = true
tk.MustExec("insert into t values(6, 6)")
tk.Session().GetSessionVars().InRestrictedSQL = false
tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used for internal SQL. Fallback to standard mode.")

// it's a read statement
tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1 1", "2 2", "3 3", "4 4", "5 5", "6 6"))

// for deprecated batch-dml
tk.Session().GetSessionVars().BatchDelete = true
tk.Session().GetSessionVars().DMLBatchSize = 1
variable.EnableBatchDML.Store(true)
tk.MustExec("insert into t values(7, 7)")
tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used with the deprecated Batch DML. Fallback to standard mode.")
tk.Session().GetSessionVars().BatchDelete = false
tk.Session().GetSessionVars().DMLBatchSize = 0
variable.EnableBatchDML.Store(false)
}

func compareTables(t *testing.T, tk *testkit.TestKit, t1, t2 string) {
Expand Down