Skip to content

Commit

Permalink
commit-message: update the schema tracker core code about pingcap#1895
Browse files Browse the repository at this point in the history
  • Loading branch information
WizardXiao committed Sep 23, 2021
1 parent 4f90dc6 commit 8f7a8cf
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 33 deletions.
197 changes: 194 additions & 3 deletions pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/conn"
Expand All @@ -55,9 +56,19 @@ var (

// Tracker is used to track schema locally.
type Tracker struct {
store kv.Storage
dom *domain.Domain
se session.Session
store kv.Storage
dom *domain.Domain
se session.Session
toIndexes map[string]map[string]*ToIndexes
}

// ToIndexes is downstream pk/uk info.
type ToIndexes struct {
schemaName string
tableName string
pks []string // include multiple primary key
uks []string // uk/uks
// uksIsNull []bool // uk/uks is null?
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
Expand Down Expand Up @@ -321,3 +332,183 @@ func (tr *Tracker) CreateTableIfNotExists(db, table string, ti *model.TableInfo)
func (tr *Tracker) GetSystemVar(name string) (string, bool) {
return tr.se.GetSessionVars().GetSystemVar(name)
}

// GetToIndexInfo gets downstream PK Index.
// note. this function will init toIndexes.
func (tr *Tracker) GetToIndexInfo(db, table string, originTi *model.TableInfo, tctx *tcontext.Context, task string, tidbConn *conn.BaseConn) (*model.IndexInfo, error) {
if tr.toIndexes == nil {
tr.toIndexes = make(map[string]map[string]*ToIndexes)
}
if dbindexes := tr.toIndexes[db]; dbindexes == nil {
dbindexes = make(map[string]*ToIndexes)
tr.toIndexes[db] = dbindexes
}
index := tr.toIndexes[db][table]
if index == nil {
log.L().Info(fmt.Sprintf("DownStream schema tracker init: %s.%s", db, table))
index = &ToIndexes{
schemaName: db,
tableName: table,
pks: make([]string, 0),
uks: make([]string, 0),
// uksIsNull: make([]bool, 0),
}
// tctx := tcontext.NewContext(ctx, log.With(zap.String("component", "schema-tracker"), zap.String("task", task)))
rows, err := tidbConn.QuerySQL(tctx, fmt.Sprintf("SHOW INDEX FROM %s FROM %s", table, db))
if err != nil {
return nil, err
}

cols, err := rows.Columns()
if err != nil {
return nil, err
}
// the column of show statement is too many, so make dynamic values for scan
values := make([][]byte, len(cols))
scans := make([]interface{}, len(cols))
for i := range values {
scans[i] = &values[i]
}

for rows.Next() {
if err3 := rows.Scan(scans...); err3 != nil {
return nil, err3
}

// Key_name -- 2, Column_name -- 4, Null -- 9
nonUnique := string(values[1]) // 0 is UK
keyName := string(values[2]) // pk is PRIMARY
columName := string(values[4])
// isNull := string(values[9]) // Null is YES

if strings.EqualFold(keyName, "PRIMARY") {
// handle multiple pk
index.pks = append(index.pks, columName)
log.L().Info(fmt.Sprintf("DownStream schema tracker %s.%s Find PK %s", db, table, columName))
} else if strings.EqualFold(nonUnique, "0") {
index.uks = append(index.uks, columName)
log.L().Info(fmt.Sprintf("DownStream schema tracker %s.%s Find UK %s ", db, table, columName))
}
}
// nolint:sqlclosecheck
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
tr.toIndexes[db][table] = index
}

// construct model.IndexInfo, PK > not null UK
if len(index.pks) != 0 {
// handle multiple pk
columns := make([]*model.IndexColumn, 0, len(index.pks))
for _, pk := range index.pks {
if orginColumn := model.FindColumnInfo(originTi.Columns, pk); orginColumn != nil {
column := &model.IndexColumn{
Name: model.NewCIStr(pk),
Offset: orginColumn.Offset,
Length: types.UnspecifiedLength,
}
columns = append(columns, column)
}
}
if len(columns) != 0 {
return &model.IndexInfo{
Table: model.NewCIStr(table),
Unique: true,
Primary: true,
State: model.StatePublic,
Tp: model.IndexTypeBtree,
Columns: columns,
}, nil
}
}
// else if len(index.uks) != 0 {
// for i := 0; i < len(index.uks); i++ {
// if !index.uksIsNull[i] {
// if originColumn := model.FindColumnInfo(originTi.Columns, index.uks[i]); originColumn != nil {
// return &model.IndexInfo{
// Table: model.NewCIStr(table),
// Unique: true,
// Primary: false,
// State: model.StatePublic,
// Tp: model.IndexTypeBtree,
// Columns: []*model.IndexColumn{{
// Name: model.NewCIStr(index.uks[i]),
// Offset: originColumn.Offset,
// Length: types.UnspecifiedLength,
// }},
// }, nil
// }
// }
// }
// }

return nil, nil
}

// GetAvailableUKToIndexInfo gets available downstream UK whose data is not null
// note. this function will not init toIndexes.
func (tr *Tracker) GetAvailableUKToIndexInfo(db, table string, originTi *model.TableInfo, data []interface{}) *model.IndexInfo {
if tr.toIndexes == nil || tr.toIndexes[db] == nil || tr.toIndexes[db][table] == nil {
return nil
}
index := tr.toIndexes[db][table]
for i := 0; i < len(index.uks); i++ {
if originColumn := model.FindColumnInfo(originTi.Columns, index.uks[i]); originColumn != nil {
if data[originColumn.Offset] != nil {
return &model.IndexInfo{
Table: model.NewCIStr(table),
Unique: true,
Primary: false,
State: model.StatePublic,
Tp: model.IndexTypeBtree,
Columns: []*model.IndexColumn{{
Name: model.NewCIStr(index.uks[i]),
Offset: originColumn.Offset,
Length: types.UnspecifiedLength,
}},
}
}
}
}
return nil
}

// SetToIndexNotAvailable set toIndex available is false
// func (tr *Tracker) SetToIndexNotAvailable(db, table string) {

// if tr.toIndexes == nil || tr.toIndexes[db] == nil || tr.toIndexes[db][table] == nil || !tr.toIndexes[db][table].isAlive {
// return
// } else {
// tr.toIndexes[db][table].isAlive = false
// }
// }

// TrackToIndex remove schema or table in toIndex.
func (tr *Tracker) TrackToIndex(targetTables []*filter.Table) {
if tr.toIndexes == nil || targetTables == nil {
return
}

for i := 0; i < len(targetTables); i++ {
db := targetTables[i].Schema
table := targetTables[i].Name
if tr.toIndexes[db] == nil {
return
}
if table == "" {
delete(tr.toIndexes, db)
log.L().Info(fmt.Sprintf("Remove downStream schema tracker %s ", db))
} else {
if tr.toIndexes[db][table] == nil {
return
}
delete(tr.toIndexes[db], table)
log.L().Info(fmt.Sprintf("Remove downStream schema tracker %s.%s ", db, table))
}
}

}
69 changes: 43 additions & 26 deletions syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/expression"
"go.uber.org/zap"

tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)
Expand Down Expand Up @@ -100,23 +101,30 @@ RowLoop:
}

