Skip to content

Commit

Permalink
Merge branch 'master' into fix_data_race_20230105
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jan 6, 2023
2 parents 23c9539 + f9f7268 commit eef10eb
Show file tree
Hide file tree
Showing 62 changed files with 10,257 additions and 11,897 deletions.
3 changes: 3 additions & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_library(
"//parser",
"//parser/ast",
"//parser/charset",
"//parser/duration",
"//parser/format",
"//parser/model",
"//parser/mysql",
Expand Down Expand Up @@ -179,6 +180,7 @@ go_test(
"main_test.go",
"modify_column_test.go",
"multi_schema_change_test.go",
"mv_index_test.go",
"options_test.go",
"partition_test.go",
"placement_policy_ddl_test.go",
Expand Down Expand Up @@ -221,6 +223,7 @@ go_test(
"//parser/ast",
"//parser/auth",
"//parser/charset",
"//parser/duration",
"//parser/model",
"//parser/mysql",
"//parser/terror",
Expand Down
75 changes: 0 additions & 75 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,78 +1029,3 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {

tk.MustExec("drop table if exists t")
}

func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0")
tk.MustExec("set global tidb_enable_amend_pessimistic_txn = ON")
defer tk.MustExec("set global tidb_enable_amend_pessimistic_txn = OFF")

d := dom.DDL()
testInsertOnModifyColumn := func(sql string, startColState, commitColState model.SchemaState, retStrs []string, retErr error) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (c1 int, c2 int, c3 int, unique key(c1))")
tk.MustExec("insert into t1 values (20, 20, 20);")

var checkErr error
tk1 := testkit.NewTestKit(t, store)
defer func() {
if tk1.Session() != nil {
tk1.Session().Close()
}
}()
hook := &ddl.TestDDLCallback{Do: dom}
times := 0
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type != model.ActionModifyColumn || checkErr != nil || job.SchemaState != startColState {
return
}

tk1.MustExec("use test")
tk1.MustExec("begin pessimistic;")
tk1.MustExec("insert into t1 values(101, 102, 103)")
}
onJobUpdatedExportedFunc := func(job *model.Job) {
if job.Type != model.ActionModifyColumn || checkErr != nil || job.SchemaState != commitColState {
return
}
if times == 0 {
_, checkErr = tk1.Exec("commit;")
}
times++
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(hook)

tk.MustExec(sql)
if retErr == nil {
require.NoError(t, checkErr)
} else {
require.Error(t, checkErr)
require.Contains(t, checkErr.Error(), retErr.Error())
}
tk.MustQuery("select * from t1").Check(testkit.Rows(retStrs...))
tk.MustExec("admin check table t1")
}

// Testing it needs reorg data.
ddlStatement := "alter table t1 change column c2 cc smallint;"
testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged)
testInsertOnModifyColumn(ddlStatement, model.StateDeleteOnly, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged)
testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged)
testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged)
testInsertOnModifyColumn(ddlStatement, model.StateDeleteOnly, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged)
testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged)

// Testing it needs not reorg data. This case only have two states: none, public.
ddlStatement = "alter table t1 change column c2 cc bigint;"
testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StateWriteReorganization, []string{"20 20 20"}, nil)
testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StateWriteReorganization, []string{"20 20 20"}, nil)
testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StatePublic, []string{"20 20 20", "101 102 103"}, nil)
testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StatePublic, []string{"20 20 20"}, nil)
}
247 changes: 0 additions & 247 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/parser/terror"
parsertypes "github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/store/mockstore"
Expand All @@ -55,7 +54,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -981,202 +979,6 @@ func TestDDLJobErrorCount(t *testing.T) {
require.True(t, kv.ErrEntryTooLarge.Equal(historyJob.Error))
}

func TestCommitTxnWithIndexChange(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
// Prepare work.
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;")
tk.MustExec("use test")
tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))")
tk.MustExec("insert t1 values (1, 10, 100), (2, 20, 200)")
tk.MustExec("alter table t1 add index k2(c2)")
tk.MustExec("alter table t1 drop index k2")
tk.MustExec("alter table t1 add index k2(c2)")
tk.MustExec("alter table t1 drop index k2")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

