From 07b3e399ec9929b2bd8c71e3790b83540bc9acc1 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 14 Mar 2024 21:07:45 +0800 Subject: [PATCH] feat: check MDL when starting and committing pipelined txns Signed-off-by: ekexium --- pkg/session/session.go | 11 +++++++++++ .../pipelineddmltest/pipelineddml_test.go | 6 ++++++ 2 files changed, 17 insertions(+) diff --git a/pkg/session/session.go b/pkg/session/session.go index aae8a869f07c9..91c52b74bace2 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -570,6 +570,9 @@ func (s *session) doCommit(ctx context.Context) error { if s.GetSessionVars().TxnCtx != nil { needCheckSchema = !s.GetSessionVars().TxnCtx.EnableMDL } + if s.txn.IsPipelined() && !s.GetSessionVars().TxnCtx.EnableMDL { + return errors.New("cannot commit pipelined transaction without Metadata Lock: MDL is OFF") + } s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.GetInfoSchema().SchemaMetaVersion(), physicalTableIDs, needCheckSchema)) s.txn.SetOption(kv.InfoSchema, s.sessionVars.TxnCtx.InfoSchema) s.txn.SetOption(kv.CommitHook, func(info string, _ error) { s.sessionVars.LastTxnInfo = info }) @@ -4303,6 +4306,14 @@ func (s *session) usePipelinedDmlOrWarn() bool { return false } vars := s.GetSessionVars() + if !vars.TxnCtx.EnableMDL { + stmtCtx.AppendWarning( + errors.New( + "Pipelined DML can not be used without Metadata Lock. Fallback to standard mode", + ), + ) + return false + } if (vars.BatchCommit || vars.BatchInsert || vars.BatchDelete) && vars.DMLBatchSize > 0 && variable.EnableBatchDML.Load() { stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used with the deprecated Batch DML. Fallback to standard mode")) return false diff --git a/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go b/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go index b737f6d5de7f8..ec414e2fe5788 100644 --- a/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go +++ b/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go @@ -189,6 +189,12 @@ func TestPipelinedDMLNegative(t *testing.T) { tk.MustExec("explain analyze insert into t values(9, 9)") tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used with Binlog: BinlogClient != nil. Fallback to standard mode") tk.Session().GetSessionVars().BinlogClient = nil + + // disable MDL + tk.MustExec("set global tidb_enable_metadata_lock = off") + tk.MustExec("insert into t values(10, 10)") + tk.MustQuery("show warnings").CheckContain("Pipelined DML can not be used without Metadata Lock. Fallback to standard mode") + tk.MustExec("set global tidb_enable_metadata_lock = on") } func compareTables(t *testing.T, tk *testkit.TestKit, t1, t2 string) {