func (s *Syncer) genUpdateSQLs(
tctx *tcontext.Context,
param *genDMLParam,
oldValueFilters []expression.Expression,
newValueFilters []expression.Expression,
) ([]string, [][]string, [][]interface{}, error) {
var (
qualifiedName = dbutil.TableName(param.schema, param.table)
data = param.data
originalData = param.originalData
columns = param.columns
ti = param.originalTableInfo
defaultIndexColumns = findFitIndex(ti)
replaceSQL string // `REPLACE INTO` SQL
sqls = make([]string, 0, len(data)/2)
keys = make([][]string, 0, len(data)/2)
values = make([][]interface{}, 0, len(data)/2)
qualifiedName = dbutil.TableName(param.schema, param.table)
data = param.data
originalData = param.originalData
columns = param.columns
ti = param.originalTableInfo
// defaultIndexColumns = findFitIndex(ti)
replaceSQL string // `REPLACE INTO` SQL
sqls = make([]string, 0, len(data)/2)
keys = make([][]string, 0, len(data)/2)
values = make([][]interface{}, 0, len(data)/2)
)

// if downstream pk exits, then use downstream pk
defaultIndexColumns, err := s.schemaTracker.GetToIndexInfo(param.schema, param.table, ti, tctx, s.cfg.Name, s.ddlDBConn.BaseConn)
if err != nil {
return nil, nil, nil, err
}

if param.safeMode {
replaceSQL = genInsertReplace("REPLACE INTO", qualifiedName, columns)
}
Expand Down Expand Up @@ -167,7 +175,8 @@ RowLoop:
}

