Skip to content

Commit

Permalink
Merge branch 'master' into min-paging-size
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Jul 13, 2022
2 parents 9363c5d + 2f934d6 commit caff4d7
Show file tree
Hide file tree
Showing 20 changed files with 186 additions and 326 deletions.
1 change: 0 additions & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ build --java_language_version=17
build --java_runtime_version=17
build --tool_java_language_version=17
build --tool_java_runtime_version=17
build --experimental_remote_cache_compression

run --color=yes
build:release --workspace_status_command=./build/print-workspace-status.sh --stamp
Expand Down
10 changes: 3 additions & 7 deletions br/pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ func (p *printByTable) AddTask(task TaskStatus) {
info := fmt.Sprintf("%s; gap=%s", pTime, gapColor.Sprint(gap))
return info
}
cp := task.GetMinStoreCheckpoint()
table.Add("checkpoint[global]", formatTS(cp.TS))
p.addCheckpoints(&task, table, formatTS)
for store, e := range task.LastErrors {
table.Add(fmt.Sprintf("error[store=%d]", store), e.ErrorCode)
Expand All @@ -147,21 +149,15 @@ func (p *printByTable) AddTask(task TaskStatus) {

func (p *printByTable) addCheckpoints(task *TaskStatus, table *glue.Table, formatTS func(uint64) string) {
cp := task.GetMinStoreCheckpoint()
items := make([][2]string, 0, len(task.Checkpoints))
if cp.Type() != CheckpointTypeGlobal {
for _, cp := range task.Checkpoints {
switch cp.Type() {
case CheckpointTypeStore:
items = append(items, [2]string{fmt.Sprintf("checkpoint[store=%d]", cp.ID), formatTS(cp.TS)})
table.Add(fmt.Sprintf("checkpoint[store=%d]", cp.ID), formatTS(cp.TS))
}
}
} else {
items = append(items, [2]string{"checkpoint[central-global]", formatTS(cp.TS)})
}

for _, item := range items {
table.Add(item[0], item[1])
}
}

func (p *printByTable) PrintTasks() {
Expand Down
143 changes: 0 additions & 143 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -77,60 +76,6 @@ func createMockStore(t *testing.T) kv.Storage {
return store
}

func testNewContext(d *ddl) sessionctx.Context {
ctx := mock.NewContext()
ctx.Store = d.store
return ctx
}

func getSchemaVer(t *testing.T, ctx sessionctx.Context) int64 {
err := sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
txn, err := ctx.Txn(true)
require.NoError(t, err)
m := meta.NewMeta(txn)
ver, err := m.GetSchemaVersion()
require.NoError(t, err)
return ver
}

type historyJobArgs struct {
ver int64
db *model.DBInfo
tbl *model.TableInfo
tblIDs map[int64]struct{}
}

func checkEqualTable(t *testing.T, t1, t2 *model.TableInfo) {
require.Equal(t, t1.ID, t2.ID)
require.Equal(t, t1.Name, t2.Name)
require.Equal(t, t1.Charset, t2.Charset)
require.Equal(t, t1.Collate, t2.Collate)
require.Equal(t, t1.PKIsHandle, t2.PKIsHandle)
require.Equal(t, t1.Comment, t2.Comment)
require.Equal(t, t1.AutoIncID, t2.AutoIncID)
}

func checkHistoryJobArgs(t *testing.T, ctx sessionctx.Context, id int64, args *historyJobArgs) {
historyJob, err := GetHistoryJobByID(ctx, id)
require.NoError(t, err)
require.Greater(t, historyJob.BinlogInfo.FinishedTS, uint64(0))

if args.tbl != nil {
require.Equal(t, historyJob.BinlogInfo.SchemaVersion, args.ver)
checkEqualTable(t, historyJob.BinlogInfo.TableInfo, args.tbl)
return
}

// for handling schema job
require.Equal(t, historyJob.BinlogInfo.SchemaVersion, args.ver)
require.Equal(t, historyJob.BinlogInfo.DBInfo, args.db)
// only for creating schema job
if args.db != nil && len(args.tblIDs) == 0 {
return
}
}

func TestGetIntervalFromPolicy(t *testing.T) {
policy := []time.Duration{
1 * time.Second,
Expand Down Expand Up @@ -413,94 +358,6 @@ func TestNotifyDDLJob(t *testing.T) {
}
}

func testSchemaInfo(d *ddl, name string) (*model.DBInfo, error) {
dbInfo := &model.DBInfo{
Name: model.NewCIStr(name),
}
genIDs, err := d.genGlobalIDs(1)
if err != nil {
return nil, err
}
dbInfo.ID = genIDs[0]
return dbInfo, nil
}

func testCreateSchema(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo) *model.Job {
job := &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionCreateSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{dbInfo},
}
ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.DoDDLJob(ctx, job))

v := getSchemaVer(t, ctx)
dbInfo.State = model.StatePublic
checkHistoryJobArgs(t, ctx, job.ID, &historyJobArgs{ver: v, db: dbInfo})
dbInfo.State = model.StateNone
return job
}

func buildDropSchemaJob(dbInfo *model.DBInfo) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
}
}

