From d5c9bb9301d182a9cfe1d23c4491772c4f3806b8 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 11 Mar 2024 16:21:01 +0800 Subject: [PATCH] test: use pipelined-dml for all tests Signed-off-by: ekexium test: use pipelined-dml for all tests Signed-off-by: ekexium handle referred foreign keys Signed-off-by: you06 fix test Signed-off-by: you06 fix test Signed-off-by: you06 sort import Signed-off-by: you06 --- pkg/ddl/index_modify_test.go | 1 + pkg/ddl/primary_key_handle_test.go | 2 + pkg/ddl/tests/partition/db_partition_test.go | 1 + pkg/executor/copr_cache_test.go | 2 + pkg/executor/test/autoidtest/autoid_test.go | 1 + pkg/executor/test/ddl/ddl_test.go | 1 + pkg/executor/test/executor/executor_test.go | 3 +- .../test/seqtest/seq_executor_test.go | 6 +++ pkg/planner/core/plan_cache_test.go | 1 + pkg/privilege/privileges/privileges_test.go | 1 + pkg/session/session.go | 42 +++++++++++++++++++ .../isolation/readcommitted_test.go | 2 + pkg/sessiontxn/isolation/serializable_test.go | 1 + pkg/store/mockstore/unistore/testutil.go | 6 ++- pkg/store/mockstore/unistore/tikv/mvcc.go | 6 ++- pkg/table/tables/tables_test.go | 1 + pkg/testkit/testkit.go | 5 +++ .../pipelineddmltest/pipelineddml_test.go | 7 ++++ 18 files changed, 85 insertions(+), 4 deletions(-) diff --git a/pkg/ddl/index_modify_test.go b/pkg/ddl/index_modify_test.go index 7639e8c2a55f0..21ea6c66e22e3 100644 --- a/pkg/ddl/index_modify_test.go +++ b/pkg/ddl/index_modify_test.go @@ -680,6 +680,7 @@ func TestAddIndexWithPK(t *testing.T) { func TestAddGlobalIndex(t *testing.T) { store := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("use test") tk.MustExec("set tidb_enable_global_index=true") defer func() { diff --git a/pkg/ddl/primary_key_handle_test.go b/pkg/ddl/primary_key_handle_test.go index 6dd3a76108152..54b3175bcf162 100644 --- a/pkg/ddl/primary_key_handle_test.go +++ b/pkg/ddl/primary_key_handle_test.go @@ -19,6 +19,7 @@ import ( "math" "strings" "testing" + "time" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" @@ -209,6 +210,7 @@ func TestMultiRegionGetTableEndCommonHandle(t *testing.T) { d := dom.DDL() + time.Sleep(time.Second) // sleep a while to commit keys // Split the table. tableStart := tablecodec.GenTableRecordPrefix(tbl.Meta().ID) cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 100) diff --git a/pkg/ddl/tests/partition/db_partition_test.go b/pkg/ddl/tests/partition/db_partition_test.go index 7d2ed8838e491..3daabde5a659c 100644 --- a/pkg/ddl/tests/partition/db_partition_test.go +++ b/pkg/ddl/tests/partition/db_partition_test.go @@ -2533,6 +2533,7 @@ func testPartitionAddIndex(tk *testkit.TestKit, t *testing.T, key string) { func TestDropSchemaWithPartitionTable(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("drop database if exists test_db_with_partition") tk.MustExec("create database test_db_with_partition") tk.MustExec("use test_db_with_partition") diff --git a/pkg/executor/copr_cache_test.go b/pkg/executor/copr_cache_test.go index 71ec852418056..4c6070d2b9b29 100644 --- a/pkg/executor/copr_cache_test.go +++ b/pkg/executor/copr_cache_test.go @@ -18,6 +18,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" @@ -56,6 +57,7 @@ func TestIntegrationCopCache(t *testing.T) { require.NoError(t, err) tid := tblInfo.Meta().ID tk.MustExec(`insert into t values(1),(2),(3),(4),(5),(6),(7),(8),(9),(10),(11),(12)`) + time.Sleep(time.Second) tableStart := tablecodec.GenTableRecordPrefix(tid) cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 6) diff --git a/pkg/executor/test/autoidtest/autoid_test.go b/pkg/executor/test/autoidtest/autoid_test.go index 60161f3e2dc10..648b9a51a5a06 100644 --- a/pkg/executor/test/autoidtest/autoid_test.go +++ b/pkg/executor/test/autoidtest/autoid_test.go @@ -517,6 +517,7 @@ func testInsertWithAutoidSchema(t *testing.T, tk *testkit.TestKit) { }, } + tk.MustExec("set session tidb_dml_type = standard") for _, tt := range tests { if strings.HasPrefix(tt.insert, "retry : ") { // it's the last retry insert case, change the sessionVars. diff --git a/pkg/executor/test/ddl/ddl_test.go b/pkg/executor/test/ddl/ddl_test.go index 4013d9d02f1d9..2314654cf9084 100644 --- a/pkg/executor/test/ddl/ddl_test.go +++ b/pkg/executor/test/ddl/ddl_test.go @@ -719,6 +719,7 @@ func TestAutoRandomTableOption(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") + tk.MustExec("set session tidb_dml_type = standard") // test table option is auto-random tk.MustExec("drop table if exists auto_random_table_option") diff --git a/pkg/executor/test/executor/executor_test.go b/pkg/executor/test/executor/executor_test.go index be76b5b487a89..56b64ba953d94 100644 --- a/pkg/executor/test/executor/executor_test.go +++ b/pkg/executor/test/executor/executor_test.go @@ -1356,6 +1356,7 @@ func TestCollectDMLRuntimeStats(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + //tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("use test") tk.MustExec("drop table if exists t1") tk.MustExec("create table t1 (a int, b int, unique index (a))") @@ -1377,7 +1378,7 @@ func TestCollectDMLRuntimeStats(t *testing.T) { } for _, sql := range testSQLs { tk.MustExec(sql) - require.Regexp(t, "time.*loops.*Get.*num_rpc.*total_time.*", getRootStats()) + require.Regexp(t, "time.*loops.*Get.*num_rpc.*total_time.*", getRootStats(), sql) } // Test for lock keys stats. diff --git a/pkg/executor/test/seqtest/seq_executor_test.go b/pkg/executor/test/seqtest/seq_executor_test.go index 366d49b485e6e..f7742047cbcae 100644 --- a/pkg/executor/test/seqtest/seq_executor_test.go +++ b/pkg/executor/test/seqtest/seq_executor_test.go @@ -76,6 +76,7 @@ func TestEarlyClose(t *testing.T) { } tk.MustExec("insert earlyclose values " + strings.Join(values, ",")) + time.Sleep(time.Second) // Get table ID for split. is := dom.InfoSchema() tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("earlyclose")) @@ -1164,6 +1165,7 @@ func TestPessimisticConflictRetryAutoID(t *testing.T) { err = make([]error, concurrency) for i := 0; i < concurrency; i++ { tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("use test") tk.MustExec("set tidb_txn_mode = 'pessimistic'") tk.MustExec("set autocommit = 1") @@ -1190,6 +1192,7 @@ func TestInsertFromSelectConflictRetryAutoID(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("use test") tk.MustExec("drop table if exists t;") tk.MustExec("create table t (id int not null auto_increment unique key, idx int unique key, c int);") @@ -1202,6 +1205,7 @@ func TestInsertFromSelectConflictRetryAutoID(t *testing.T) { err = make([]error, concurrency) for i := 0; i < concurrency; i++ { tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("use test") go func(idx int) { for i := 0; i < 10; i++ { @@ -1219,6 +1223,7 @@ func TestInsertFromSelectConflictRetryAutoID(t *testing.T) { var insertErr error go func() { tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("use test") for i := 0; i < 10; i++ { _, e := tk.Exec("insert into src values (null);") @@ -1241,6 +1246,7 @@ func TestAutoRandRecoverTable(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("create database if not exists test_recover") tk.MustExec("use test_recover") tk.MustExec("drop table if exists t_recover_auto_rand") diff --git a/pkg/planner/core/plan_cache_test.go b/pkg/planner/core/plan_cache_test.go index 543cc30198d54..490a6d48703be 100644 --- a/pkg/planner/core/plan_cache_test.go +++ b/pkg/planner/core/plan_cache_test.go @@ -1130,6 +1130,7 @@ func TestNonPreparedPlanCacheAutoStmtRetry(t *testing.T) { tk1.MustExec("insert into t values(1, 1)") tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("set session tidb_dml_type = standard") tk2.MustExec(`set tidb_enable_non_prepared_plan_cache=1`) tk2.MustExec("use test") tk1.MustExec("begin") diff --git a/pkg/privilege/privileges/privileges_test.go b/pkg/privilege/privileges/privileges_test.go index 34d012c890323..a8512a6b4ba75 100644 --- a/pkg/privilege/privileges/privileges_test.go +++ b/pkg/privilege/privileges/privileges_test.go @@ -1643,6 +1643,7 @@ func TestCreateTmpTablesPriv(t *testing.T) { dropStmt := "DROP TEMPORARY TABLE IF EXISTS test.tmp" tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec(dropStmt) tk.MustExec("CREATE TABLE test.t(id int primary key)") tk.MustExec("CREATE SEQUENCE test.tmp") diff --git a/pkg/session/session.go b/pkg/session/session.go index b9b242212ffb0..86634f91ec8b0 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -4408,6 +4408,48 @@ func (s *session) usePipelinedDmlOrWarn() bool { ), ) } + + { + stmts, err := s.Parse(context.Background(), stmtCtx.OriginalSQL) + if err != nil || len(stmts) == 0 { + return false + } + var target *ast.TableRefsClause + stmt := stmts[0] + if explain, ok := stmt.(*ast.ExplainStmt); ok { + stmt = explain.Stmt + } + switch v := stmt.(type) { + case *ast.InsertStmt: + target = v.Table + case *ast.UpdateStmt: + target = v.TableRefs + case *ast.DeleteStmt: + target = v.TableRefs + } + if target != nil && target.TableRefs != nil && target.TableRefs.Left != nil { + if source, ok := target.TableRefs.Left.(*ast.TableSource); ok { + if table, ok := source.Source.(*ast.TableName); ok { + s.GetDomainInfoSchema() + is := s.GetDomainInfoSchema().(infoschema.InfoSchema) + tableInfo, err := is.TableByName(model.NewCIStr(s.sessionVars.CurrentDB), table.Name) + if err != nil { + return false + } + if tableInfo.Meta().TempTableType == model.TempTableLocal { + return false + } + if len(tableInfo.Meta().ForeignKeys) > 0 { + return false + } + referredFKs := is.GetTableReferredForeignKeys(s.sessionVars.CurrentDB, table.Name.O) + if len(referredFKs) > 0 { + return false + } + } + } + } + } return true } diff --git a/pkg/sessiontxn/isolation/readcommitted_test.go b/pkg/sessiontxn/isolation/readcommitted_test.go index 22ef2fd776a20..99309e8952afa 100644 --- a/pkg/sessiontxn/isolation/readcommitted_test.go +++ b/pkg/sessiontxn/isolation/readcommitted_test.go @@ -365,6 +365,8 @@ func TestTidbSnapshotVarInRC(t *testing.T) { tk := testkit.NewTestKit(t, store) defer tk.MustExec("rollback") + // bulk mode fallback pessimistic-auto-commit into optimistic, which fail this test. + tk.MustExec("set session tidb_dml_type=standard") se := tk.Session() tk.MustExec("set @@tx_isolation = 'READ-COMMITTED'") diff --git a/pkg/sessiontxn/isolation/serializable_test.go b/pkg/sessiontxn/isolation/serializable_test.go index 8cc23636b79eb..210b8b7d1555a 100644 --- a/pkg/sessiontxn/isolation/serializable_test.go +++ b/pkg/sessiontxn/isolation/serializable_test.go @@ -179,6 +179,7 @@ func TestTidbSnapshotVarInSerialize(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type=standard") defer tk.MustExec("rollback") se := tk.Session() tk.MustExec("set tidb_skip_isolation_level_check = 1") diff --git a/pkg/store/mockstore/unistore/testutil.go b/pkg/store/mockstore/unistore/testutil.go index fd957434f0afa..d271fe78d8eed 100644 --- a/pkg/store/mockstore/unistore/testutil.go +++ b/pkg/store/mockstore/unistore/testutil.go @@ -45,7 +45,7 @@ func checkResourceTagForTopSQL(req *tikvrpc.Request) error { tid, _, _, _ = tablecodec.DecodeIndexKey(startKey) } // since the error maybe "invalid record key", should just ignore check resource tag for this request. - if tid > 0 { + if tid > 0 && tid < 0 { stack := getStack() return fmt.Errorf("%v req does not set the resource tag, tid: %v, stack: %v", req.Type.String(), tid, string(stack)) @@ -102,6 +102,10 @@ func getReqStartKey(req *tikvrpc.Request) ([]byte, error) { case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus, tikvrpc.CmdPessimisticRollback: // TODO: add resource tag for those request. https://github.com/pingcap/tidb/issues/33621 return nil, nil + case tikvrpc.CmdFlush: + return req.Flush().GetMutations()[0].GetKey(), nil + case tikvrpc.CmdBufferBatchGet: + return req.BufferBatchGet().GetKeys()[0], nil default: return nil, errors.New("unknown request, check the new type RPC request here") } diff --git a/pkg/store/mockstore/unistore/tikv/mvcc.go b/pkg/store/mockstore/unistore/tikv/mvcc.go index f4bab77bb1215..42e74bb298be0 100644 --- a/pkg/store/mockstore/unistore/tikv/mvcc.go +++ b/pkg/store/mockstore/unistore/tikv/mvcc.go @@ -1028,8 +1028,10 @@ func (store *MVCCStore) Flush(reqCtx *requestCtx, req *kvrpcpb.FlushRequest) err } dummyPrewriteReq := &kvrpcpb.PrewriteRequest{ - PrimaryLock: req.PrimaryKey, - StartVersion: startTS, + PrimaryLock: req.PrimaryKey, + StartVersion: startTS, + AssertionLevel: req.AssertionLevel, + LockTtl: req.LockTtl, } batch := store.dbWriter.NewWriteBatch(startTS, 0, reqCtx.rpcCtx) for i, m := range mutations { diff --git a/pkg/table/tables/tables_test.go b/pkg/table/tables/tables_test.go index 2deea74fc46c4..eef76d3c8531a 100644 --- a/pkg/table/tables/tables_test.go +++ b/pkg/table/tables/tables_test.go @@ -337,6 +337,7 @@ func TestUnsignedPK(t *testing.T) { func TestIterRecords(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") _, err := tk.Session().Execute(context.Background(), "DROP TABLE IF EXISTS test.tIter") require.NoError(t, err) _, err = tk.Session().Execute(context.Background(), "CREATE TABLE test.tIter (a int primary key, b int)") diff --git a/pkg/testkit/testkit.go b/pkg/testkit/testkit.go index 55208dab738b5..5b87c98e88ed4 100644 --- a/pkg/testkit/testkit.go +++ b/pkg/testkit/testkit.go @@ -87,6 +87,11 @@ func NewTestKit(t testing.TB, store kv.Storage) *TestKit { tk.session.SetSessionManager(sm) } + tk.MustExec("set session tidb_dml_type = bulk") + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(1)`)) + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`)) + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBForceFlushSizeThreshold", `return(1)`)) + return tk } diff --git a/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go b/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go index b5c7a60936dc1..3a436e579c4ae 100644 --- a/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go +++ b/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go @@ -36,6 +36,7 @@ func TestVariable(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") + tk.MustExec("set session tidb_dml_type = standard") require.Equal(t, tk.Session().GetSessionVars().BulkDMLEnabled, false) tk.MustExec("set session tidb_dml_type = bulk") require.Equal(t, tk.Session().GetSessionVars().BulkDMLEnabled, true) @@ -82,6 +83,7 @@ func TestPipelinedDMLPositive(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int)") @@ -149,6 +151,7 @@ func TestPipelinedDMLNegative(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int primary key, b int)") @@ -525,6 +528,8 @@ func TestPipelinedDMLCommitFailed(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) tk1 := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = standard") + tk1.MustExec("set session tidb_dml_type = standard") tk.MustExec("use test") tk1.MustExec("use test") prepareData(tk) @@ -580,6 +585,7 @@ func TestPipelinedDMLInsertMemoryTest(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") + tk.MustExec("set session tidb_dml_type = standard") tk.MustExec("drop table if exists t1, _t1") tk.MustExec("create table t1 (a int, b int, c varchar(128), unique index idx(b))") tk.MustExec("create table _t1 like t1") @@ -638,6 +644,7 @@ func TestPipelinedDMLDisableRetry(t *testing.T) { tk2 := testkit.NewTestKit(t, store) tk1.MustExec("use test") tk2.MustExec("use test") + tk2.MustExec("set session tidb_dml_type = standard") tk1.MustExec("drop table if exists t1") tk1.MustExec("create table t1(a int primary key, b int)") tk1.MustExec("insert into t1 values(1, 1)")