if defaultIndexColumns == nil {
defaultIndexColumns = getAvailableIndexColumn(ti, oriOldValues)
// defaultIndexColumns = getAvailableIndexColumn(ti, oriOldValues)
defaultIndexColumns = s.schemaTracker.GetAvailableUKToIndexInfo(param.schema, param.table, ti, oriOldValues)
}

ks := genMultipleKeys(ti, oriOldValues, qualifiedName)
Expand Down Expand Up @@ -218,17 +227,23 @@ RowLoop:
return sqls, keys, values, nil
}

func (s *Syncer) genDeleteSQLs(param *genDMLParam, filterExprs []expression.Expression) ([]string, [][]string, [][]interface{}, error) {
func (s *Syncer) genDeleteSQLs(tctx *tcontext.Context, param *genDMLParam, filterExprs []expression.Expression) ([]string, [][]string, [][]interface{}, error) {
var (
qualifiedName = dbutil.TableName(param.schema, param.table)
dataSeq = param.originalData
ti = param.originalTableInfo
defaultIndexColumns = findFitIndex(ti)
sqls = make([]string, 0, len(dataSeq))
keys = make([][]string, 0, len(dataSeq))
values = make([][]interface{}, 0, len(dataSeq))
qualifiedName = dbutil.TableName(param.schema, param.table)
dataSeq = param.originalData
ti = param.originalTableInfo
// defaultIndexColumns = findFitIndex(ti)
sqls = make([]string, 0, len(dataSeq))
keys = make([][]string, 0, len(dataSeq))
values = make([][]interface{}, 0, len(dataSeq))
)

// if downstream pk exits, then use downstream pk
defaultIndexColumns, err := s.schemaTracker.GetToIndexInfo(param.schema, param.table, ti, tctx, s.cfg.Name, s.ddlDBConn.BaseConn)
if err != nil {
return nil, nil, nil, err
}

RowLoop:
for _, data := range dataSeq {
if len(data) != len(ti.Columns) {
Expand All @@ -249,8 +264,10 @@ RowLoop:
}

if defaultIndexColumns == nil {
defaultIndexColumns = getAvailableIndexColumn(ti, value)
// defaultIndexColumns = getAvailableIndexColumn(ti, value)
defaultIndexColumns = s.schemaTracker.GetAvailableUKToIndexInfo(param.schema, param.table, ti, value)
}

ks := genMultipleKeys(ti, value, qualifiedName)

sql, value := genDeleteSQL(qualifiedName, value, ti.Columns, defaultIndexColumns)
Expand Down Expand Up @@ -498,13 +515,13 @@ func findFitIndex(ti *model.TableInfo) *model.IndexInfo {
return getSpecifiedIndexColumn(ti, fn)
}

func getAvailableIndexColumn(ti *model.TableInfo, data []interface{}) *model.IndexInfo {
fn := func(i int) bool {
return data[i] == nil
}
// func getAvailableIndexColumn(ti *model.TableInfo, data []interface{}) *model.IndexInfo {
// fn := func(i int) bool {
// return data[i] == nil
// }

return getSpecifiedIndexColumn(ti, fn)
}
// return getSpecifiedIndexColumn(ti, fn)
// }

func getSpecifiedIndexColumn(ti *model.TableInfo, fn func(i int) bool) *model.IndexInfo {
for _, indexCols := range ti.Indices {
Expand Down
Loading

0 comments on commit 8f7a8cf

Please sign in to comment.