func testDropSchema(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo) (*model.Job, int64) {
job := buildDropSchemaJob(dbInfo)
ctx.SetValue(sessionctx.QueryString, "skip")
err := d.DoDDLJob(ctx, job)
require.NoError(t, err)
ver := getSchemaVer(t, ctx)
return job, ver
}

func isDDLJobDone(test *testing.T, t *meta.Meta) bool {
job, err := t.GetDDLJobByIdx(0)
require.NoError(test, err)
if job == nil {
return true
}

time.Sleep(testLease)
return false
}

func testCheckSchemaState(test *testing.T, d *ddl, dbInfo *model.DBInfo, state model.SchemaState) {
isDropped := true

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
for {
err := kv.RunInNewTxn(ctx, d.store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
info, err := t.GetDatabase(dbInfo.ID)
require.NoError(test, err)

if state == model.StateNone {
isDropped = isDDLJobDone(test, t)
if !isDropped {
return nil
}
require.Nil(test, info)
return nil
}

require.Equal(test, info.Name, dbInfo.Name)
require.Equal(test, info.State, state)
return nil
})
require.NoError(test, err)

if isDropped {
break
}
}
}

func TestError(t *testing.T) {
kvErrs := []*terror.Error{
dbterror.ErrDDLJobNotFound,
Expand Down
35 changes: 35 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,41 @@ func TestMultiSchemaChangeModifyColumnsCancelled(t *testing.T) {
Check(testkit.Rows("int"))
}

func TestMultiSchemaChangeAdminShowDDLJobs(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
originHook := dom.DDL().GetHook()
hook := &ddl.TestDDLCallback{Do: dom}
hook.OnJobRunBeforeExported = func(job *model.Job) {
assert.Equal(t, model.ActionMultiSchemaChange, job.Type)
if job.MultiSchemaInfo.SubJobs[0].SchemaState == model.StateDeleteOnly {
newTk := testkit.NewTestKit(t, store)
rows := newTk.MustQuery("admin show ddl jobs 1").Rows()
// 1 history job and 1 running job with 2 subjobs
assert.Equal(t, len(rows), 4)
assert.Equal(t, rows[1][1], "test")
assert.Equal(t, rows[1][2], "t")
assert.Equal(t, rows[1][3], "add index /* subjob */")
assert.Equal(t, rows[1][4], "delete only")
assert.Equal(t, rows[1][len(rows[1])-1], "running")

assert.Equal(t, rows[2][3], "add index /* subjob */")
assert.Equal(t, rows[2][4], "none")
assert.Equal(t, rows[2][len(rows[2])-1], "queueing")
}
}

tk.MustExec("create table t (a int, b int, c int)")
tk.MustExec("insert into t values (1, 2, 3)")

dom.DDL().SetHook(hook)
tk.MustExec("alter table t add index t(a), add index t1(b)")
dom.DDL().SetHook(originHook)
}

func TestMultiSchemaChangeWithExpressionIndex(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
Expand Down
Loading

0 comments on commit caff4d7

Please sign in to comment.