From 8f7a8cff42aa39ef0202ab94f1dd9e185be2d0d9 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Thu, 23 Sep 2021 17:37:41 +0800 Subject: [PATCH] commit-message: update the schema tracker core code about #1895 --- pkg/schema/tracker.go | 197 +++++++++++++++++++++++++++++++++++++++++- syncer/dml.go | 69 +++++++++------ syncer/syncer.go | 13 ++- syncer/syncer_test.go | 4 +- 4 files changed, 250 insertions(+), 33 deletions(-) diff --git a/pkg/schema/tracker.go b/pkg/schema/tracker.go index 63fa12b019..6617dd56c9 100644 --- a/pkg/schema/tracker.go +++ b/pkg/schema/tracker.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/types" "go.uber.org/zap" "github.com/pingcap/dm/pkg/conn" @@ -55,9 +56,19 @@ var ( // Tracker is used to track schema locally. type Tracker struct { - store kv.Storage - dom *domain.Domain - se session.Session + store kv.Storage + dom *domain.Domain + se session.Session + toIndexes map[string]map[string]*ToIndexes +} + +// ToIndexes is downstream pk/uk info. +type ToIndexes struct { + schemaName string + tableName string + pks []string // include multiple primary key + uks []string // uk/uks + // uksIsNull []bool // uk/uks is null? } // NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve @@ -321,3 +332,183 @@ func (tr *Tracker) CreateTableIfNotExists(db, table string, ti *model.TableInfo) func (tr *Tracker) GetSystemVar(name string) (string, bool) { return tr.se.GetSessionVars().GetSystemVar(name) } + +// GetToIndexInfo gets downstream PK Index. +// note. this function will init toIndexes. +func (tr *Tracker) GetToIndexInfo(db, table string, originTi *model.TableInfo, tctx *tcontext.Context, task string, tidbConn *conn.BaseConn) (*model.IndexInfo, error) { + if tr.toIndexes == nil { + tr.toIndexes = make(map[string]map[string]*ToIndexes) + } + if dbindexes := tr.toIndexes[db]; dbindexes == nil { + dbindexes = make(map[string]*ToIndexes) + tr.toIndexes[db] = dbindexes + } + index := tr.toIndexes[db][table] + if index == nil { + log.L().Info(fmt.Sprintf("DownStream schema tracker init: %s.%s", db, table)) + index = &ToIndexes{ + schemaName: db, + tableName: table, + pks: make([]string, 0), + uks: make([]string, 0), + // uksIsNull: make([]bool, 0), + } + // tctx := tcontext.NewContext(ctx, log.With(zap.String("component", "schema-tracker"), zap.String("task", task))) + rows, err := tidbConn.QuerySQL(tctx, fmt.Sprintf("SHOW INDEX FROM %s FROM %s", table, db)) + if err != nil { + return nil, err + } + + cols, err := rows.Columns() + if err != nil { + return nil, err + } + // the column of show statement is too many, so make dynamic values for scan + values := make([][]byte, len(cols)) + scans := make([]interface{}, len(cols)) + for i := range values { + scans[i] = &values[i] + } + + for rows.Next() { + if err3 := rows.Scan(scans...); err3 != nil { + return nil, err3 + } + + // Key_name -- 2, Column_name -- 4, Null -- 9 + nonUnique := string(values[1]) // 0 is UK + keyName := string(values[2]) // pk is PRIMARY + columName := string(values[4]) + // isNull := string(values[9]) // Null is YES + + if strings.EqualFold(keyName, "PRIMARY") { + // handle multiple pk + index.pks = append(index.pks, columName) + log.L().Info(fmt.Sprintf("DownStream schema tracker %s.%s Find PK %s", db, table, columName)) + } else if strings.EqualFold(nonUnique, "0") { + index.uks = append(index.uks, columName) + log.L().Info(fmt.Sprintf("DownStream schema tracker %s.%s Find UK %s ", db, table, columName)) + } + } + // nolint:sqlclosecheck + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + tr.toIndexes[db][table] = index + } + + // construct model.IndexInfo, PK > not null UK + if len(index.pks) != 0 { + // handle multiple pk + columns := make([]*model.IndexColumn, 0, len(index.pks)) + for _, pk := range index.pks { + if orginColumn := model.FindColumnInfo(originTi.Columns, pk); orginColumn != nil { + column := &model.IndexColumn{ + Name: model.NewCIStr(pk), + Offset: orginColumn.Offset, + Length: types.UnspecifiedLength, + } + columns = append(columns, column) + } + } + if len(columns) != 0 { + return &model.IndexInfo{ + Table: model.NewCIStr(table), + Unique: true, + Primary: true, + State: model.StatePublic, + Tp: model.IndexTypeBtree, + Columns: columns, + }, nil + } + } + // else if len(index.uks) != 0 { + // for i := 0; i < len(index.uks); i++ { + // if !index.uksIsNull[i] { + // if originColumn := model.FindColumnInfo(originTi.Columns, index.uks[i]); originColumn != nil { + // return &model.IndexInfo{ + // Table: model.NewCIStr(table), + // Unique: true, + // Primary: false, + // State: model.StatePublic, + // Tp: model.IndexTypeBtree, + // Columns: []*model.IndexColumn{{ + // Name: model.NewCIStr(index.uks[i]), + // Offset: originColumn.Offset, + // Length: types.UnspecifiedLength, + // }}, + // }, nil + // } + // } + // } + // } + + return nil, nil +} + +// GetAvailableUKToIndexInfo gets available downstream UK whose data is not null +// note. this function will not init toIndexes. +func (tr *Tracker) GetAvailableUKToIndexInfo(db, table string, originTi *model.TableInfo, data []interface{}) *model.IndexInfo { + if tr.toIndexes == nil || tr.toIndexes[db] == nil || tr.toIndexes[db][table] == nil { + return nil + } + index := tr.toIndexes[db][table] + for i := 0; i < len(index.uks); i++ { + if originColumn := model.FindColumnInfo(originTi.Columns, index.uks[i]); originColumn != nil { + if data[originColumn.Offset] != nil { + return &model.IndexInfo{ + Table: model.NewCIStr(table), + Unique: true, + Primary: false, + State: model.StatePublic, + Tp: model.IndexTypeBtree, + Columns: []*model.IndexColumn{{ + Name: model.NewCIStr(index.uks[i]), + Offset: originColumn.Offset, + Length: types.UnspecifiedLength, + }}, + } + } + } + } + return nil +} + +// SetToIndexNotAvailable set toIndex available is false +// func (tr *Tracker) SetToIndexNotAvailable(db, table string) { + +// if tr.toIndexes == nil || tr.toIndexes[db] == nil || tr.toIndexes[db][table] == nil || !tr.toIndexes[db][table].isAlive { +// return +// } else { +// tr.toIndexes[db][table].isAlive = false +// } +// } + +// TrackToIndex remove schema or table in toIndex. +func (tr *Tracker) TrackToIndex(targetTables []*filter.Table) { + if tr.toIndexes == nil || targetTables == nil { + return + } + + for i := 0; i < len(targetTables); i++ { + db := targetTables[i].Schema + table := targetTables[i].Name + if tr.toIndexes[db] == nil { + return + } + if table == "" { + delete(tr.toIndexes, db) + log.L().Info(fmt.Sprintf("Remove downStream schema tracker %s ", db)) + } else { + if tr.toIndexes[db][table] == nil { + return + } + delete(tr.toIndexes[db], table) + log.L().Info(fmt.Sprintf("Remove downStream schema tracker %s.%s ", db, table)) + } + } + +} diff --git a/syncer/dml.go b/syncer/dml.go index 106404c02b..2320571cfe 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/expression" "go.uber.org/zap" + tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" ) @@ -100,23 +101,30 @@ RowLoop: } func (s *Syncer) genUpdateSQLs( + tctx *tcontext.Context, param *genDMLParam, oldValueFilters []expression.Expression, newValueFilters []expression.Expression, ) ([]string, [][]string, [][]interface{}, error) { var ( - qualifiedName = dbutil.TableName(param.schema, param.table) - data = param.data - originalData = param.originalData - columns = param.columns - ti = param.originalTableInfo - defaultIndexColumns = findFitIndex(ti) - replaceSQL string // `REPLACE INTO` SQL - sqls = make([]string, 0, len(data)/2) - keys = make([][]string, 0, len(data)/2) - values = make([][]interface{}, 0, len(data)/2) + qualifiedName = dbutil.TableName(param.schema, param.table) + data = param.data + originalData = param.originalData + columns = param.columns + ti = param.originalTableInfo + // defaultIndexColumns = findFitIndex(ti) + replaceSQL string // `REPLACE INTO` SQL + sqls = make([]string, 0, len(data)/2) + keys = make([][]string, 0, len(data)/2) + values = make([][]interface{}, 0, len(data)/2) ) + // if downstream pk exits, then use downstream pk + defaultIndexColumns, err := s.schemaTracker.GetToIndexInfo(param.schema, param.table, ti, tctx, s.cfg.Name, s.ddlDBConn.BaseConn) + if err != nil { + return nil, nil, nil, err + } + if param.safeMode { replaceSQL = genInsertReplace("REPLACE INTO", qualifiedName, columns) } @@ -167,7 +175,8 @@ RowLoop: } if defaultIndexColumns == nil { - defaultIndexColumns = getAvailableIndexColumn(ti, oriOldValues) + // defaultIndexColumns = getAvailableIndexColumn(ti, oriOldValues) + defaultIndexColumns = s.schemaTracker.GetAvailableUKToIndexInfo(param.schema, param.table, ti, oriOldValues) } ks := genMultipleKeys(ti, oriOldValues, qualifiedName) @@ -218,17 +227,23 @@ RowLoop: return sqls, keys, values, nil } -func (s *Syncer) genDeleteSQLs(param *genDMLParam, filterExprs []expression.Expression) ([]string, [][]string, [][]interface{}, error) { +func (s *Syncer) genDeleteSQLs(tctx *tcontext.Context, param *genDMLParam, filterExprs []expression.Expression) ([]string, [][]string, [][]interface{}, error) { var ( - qualifiedName = dbutil.TableName(param.schema, param.table) - dataSeq = param.originalData - ti = param.originalTableInfo - defaultIndexColumns = findFitIndex(ti) - sqls = make([]string, 0, len(dataSeq)) - keys = make([][]string, 0, len(dataSeq)) - values = make([][]interface{}, 0, len(dataSeq)) + qualifiedName = dbutil.TableName(param.schema, param.table) + dataSeq = param.originalData + ti = param.originalTableInfo + // defaultIndexColumns = findFitIndex(ti) + sqls = make([]string, 0, len(dataSeq)) + keys = make([][]string, 0, len(dataSeq)) + values = make([][]interface{}, 0, len(dataSeq)) ) + // if downstream pk exits, then use downstream pk + defaultIndexColumns, err := s.schemaTracker.GetToIndexInfo(param.schema, param.table, ti, tctx, s.cfg.Name, s.ddlDBConn.BaseConn) + if err != nil { + return nil, nil, nil, err + } + RowLoop: for _, data := range dataSeq { if len(data) != len(ti.Columns) { @@ -249,8 +264,10 @@ RowLoop: } if defaultIndexColumns == nil { - defaultIndexColumns = getAvailableIndexColumn(ti, value) + // defaultIndexColumns = getAvailableIndexColumn(ti, value) + defaultIndexColumns = s.schemaTracker.GetAvailableUKToIndexInfo(param.schema, param.table, ti, value) } + ks := genMultipleKeys(ti, value, qualifiedName) sql, value := genDeleteSQL(qualifiedName, value, ti.Columns, defaultIndexColumns) @@ -498,13 +515,13 @@ func findFitIndex(ti *model.TableInfo) *model.IndexInfo { return getSpecifiedIndexColumn(ti, fn) } -func getAvailableIndexColumn(ti *model.TableInfo, data []interface{}) *model.IndexInfo { - fn := func(i int) bool { - return data[i] == nil - } +// func getAvailableIndexColumn(ti *model.TableInfo, data []interface{}) *model.IndexInfo { +// fn := func(i int) bool { +// return data[i] == nil +// } - return getSpecifiedIndexColumn(ti, fn) -} +// return getSpecifiedIndexColumn(ti, fn) +// } func getSpecifiedIndexColumn(ti *model.TableInfo, fn func(i int) bool) *model.IndexInfo { for _, indexCols := range ti.Indices { diff --git a/syncer/syncer.go b/syncer/syncer.go index 5797ca6eaf..ff7b7bcddb 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -2223,7 +2223,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err } param.safeMode = ec.safeMode - sqls, keys, args, err = s.genUpdateSQLs(param, oldExprFilter, newExprFilter) + sqls, keys, args, err = s.genUpdateSQLs(ec.tctx, param, oldExprFilter, newExprFilter) if err != nil { return terror.Annotatef(err, "gen update sqls failed, originSchema: %s, originTable: %s, schema: %s, table: %s", originSchema, originTable, schemaName, tableName) } @@ -2236,7 +2236,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return err2 } - sqls, keys, args, err = s.genDeleteSQLs(param, exprFilter) + sqls, keys, args, err = s.genDeleteSQLs(ec.tctx, param, exprFilter) if err != nil { return terror.Annotatef(err, "gen delete sqls failed, originSchema: %s, originTable: %s, schema: %s, table: %s", originSchema, originTable, schemaName, tableName) } @@ -2755,6 +2755,7 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter. shouldTableExistNum int // tableNames[:shouldTableExistNum] should exist shouldRefTableExistNum int // tableNames[1:shouldTableExistNum] should exist, since first one is "caller table" tryFetchDownstreamTable bool // to make sure if not exists will execute correctly + shouldTrackToIndex bool // track toIndex ) switch node := stmt.(type) { @@ -2765,6 +2766,7 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter. shouldSchemaExist = true case *ast.DropDatabaseStmt: shouldExecDDLOnSchemaTracker = true + shouldTrackToIndex = true if s.cfg.ShardMode == "" { if err := s.checkpoint.DeleteSchemaPoint(ec.tctx, srcTable.Schema); err != nil { return err @@ -2781,6 +2783,7 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter. tryFetchDownstreamTable = true case *ast.DropTableStmt: shouldExecDDLOnSchemaTracker = true + shouldTrackToIndex = true if err := s.checkpoint.DeleteTablePoint(ec.tctx, srcTable.Schema, srcTable.Name); err != nil { return err } @@ -2788,8 +2791,10 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter. shouldExecDDLOnSchemaTracker = true shouldSchemaExist = true shouldTableExistNum = 1 + shouldTrackToIndex = true case *ast.AlterTableStmt: shouldSchemaExist = true + shouldTrackToIndex = true // for DDL that adds FK, since TiDB doesn't fully support it yet, we simply ignore execution of this DDL. switch { case len(node.Specs) == 1 && node.Specs[0].Constraint != nil && node.Specs[0].Constraint.Tp == ast.ConstraintForeignKey: @@ -2808,6 +2813,10 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter. ec.tctx.L().DPanic("unhandled DDL type cannot be tracked", zap.Stringer("type", reflect.TypeOf(stmt))) } + if shouldTrackToIndex { + s.schemaTracker.TrackToIndex(targetTables) + } + if shouldSchemaExist { if err := s.schemaTracker.CreateSchemaIfNotExists(srcTable.Schema); err != nil { return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, srcTable.Schema) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index ea9aaf700c..7afb716e01 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -939,12 +939,12 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) { c.Assert(args[0], DeepEquals, testCase.args[idx]) case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: // test with sql_mode = false only - sqls, _, args, err = syncer.genUpdateSQLs(param, nil, nil) + sqls, _, args, err = syncer.genUpdateSQLs(tcontext.Background(), param, nil, nil) c.Assert(err, IsNil) c.Assert(sqls[0], Equals, testCase.expected[idx]) c.Assert(args[0], DeepEquals, testCase.args[idx]) case replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: - sqls, _, args, err = syncer.genDeleteSQLs(param, nil) + sqls, _, args, err = syncer.genDeleteSQLs(tcontext.Background(), param, nil) c.Assert(err, IsNil) c.Assert(sqls[0], Equals, testCase.expected[idx]) c.Assert(args[0], DeepEquals, testCase.args[idx])