// tkSQLs are the sql statements for the pessimistic transaction.
// tk2DDL are the ddl statements executed before the pessimistic transaction.
// idxDDL is the DDL statement executed between pessimistic transaction begin and commit.
// failCommit means the pessimistic transaction commit should fail not.
type caseUnit struct {
tkSQLs []string
tk2DDL []string
idxDDL string
checkSQLs []string
rowsExps [][]string
failCommit bool
stateEnd model.SchemaState
}

cases := []caseUnit{
// Test secondary index
{[]string{"insert into t1 values(3, 30, 300)",
"insert into t2 values(11, 11, 11)"},
[]string{"alter table t1 add index k2(c2)",
"alter table t1 drop index k2",
"alter table t1 add index kk2(c2, c1)",
"alter table t1 add index k2(c2)",
"alter table t1 drop index k2"},
"alter table t1 add index k2(c2)",
[]string{"select c3, c2 from t1 use index(k2) where c2 = 20",
"select c3, c2 from t1 use index(k2) where c2 = 10",
"select * from t1",
"select * from t2 where c1 = 11"},
[][]string{{"200 20"},
{"100 10"},
{"1 10 100", "2 20 200", "3 30 300"},
{"11 11 11"}},
false,
model.StateNone},
// Test secondary index
{[]string{"insert into t2 values(5, 50, 500)",
"insert into t2 values(11, 11, 11)",
"delete from t2 where c2 = 11",
"update t2 set c2 = 110 where c1 = 11"},
// "update t2 set c1 = 10 where c3 = 100"},
[]string{"alter table t1 add index k2(c2)",
"alter table t1 drop index k2",
"alter table t1 add index kk2(c2, c1)",
"alter table t1 add index k2(c2)",
"alter table t1 drop index k2"},
"alter table t1 add index k2(c2)",
[]string{"select c3, c2 from t1 use index(k2) where c2 = 20",
"select c3, c2 from t1 use index(k2) where c2 = 10",
"select * from t1",
"select * from t2 where c1 = 11",
"select * from t2 where c3 = 100"},
[][]string{{"200 20"},
{"100 10"},
{"1 10 100", "2 20 200"},
{},
{"1 10 100"}},
false,
model.StateNone},
// Test unique index
{[]string{"insert into t1 values(3, 30, 300)",
"insert into t1 values(4, 40, 400)",
"insert into t2 values(11, 11, 11)",
"insert into t2 values(12, 12, 11)"},
[]string{"alter table t1 add unique index uk3(c3)",
"alter table t1 drop index uk3",
"alter table t2 add unique index ukc1c3(c1, c3)",
"alter table t2 add unique index ukc3(c3)",
"alter table t2 drop index ukc1c3",
"alter table t2 drop index ukc3",
"alter table t2 add index kc3(c3)"},
"alter table t1 add unique index uk3(c3)",
[]string{"select c3, c2 from t1 use index(uk3) where c3 = 200",
"select c3, c2 from t1 use index(uk3) where c3 = 300",
"select c3, c2 from t1 use index(uk3) where c3 = 400",
"select * from t1",
"select * from t2"},
[][]string{{"200 20"},
{"300 30"},
{"400 40"},
{"1 10 100", "2 20 200", "3 30 300", "4 40 400"},
{"1 10 100", "2 20 200", "11 11 11", "12 12 11"}},
false, model.StateNone},
// Test unique index fail to commit, this case needs the new index could be inserted
{[]string{"insert into t1 values(3, 30, 300)",
"insert into t1 values(4, 40, 300)",
"insert into t2 values(11, 11, 11)",
"insert into t2 values(12, 11, 12)"},
//[]string{"alter table t1 add unique index uk3(c3)", "alter table t1 drop index uk3"},
[]string{},
"alter table t1 add unique index uk3(c3)",
[]string{"select c3, c2 from t1 use index(uk3) where c3 = 200",
"select c3, c2 from t1 use index(uk3) where c3 = 300",
"select c3, c2 from t1 where c1 = 4",
"select * from t1",
"select * from t2"},
[][]string{{"200 20"},
{},
{},
{"1 10 100", "2 20 200"},
{"1 10 100", "2 20 200"}},
true,
model.StateWriteOnly},
}
tk.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200"))

