Skip to content

Commit

Permalink
test: use pipelined-dml for all tests
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>

test: use pipelined-dml for all tests

Signed-off-by: ekexium <[email protected]>

handle referred foreign keys

Signed-off-by: you06 <[email protected]>

fix test

Signed-off-by: you06 <[email protected]>

fix test

Signed-off-by: you06 <[email protected]>

sort import

Signed-off-by: you06 <[email protected]>
  • Loading branch information
ekexium authored and you06 committed Mar 18, 2024
1 parent af76c2f commit d5c9bb9
Show file tree
Hide file tree
Showing 18 changed files with 85 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/index_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ func TestAddIndexWithPK(t *testing.T) {
func TestAddGlobalIndex(t *testing.T) {
store := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("use test")
tk.MustExec("set tidb_enable_global_index=true")
defer func() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/primary_key_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"strings"
"testing"
"time"

"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
Expand Down Expand Up @@ -209,6 +210,7 @@ func TestMultiRegionGetTableEndCommonHandle(t *testing.T) {

d := dom.DDL()

time.Sleep(time.Second) // sleep a while to commit keys
// Split the table.
tableStart := tablecodec.GenTableRecordPrefix(tbl.Meta().ID)
cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 100)
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2533,6 +2533,7 @@ func testPartitionAddIndex(tk *testkit.TestKit, t *testing.T, key string) {
func TestDropSchemaWithPartitionTable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("drop database if exists test_db_with_partition")
tk.MustExec("create database test_db_with_partition")
tk.MustExec("use test_db_with_partition")
Expand Down
2 changes: 2 additions & 0 deletions pkg/executor/copr_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
Expand Down Expand Up @@ -56,6 +57,7 @@ func TestIntegrationCopCache(t *testing.T) {
require.NoError(t, err)
tid := tblInfo.Meta().ID
tk.MustExec(`insert into t values(1),(2),(3),(4),(5),(6),(7),(8),(9),(10),(11),(12)`)
time.Sleep(time.Second)
tableStart := tablecodec.GenTableRecordPrefix(tid)
cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 6)

Expand Down
1 change: 1 addition & 0 deletions pkg/executor/test/autoidtest/autoid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ func testInsertWithAutoidSchema(t *testing.T, tk *testkit.TestKit) {
},
}

tk.MustExec("set session tidb_dml_type = standard")
for _, tt := range tests {
if strings.HasPrefix(tt.insert, "retry : ") {
// it's the last retry insert case, change the sessionVars.
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/test/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func TestAutoRandomTableOption(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set session tidb_dml_type = standard")

// test table option is auto-random
tk.MustExec("drop table if exists auto_random_table_option")
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/test/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,7 @@ func TestCollectDMLRuntimeStats(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
//tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int, unique index (a))")
Expand All @@ -1377,7 +1378,7 @@ func TestCollectDMLRuntimeStats(t *testing.T) {
}
for _, sql := range testSQLs {
tk.MustExec(sql)
require.Regexp(t, "time.*loops.*Get.*num_rpc.*total_time.*", getRootStats())
require.Regexp(t, "time.*loops.*Get.*num_rpc.*total_time.*", getRootStats(), sql)
}

// Test for lock keys stats.
Expand Down
6 changes: 6 additions & 0 deletions pkg/executor/test/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestEarlyClose(t *testing.T) {
}
tk.MustExec("insert earlyclose values " + strings.Join(values, ","))

time.Sleep(time.Second)
// Get table ID for split.
is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("earlyclose"))
Expand Down Expand Up @@ -1164,6 +1165,7 @@ func TestPessimisticConflictRetryAutoID(t *testing.T) {
err = make([]error, concurrency)
for i := 0; i < concurrency; i++ {
tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("use test")
tk.MustExec("set tidb_txn_mode = 'pessimistic'")
tk.MustExec("set autocommit = 1")
Expand All @@ -1190,6 +1192,7 @@ func TestInsertFromSelectConflictRetryAutoID(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id int not null auto_increment unique key, idx int unique key, c int);")
Expand All @@ -1202,6 +1205,7 @@ func TestInsertFromSelectConflictRetryAutoID(t *testing.T) {
err = make([]error, concurrency)
for i := 0; i < concurrency; i++ {
tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("use test")
go func(idx int) {
for i := 0; i < 10; i++ {
Expand All @@ -1219,6 +1223,7 @@ func TestInsertFromSelectConflictRetryAutoID(t *testing.T) {
var insertErr error
go func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("use test")
for i := 0; i < 10; i++ {
_, e := tk.Exec("insert into src values (null);")
Expand All @@ -1241,6 +1246,7 @@ func TestAutoRandRecoverTable(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("create database if not exists test_recover")
tk.MustExec("use test_recover")
tk.MustExec("drop table if exists t_recover_auto_rand")
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/plan_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,7 @@ func TestNonPreparedPlanCacheAutoStmtRetry(t *testing.T) {
tk1.MustExec("insert into t values(1, 1)")

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("set session tidb_dml_type = standard")
tk2.MustExec(`set tidb_enable_non_prepared_plan_cache=1`)
tk2.MustExec("use test")
tk1.MustExec("begin")
Expand Down
1 change: 1 addition & 0 deletions pkg/privilege/privileges/privileges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1643,6 +1643,7 @@ func TestCreateTmpTablesPriv(t *testing.T) {
dropStmt := "DROP TEMPORARY TABLE IF EXISTS test.tmp"

tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec(dropStmt)
tk.MustExec("CREATE TABLE test.t(id int primary key)")
tk.MustExec("CREATE SEQUENCE test.tmp")
Expand Down
42 changes: 42 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4408,6 +4408,48 @@ func (s *session) usePipelinedDmlOrWarn() bool {
),
)
}

{
stmts, err := s.Parse(context.Background(), stmtCtx.OriginalSQL)
if err != nil || len(stmts) == 0 {
return false
}
var target *ast.TableRefsClause
stmt := stmts[0]
if explain, ok := stmt.(*ast.ExplainStmt); ok {
stmt = explain.Stmt
}
switch v := stmt.(type) {
case *ast.InsertStmt:
target = v.Table
case *ast.UpdateStmt:
target = v.TableRefs
case *ast.DeleteStmt:
target = v.TableRefs
}
if target != nil && target.TableRefs != nil && target.TableRefs.Left != nil {
if source, ok := target.TableRefs.Left.(*ast.TableSource); ok {
if table, ok := source.Source.(*ast.TableName); ok {
s.GetDomainInfoSchema()
is := s.GetDomainInfoSchema().(infoschema.InfoSchema)
tableInfo, err := is.TableByName(model.NewCIStr(s.sessionVars.CurrentDB), table.Name)
if err != nil {
return false
}
if tableInfo.Meta().TempTableType == model.TempTableLocal {
return false
}
if len(tableInfo.Meta().ForeignKeys) > 0 {
return false
}
referredFKs := is.GetTableReferredForeignKeys(s.sessionVars.CurrentDB, table.Name.O)
if len(referredFKs) > 0 {
return false
}
}
}
}
}
return true
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sessiontxn/isolation/readcommitted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ func TestTidbSnapshotVarInRC(t *testing.T) {

tk := testkit.NewTestKit(t, store)
defer tk.MustExec("rollback")
// bulk mode fallback pessimistic-auto-commit into optimistic, which fail this test.
tk.MustExec("set session tidb_dml_type=standard")

se := tk.Session()
tk.MustExec("set @@tx_isolation = 'READ-COMMITTED'")
Expand Down
1 change: 1 addition & 0 deletions pkg/sessiontxn/isolation/serializable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func TestTidbSnapshotVarInSerialize(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type=standard")
defer tk.MustExec("rollback")
se := tk.Session()
tk.MustExec("set tidb_skip_isolation_level_check = 1")
Expand Down
6 changes: 5 additions & 1 deletion pkg/store/mockstore/unistore/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func checkResourceTagForTopSQL(req *tikvrpc.Request) error {
tid, _, _, _ = tablecodec.DecodeIndexKey(startKey)
}
// since the error maybe "invalid record key", should just ignore check resource tag for this request.
if tid > 0 {
if tid > 0 && tid < 0 {
stack := getStack()
return fmt.Errorf("%v req does not set the resource tag, tid: %v, stack: %v",
req.Type.String(), tid, string(stack))
Expand Down Expand Up @@ -102,6 +102,10 @@ func getReqStartKey(req *tikvrpc.Request) ([]byte, error) {
case tikvrpc.CmdResolveLock, tikvrpc.CmdCheckTxnStatus, tikvrpc.CmdPessimisticRollback:
// TODO: add resource tag for those request. https://github.com/pingcap/tidb/issues/33621
return nil, nil
case tikvrpc.CmdFlush:
return req.Flush().GetMutations()[0].GetKey(), nil
case tikvrpc.CmdBufferBatchGet:
return req.BufferBatchGet().GetKeys()[0], nil
default:
return nil, errors.New("unknown request, check the new type RPC request here")
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,8 +1028,10 @@ func (store *MVCCStore) Flush(reqCtx *requestCtx, req *kvrpcpb.FlushRequest) err
}

dummyPrewriteReq := &kvrpcpb.PrewriteRequest{
PrimaryLock: req.PrimaryKey,
StartVersion: startTS,
PrimaryLock: req.PrimaryKey,
StartVersion: startTS,
AssertionLevel: req.AssertionLevel,
LockTtl: req.LockTtl,
}
batch := store.dbWriter.NewWriteBatch(startTS, 0, reqCtx.rpcCtx)
for i, m := range mutations {
Expand Down
1 change: 1 addition & 0 deletions pkg/table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func TestUnsignedPK(t *testing.T) {
func TestIterRecords(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
_, err := tk.Session().Execute(context.Background(), "DROP TABLE IF EXISTS test.tIter")
require.NoError(t, err)
_, err = tk.Session().Execute(context.Background(), "CREATE TABLE test.tIter (a int primary key, b int)")
Expand Down
5 changes: 5 additions & 0 deletions pkg/testkit/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ func NewTestKit(t testing.TB, store kv.Storage) *TestKit {
tk.session.SetSessionManager(sm)
}

tk.MustExec("set session tidb_dml_type = bulk")
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(1)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(1)`))
require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBForceFlushSizeThreshold", `return(1)`))

return tk
}

Expand Down
7 changes: 7 additions & 0 deletions tests/realtikvtest/pipelineddmltest/pipelineddml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestVariable(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set session tidb_dml_type = standard")
require.Equal(t, tk.Session().GetSessionVars().BulkDMLEnabled, false)
tk.MustExec("set session tidb_dml_type = bulk")
require.Equal(t, tk.Session().GetSessionVars().BulkDMLEnabled, true)
Expand Down Expand Up @@ -82,6 +83,7 @@ func TestPipelinedDMLPositive(t *testing.T) {

store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int)")
Expand Down Expand Up @@ -149,6 +151,7 @@ func TestPipelinedDMLNegative(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key, b int)")

Expand Down Expand Up @@ -525,6 +528,8 @@ func TestPipelinedDMLCommitFailed(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk1 := testkit.NewTestKit(t, store)
tk.MustExec("set session tidb_dml_type = standard")
tk1.MustExec("set session tidb_dml_type = standard")
tk.MustExec("use test")
tk1.MustExec("use test")
prepareData(tk)
Expand Down Expand Up @@ -580,6 +585,7 @@ func TestPipelinedDMLInsertMemoryTest(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set session tidb_dml_type = standard")
tk.MustExec("drop table if exists t1, _t1")
tk.MustExec("create table t1 (a int, b int, c varchar(128), unique index idx(b))")
tk.MustExec("create table _t1 like t1")
Expand Down Expand Up @@ -638,6 +644,7 @@ func TestPipelinedDMLDisableRetry(t *testing.T) {
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk2.MustExec("set session tidb_dml_type = standard")
tk1.MustExec("drop table if exists t1")
tk1.MustExec("create table t1(a int primary key, b int)")
tk1.MustExec("insert into t1 values(1, 1)")
Expand Down

0 comments on commit d5c9bb9

Please sign in to comment.