Skip to content

Commit

Permalink
ddl: refactor adjustColumnInfoInAddColumn for multi-schema change
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Mar 4, 2022
1 parent f26d659 commit 34aa3e4
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 32 deletions.
34 changes: 2 additions & 32 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,36 +51,6 @@ import (
"go.uber.org/zap"
)

// adjustColumnInfoInAddColumn is used to set the correct position of column info when adding column.
// 1. The added column was append at the end of tblInfo.Columns, due to ddl state was not public then.
// It should be moved to the correct position when the ddl state to be changed to public.
// 2. The offset of column should also to be set to the right value.
func adjustColumnInfoInAddColumn(tblInfo *model.TableInfo, offset int) {
oldCols := tblInfo.Columns
newCols := make([]*model.ColumnInfo, 0, len(oldCols))
newCols = append(newCols, oldCols[:offset]...)
newCols = append(newCols, oldCols[len(oldCols)-1])
newCols = append(newCols, oldCols[offset:len(oldCols)-1]...)
// Adjust column offset.
offsetChanged := make(map[int]int, len(newCols)-offset-1)
for i := offset + 1; i < len(newCols); i++ {
offsetChanged[newCols[i].Offset] = i
newCols[i].Offset = i
}
newCols[offset].Offset = offset
// Update index column offset info.
// TODO: There may be some corner cases for index column offsets, we may check this later.
for _, idx := range tblInfo.Indices {
for _, col := range idx.Columns {
newOffset, ok := offsetChanged[col.Offset]
if ok {
col.Offset = newOffset
}
}
}
tblInfo.Columns = newCols
}

// adjustColumnInfoInDropColumn is used to set the correct position of column info when dropping column.
// 1. The offset of column should to be set to the last of the columns.
// 2. The dropped column is moved to the end of tblInfo.Columns, due to it was not public any more.
Expand Down Expand Up @@ -239,7 +209,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offset.
adjustColumnInfoInAddColumn(tblInfo, offset)
tblInfo.MoveColumnInfo(columnInfo.Offset, offset)
columnInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
if err != nil {
Expand Down Expand Up @@ -402,7 +372,7 @@ func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error
}
}
tblInfo.Columns = append(tblInfo.Columns, newCols[i])
adjustColumnInfoInAddColumn(tblInfo, offsets[i])
tblInfo.MoveColumnInfo(len(tblInfo.Columns)-1, offsets[i])
}
setColumnsState(columnInfos, model.StatePublic)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfos[0].State)
Expand Down
33 changes: 33 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,39 @@ func (t *TableInfo) IsLocked() bool {
return t.Lock != nil && len(t.Lock.Sessions) > 0
}

// MoveColumnInfo moves a column to another offset.
func (t *TableInfo) MoveColumnInfo(from, to int) {
if from == to {
return
}
updatedOffsets := make(map[int]int)
src := t.Columns[from]
if from < to {
for i := from; i < to; i++ {
t.Columns[i] = t.Columns[i+1]
t.Columns[i].Offset = i
updatedOffsets[i+1] = i
}
} else if from > to {
for i := from; i > to; i-- {
t.Columns[i] = t.Columns[i-1]
t.Columns[i].Offset = i
updatedOffsets[i-1] = i
}
}
t.Columns[to] = src
t.Columns[to].Offset = to
updatedOffsets[from] = to
for _, idx := range t.Indices {
for _, idxCol := range idx.Columns {
newOffset, ok := updatedOffsets[idxCol.Offset]
if ok {
idxCol.Offset = newOffset
}
}
}
}

// NewExtraHandleColInfo mocks a column info for extra handle column.
func NewExtraHandleColInfo() *ColumnInfo {
colInfo := &ColumnInfo{
Expand Down
79 changes: 79 additions & 0 deletions parser/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,85 @@ func TestT(t *testing.T) {
require.Equal(t, "aBC", abc.String())
}

func newColumnForTest(id int64, offset int) *ColumnInfo {
return &ColumnInfo{
ID: id,
Name: NewCIStr(fmt.Sprintf("c_%d", id)),
Offset: offset,
}
}

func newIndexForTest(id int64, cols ...*ColumnInfo) *IndexInfo {
idxCols := make([]*IndexColumn, 0, len(cols))
for _, c := range cols {
idxCols = append(idxCols, &IndexColumn{Offset: c.Offset, Name: c.Name})
}
return &IndexInfo{
ID: id,
Name: NewCIStr(fmt.Sprintf("i_%d", id)),
Columns: idxCols,
}
}

func checkOffsets(t *testing.T, tbl *TableInfo, ids ...int) {
require.Equal(t, len(ids), len(tbl.Columns))
for i := 0; i < len(ids); i++ {
expected := fmt.Sprintf("c_%d", ids[i])
require.Equal(t, expected, tbl.Columns[i].Name.L)
require.Equal(t, i, tbl.Columns[i].Offset)
}
for _, col := range tbl.Columns {
for _, idx := range tbl.Indices {
for _, idxCol := range idx.Columns {
if col.Name.L != idxCol.Name.L {
continue
}
// Columns with the same name should have a same offset.
require.Equal(t, col.Offset, idxCol.Offset)
}
}
}
}

func TestMoveColumnInfo(t *testing.T) {
c0 := newColumnForTest(0, 0)
c1 := newColumnForTest(1, 1)
c2 := newColumnForTest(2, 2)
c3 := newColumnForTest(3, 3)
c4 := newColumnForTest(4, 4)

i0 := newIndexForTest(0, c0, c1, c2, c3, c4)
i1 := newIndexForTest(1, c4, c2)
i2 := newIndexForTest(2, c0, c4)
i3 := newIndexForTest(3, c1, c2, c3)
i4 := newIndexForTest(4, c3, c2, c1)

tbl := &TableInfo{
ID: 1,
Name: NewCIStr("t"),
Columns: []*ColumnInfo{c0, c1, c2, c3, c4},
Indices: []*IndexInfo{i0, i1, i2, i3, i4},
}

// Original offsets: [0, 1, 2, 3, 4]
tbl.MoveColumnInfo(4, 0)
checkOffsets(t, tbl, 4, 0, 1, 2, 3)
tbl.MoveColumnInfo(2, 3)
checkOffsets(t, tbl, 4, 0, 2, 1, 3)
tbl.MoveColumnInfo(3, 2)
checkOffsets(t, tbl, 4, 0, 1, 2, 3)
tbl.MoveColumnInfo(0, 4)
checkOffsets(t, tbl, 0, 1, 2, 3, 4)
tbl.MoveColumnInfo(2, 2)
checkOffsets(t, tbl, 0, 1, 2, 3, 4)
tbl.MoveColumnInfo(0, 0)
checkOffsets(t, tbl, 0, 1, 2, 3, 4)
tbl.MoveColumnInfo(1, 4)
checkOffsets(t, tbl, 0, 2, 3, 4, 1)
tbl.MoveColumnInfo(3, 0)
checkOffsets(t, tbl, 4, 0, 2, 3, 1)
}

func TestModelBasic(t *testing.T) {
column := &ColumnInfo{
ID: 1,
Expand Down

0 comments on commit 34aa3e4

Please sign in to comment.