// Test add index state change
do := dom.DDL()
startStates := []model.SchemaState{model.StateNone, model.StateDeleteOnly}
for _, startState := range startStates {
endStatMap := session.ConstOpAddIndex[startState]
var endStates []model.SchemaState
for st := range endStatMap {
endStates = append(endStates, st)
}
slices.Sort(endStates)
for _, endState := range endStates {
for _, curCase := range cases {
if endState < curCase.stateEnd {
break
}
tk2.MustExec("drop table if exists t1")
tk2.MustExec("drop table if exists t2")
tk2.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))")
tk2.MustExec("create table t2 (c1 int primary key, c2 int, c3 int, index ok2(c2))")
tk2.MustExec("insert t1 values (1, 10, 100), (2, 20, 200)")
tk2.MustExec("insert t2 values (1, 10, 100), (2, 20, 200)")
tk2.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200"))
tk.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200"))
tk.MustQuery("select * from t2;").Check(testkit.Rows("1 10 100", "2 20 200"))

for _, DDLSQL := range curCase.tk2DDL {
tk2.MustExec(DDLSQL)
}
hook := &ddl.TestDDLCallback{Do: dom}
prepared := false
committed := false
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState == startState {
if !prepared {
tk.MustExec("begin pessimistic")
for _, tkSQL := range curCase.tkSQLs {
tk.MustExec(tkSQL)
}
prepared = true
}
}
}
onJobUpdatedExportedFunc := func(job *model.Job) {
if job.SchemaState == endState {
if !committed {
if curCase.failCommit {
err := tk.ExecToErr("commit")
require.Error(t, err)
} else {
tk.MustExec("commit")
}
}
committed = true
}
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
originalCallback := do.GetHook()
do.SetHook(hook)
tk2.MustExec(curCase.idxDDL)
do.SetHook(originalCallback)
tk2.MustExec("admin check table t1")
for i, checkSQL := range curCase.checkSQLs {
if len(curCase.rowsExps[i]) > 0 {
tk2.MustQuery(checkSQL).Check(testkit.Rows(curCase.rowsExps[i]...))
} else {
tk2.MustQuery(checkSQL).Check(nil)
}
}
}
}
}
tk.MustExec("admin check table t1")
}

// TestAddIndexFailOnCaseWhenCanExit is used to close #19325.
func TestAddIndexFailOnCaseWhenCanExit(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockCaseWhenParseFailure", `return(true)`))
Expand Down Expand Up @@ -1380,55 +1182,6 @@ func TestTxnSavepointWithDDL(t *testing.T) {
tk.MustExec("admin check table t1, t2")
}

func TestAmendTxnSavepointWithDDL(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk2.MustExec("use test;")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;")

prepareFn := func() {
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (c1 int primary key, c2 int)")
tk.MustExec("create table t2 (c1 int primary key, c2 int)")
}

prepareFn()
tk.MustExec("truncate table t1")
tk.MustExec("begin pessimistic")
tk.MustExec("savepoint s1")
tk.MustExec("insert t1 values (1, 11)")
tk.MustExec("savepoint s2")
tk.MustExec("insert t2 values (1, 11)")
tk.MustExec("rollback to s2")
tk2.MustExec("alter table t1 add index idx2(c2)")
tk2.MustExec("alter table t2 add index idx2(c2)")
tk.MustExec("commit")
tk.MustQuery("select * from t1").Check(testkit.Rows("1 11"))
tk.MustQuery("select * from t2").Check(testkit.Rows())
tk.MustExec("admin check table t1, t2")

prepareFn()
tk.MustExec("truncate table t1")
tk.MustExec("begin pessimistic")
tk.MustExec("savepoint s1")
tk.MustExec("insert t1 values (1, 11)")
tk.MustExec("savepoint s2")
tk.MustExec("insert t2 values (1, 11)")
tk.MustExec("savepoint s3")
tk.MustExec("insert t2 values (2, 22)")
tk.MustExec("rollback to s3")
tk2.MustExec("alter table t1 add index idx2(c2)")
tk2.MustExec("alter table t2 add index idx2(c2)")
tk.MustExec("commit")
tk.MustQuery("select * from t1").Check(testkit.Rows("1 11"))
tk.MustQuery("select * from t2").Check(testkit.Rows("1 11"))
tk.MustExec("admin check table t1, t2")
}

func TestSnapshotVersion(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)

Expand Down
Loading

0 comments on commit eef10eb

Please sign in to comment.