diff --git a/dm/dm/config/task.go b/dm/dm/config/task.go index 69f536236e0..5d661eeb894 100644 --- a/dm/dm/config/task.go +++ b/dm/dm/config/task.go @@ -60,6 +60,8 @@ const ( ValidationNone = "none" ValidationFast = "fast" ValidationFull = "full" + + DefaultValidatorWorkerCount = 4 ) // default config item values. @@ -337,7 +339,8 @@ func (m *SyncerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { } type ValidatorConfig struct { - Mode string `yaml:"mode" toml:"mode" json:"mode"` + Mode string `yaml:"mode" toml:"mode" json:"mode"` + WorkerCount int `yaml:"worker-count" toml:"worker-count" json:"worker-count"` } func (v *ValidatorConfig) adjust() error { @@ -347,6 +350,9 @@ func (v *ValidatorConfig) adjust() error { if v.Mode != ValidationNone && v.Mode != ValidationFast && v.Mode != ValidationFull { return terror.ErrConfigValidationMode } + if v.WorkerCount <= 0 { + v.WorkerCount = DefaultValidatorWorkerCount + } return nil } diff --git a/dm/pkg/binlog/position.go b/dm/pkg/binlog/position.go index ad7cfb1e719..4989606f894 100644 --- a/dm/pkg/binlog/position.go +++ b/dm/pkg/binlog/position.go @@ -428,3 +428,7 @@ func (l *Location) SetGTID(gset gmysql.GTIDSet) error { func (l *Location) GetGTID() gtid.Set { return l.gtidSet } + +func (l *Location) Update(gtidStr string) error { + return l.gtidSet.Update(gtidStr) +} diff --git a/dm/pkg/conn/mockdb.go b/dm/pkg/conn/mockdb.go index 8d8a98c409c..d5bd7f045ca 100644 --- a/dm/pkg/conn/mockdb.go +++ b/dm/pkg/conn/mockdb.go @@ -74,6 +74,19 @@ func InitVersionDB(c *check.C) sqlmock.Sqlmock { return mock } +func InitMockDBFull() (*sql.DB, sqlmock.Sqlmock, error) { + db, mock, err := sqlmock.New() + if err != nil { + return nil, nil, err + } + if mdbp, ok := DefaultDBProvider.(*mockDBProvider); ok { + mdbp.db = db + } else { + DefaultDBProvider = &mockDBProvider{db: db} + } + return db, mock, err +} + // TODO: export Config in https://github.com/pingcap/tidb/blob/a8fa29b56d633b1ec843e21cb89131dd4fd601db/br/pkg/mock/mock_cluster.go#L35 // Cluster is mock tidb cluster. type Cluster struct { diff --git a/dm/syncer/data_validator.go b/dm/syncer/data_validator.go index 1c4691d4105..155076bf02f 100644 --- a/dm/syncer/data_validator.go +++ b/dm/syncer/data_validator.go @@ -15,23 +15,76 @@ package syncer import ( "context" + "fmt" + "strings" "sync" "time" "github.com/go-mysql-org/go-mysql/replication" + "github.com/pingcap/errors" + "github.com/pingcap/tidb-tools/pkg/filter" + "github.com/pingcap/tidb/parser/model" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/dm/pb" "github.com/pingcap/tiflow/dm/dm/unit" + "github.com/pingcap/tiflow/dm/pkg/binlog" + "github.com/pingcap/tiflow/dm/pkg/binlog/event" "github.com/pingcap/tiflow/dm/pkg/conn" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/schema" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/tiflow/dm/syncer/dbconn" ) +const ( + checkInterval = 5 * time.Second + validationInterval = 10 * time.Second +) + +type validateTableInfo struct { + Source *filter.Table + Info *model.TableInfo + PrimaryKey *model.IndexInfo + Target *filter.Table // target table after route + pkIndices []int // TODO: can we use offset of in indexColumn? may remove this field in that case +} + +type rowChangeType int + +const ( + rowInsert rowChangeType = iota + rowDeleted + rowUpdated +) + +// change of table +// binlog changes are clustered into table changes +// the validator validates changes of table-grain at a time. +//nolint +type tableChange struct { + table *validateTableInfo + rows map[string]*rowChange +} + +// change of a row. +// todo: this struct and some logic on it may reuse RowChange in pkg/sqlmodel +// todo: maybe we can use reuse it later. +type rowChange struct { + table *validateTableInfo + key string + pkValues []string + data []interface{} + tp rowChangeType + lastMeetTS int64 // the last meet timestamp(in seconds) + //nolint + failedCnt int // failed count +} + // DataValidator // validator can be start when there's syncer unit in the subtask and validation mode is not none, // it's terminated when the subtask is terminated. @@ -45,45 +98,77 @@ type DataValidator struct { cfg *config.SubTaskConfig syncer *Syncer - stage pb.Stage - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc + stage pb.Stage + wg sync.WaitGroup + errProcessWg sync.WaitGroup + errChan chan error + ctx context.Context + cancel context.CancelFunc + tctx *tcontext.Context L log.Logger fromDB *conn.BaseDB + toDB *conn.BaseDB + toDBConns []*dbconn.DBConn timezone *time.Location syncCfg replication.BinlogSyncerConfig streamerController *StreamerController - result pb.ProcessResult + result pb.ProcessResult + validateInterval time.Duration + changeEventCount []atomic.Int64 + workerCnt int + + // such as table without primary key + unsupportedTable map[string]string } func NewContinuousDataValidator(cfg *config.SubTaskConfig, syncerObj *Syncer) *DataValidator { - c := &DataValidator{ + v := &DataValidator{ cfg: cfg, syncer: syncerObj, stage: pb.Stage_Stopped, } - c.L = log.With(zap.String("task", cfg.Name), zap.String("unit", "continuous validator")) - return c + v.L = log.With(zap.String("task", cfg.Name), zap.String("unit", "continuous validator")) + + v.workerCnt = cfg.ValidatorCfg.WorkerCount + v.changeEventCount = make([]atomic.Int64, 3) + v.validateInterval = validationInterval + + v.unsupportedTable = make(map[string]string) + + return v } func (v *DataValidator) initialize() error { + v.ctx, v.cancel = context.WithCancel(context.Background()) + v.tctx = tcontext.NewContext(v.ctx, v.L) + v.result.Reset() + // todo: many place may put into this channel, choose a proper channel size or enhance error handling + v.errChan = make(chan error, 10) + newCtx, cancelFunc := context.WithTimeout(v.ctx, unit.DefaultInitTimeout) defer cancelFunc() tctx := tcontext.NewContext(newCtx, v.L) var err error defer func() { - if err != nil && v.fromDB != nil { - v.fromDB.Close() + if err == nil { + return } + dbconn.CloseBaseDB(tctx, v.fromDB) + dbconn.CloseBaseDB(tctx, v.toDB) + v.cancel() }() dbCfg := v.cfg.From dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxDMLConnectionTimeout) - v.fromDB, err = dbconn.CreateBaseDB(&dbCfg) + v.fromDB, _, err = dbconn.CreateConns(tctx, v.cfg, &dbCfg, 0) + if err != nil { + return err + } + + v.toDB, v.toDBConns, err = dbconn.CreateConns(tctx, v.cfg, &dbCfg, v.workerCnt) if err != nil { return err } @@ -99,7 +184,6 @@ func (v *DataValidator) initialize() error { } v.streamerController = NewStreamerController(v.syncCfg, v.cfg.EnableGTID, &dbconn.UpStreamConn{BaseDB: v.fromDB}, v.cfg.RelayDir, v.timezone, nil) - return nil } @@ -112,8 +196,6 @@ func (v *DataValidator) Start(expect pb.Stage) { return } - v.ctx, v.cancel = context.WithCancel(context.Background()) - if err := v.initialize(); err != nil { v.fillResult(err, false) return @@ -129,6 +211,9 @@ func (v *DataValidator) Start(expect pb.Stage) { v.doValidate() }() + v.errProcessWg.Add(1) + go v.errorProcessRoutine() + v.stage = pb.Stage_Running } @@ -138,38 +223,141 @@ func (v *DataValidator) fillResult(err error, needLock bool) { defer v.Unlock() } - var errs []*pb.ProcessError - if utils.IsContextCanceledError(err) { - v.L.Info("filter out context cancelled error", log.ShortError(err)) - } else { - errs = append(errs, unit.NewProcessError(err)) - } - + // todo: if error, validation is stopped( and cancelled), and we may receive a cancelled error. + // todo: do we need this cancel field? isCanceled := false select { case <-v.ctx.Done(): isCanceled = true default: } + v.result.IsCanceled = isCanceled + + if utils.IsContextCanceledError(err) { + v.L.Info("filter out context cancelled error", log.ShortError(err)) + } else { + v.result.Errors = append(v.result.Errors, unit.NewProcessError(err)) + } +} + +func (v *DataValidator) errorProcessRoutine() { + defer v.errProcessWg.Done() + for err := range v.errChan { + v.fillResult(err, true) + + if errors.Cause(err) != context.Canceled { + // todo: need a better way to handle err(auto resuming on some error, etc.) + v.stopInner() + } + } +} + +func (v *DataValidator) waitSyncerSynced(currLoc binlog.Location) error { + syncLoc := v.syncer.checkpoint.FlushedGlobalPoint() + cmp := binlog.CompareLocation(currLoc, syncLoc, v.cfg.EnableGTID) + if cmp <= 0 { + return nil + } - v.result = pb.ProcessResult{ - IsCanceled: isCanceled, - Errors: errs, + for { + select { + case <-v.ctx.Done(): + return v.ctx.Err() + case <-time.After(checkInterval): + syncLoc = v.syncer.checkpoint.FlushedGlobalPoint() + cmp = binlog.CompareLocation(currLoc, syncLoc, v.cfg.EnableGTID) + if cmp <= 0 { + return nil + } + v.L.Info("wait syncer synced", zap.Reflect("loc", currLoc)) + } + } +} + +func (v *DataValidator) waitSyncerRunning() error { + if v.syncer.IsRunning() { + return nil + } + for { + select { + case <-v.ctx.Done(): + return v.ctx.Err() + case <-time.After(checkInterval): + if v.syncer.IsRunning() { + return nil + } + } } } +// doValidate: runs in a separate goroutine. func (v *DataValidator) doValidate() { - tctx := tcontext.NewContext(v.ctx, v.L) - err := v.streamerController.Start(tctx, lastLocationForTest) - if err != nil { - v.fillResult(terror.Annotate(err, "fail to restart streamer controller"), true) + if err := v.waitSyncerRunning(); err != nil { + v.errChan <- terror.Annotate(err, "failed to wait syncer running") return } + // todo: if validator starts on task start, we need to make sure the location we got is the init location of syncer + // todo: right now, there's change they have a gap + location := v.syncer.checkpoint.FlushedGlobalPoint() + // it's for test, some fields in streamerController is mocked, cannot call Start + if v.streamerController.IsClosed() { + err := v.streamerController.Start(v.tctx, location) + if err != nil { + v.errChan <- terror.Annotate(err, "fail to start streamer controller") + return + } + } + v.L.Info("start continuous validation") + v.startValidateWorkers() + + currLoc := location.CloneWithFlavor(v.cfg.Flavor) + for { + e, err := v.streamerController.GetEvent(v.tctx) + if err != nil { + v.errChan <- terror.Annotate(err, "fail to get binlog from stream controller") + return + } + + switch ev := e.Event.(type) { + case *replication.RotateEvent: + currLoc.Position.Name = string(ev.NextLogName) + currLoc.Position.Pos = uint32(ev.Position) + case *replication.GTIDEvent, *replication.MariadbGTIDEvent: + currLoc.Position.Pos = e.Header.LogPos + gtidStr, _ := event.GetGTIDStr(e) + if err = currLoc.Update(gtidStr); err != nil { + v.errChan <- terror.Annotate(err, "failed to update gtid set") + return + } + default: + currLoc.Position.Pos = e.Header.LogPos + } + + // wait until syncer synced current event + err = v.waitSyncerSynced(currLoc) + if err != nil { + v.errChan <- terror.Annotate(err, "failed to wait syncer") + return + } + + if ev, ok := e.Event.(*replication.RowsEvent); ok { + if err = v.processRowsEvent(e.Header, ev); err != nil { + v.L.Warn("failed to process event: ", zap.Reflect("error", err)) + v.errChan <- terror.Annotate(err, "failed to process event") + return + } + } + } } func (v *DataValidator) Stop() { + v.stopInner() + v.errProcessWg.Wait() +} + +func (v *DataValidator) stopInner() { v.Lock() defer v.Unlock() if v.stage != pb.Stage_Running { @@ -177,13 +365,14 @@ func (v *DataValidator) Stop() { return } + v.cancel() v.streamerController.Close() v.fromDB.Close() + v.toDB.Close() - if v.cancel != nil { - v.cancel() - } v.wg.Wait() + close(v.errChan) // close error chan after all possible sender goroutines stopped + v.stage = pb.Stage_Stopped } @@ -198,3 +387,168 @@ func (v *DataValidator) Stage() pb.Stage { defer v.RUnlock() return v.stage } + +func (v *DataValidator) startValidateWorkers() { +} + +func (v *DataValidator) dispatchRowChange(key string, row *rowChange) { +} + +func (v *DataValidator) processRowsEvent(header *replication.EventHeader, ev *replication.RowsEvent) error { + sourceTable := &filter.Table{ + Schema: string(ev.Table.Schema), + Name: string(ev.Table.Table), + } + fullTableName := sourceTable.String() + if _, ok := v.unsupportedTable[fullTableName]; ok { + return nil + } + + needSkip, err := v.syncer.skipRowsEvent(sourceTable, header.EventType) + if err != nil { + return err + } + if needSkip { + return nil + } + + targetTable := v.syncer.route(sourceTable) + // todo: syncer will change schema in schemaTracker, will there be data race? + // todo: what if table is dropped while validator falls behind? + tableInfo, err := v.syncer.schemaTracker.GetTableInfo(sourceTable) + if err != nil { + if schema.IsTableNotExists(err) { + // not a table need to sync + return nil + } + return terror.Annotate(err, "failed to get table info") + } + + if len(tableInfo.Columns) < int(ev.ColumnCount) { + v.unsupportedTable[fullTableName] = "binlog has more columns than current table" + return nil + } + + var primaryIdx *model.IndexInfo + for _, idx := range tableInfo.Indices { + if idx.Primary { + primaryIdx = idx + } + } + // todo: PKIsHandle = true when table has primary key like "id int primary key CLUSTERED", since schema-tracker(we get from it) + // todo: only use downstream DDL when the task is incremental only, will support this case later. + if primaryIdx == nil { + // todo: for table without primary index, need to record in the failed table, will add it later together with checkpoint + v.unsupportedTable[fullTableName] = "without primary key" + return nil + } + + table := &validateTableInfo{ + Source: sourceTable, + Info: tableInfo, + PrimaryKey: primaryIdx, + Target: targetTable, + } + + for _, cols := range ev.SkippedColumns { + if len(cols) > 0 { + err := errors.Errorf("unexpected skipped columns for table %s", sourceTable.String()) + return err + } + } + + changeType := getRowChangeType(header.EventType) + v.changeEventCount[changeType].Inc() + + columnMap := make(map[string]*model.ColumnInfo) + for _, col := range tableInfo.Columns { + columnMap[col.Name.O] = col + } + pk := table.PrimaryKey + pkIndices := make([]int, len(pk.Columns)) + for i, col := range pk.Columns { + pkIndices[i] = columnMap[col.Name.O].Offset + } + table.pkIndices = pkIndices + + step := 1 + if changeType == rowUpdated { + step = 2 + } + for i := 0; i < len(ev.Rows); i += step { + row := ev.Rows[i] + pkValue := make([]string, len(pk.Columns)) + for _, idx := range pkIndices { + pkValue[idx] = string(genColData(row[idx])) + } + key := genRowKey(pkValue) + + if changeType == rowUpdated { + afterRowChangeType := changeType + afterRow := ev.Rows[i+1] + afterPkValue := make([]string, len(pk.Columns)) + for _, idx := range pkIndices { + afterPkValue[idx] = string(genColData(afterRow[idx])) + } + afterKey := genRowKey(afterPkValue) + if afterKey != key { + // TODO: may reuse IsIdentityUpdated/SplitUpdate of RowChange in pkg/sqlmodel + v.dispatchRowChange(key, &rowChange{ + table: table, + key: key, + pkValues: pkValue, + data: row, + tp: rowDeleted, + lastMeetTS: int64(header.Timestamp), + }) + afterRowChangeType = rowInsert + } + v.dispatchRowChange(afterKey, &rowChange{ + table: table, + key: afterKey, + pkValues: afterPkValue, + data: afterRow, + tp: afterRowChangeType, + lastMeetTS: int64(header.Timestamp), + }) + } else { + v.dispatchRowChange(key, &rowChange{ + table: table, + key: key, + pkValues: pkValue, + data: row, + tp: changeType, + lastMeetTS: int64(header.Timestamp), + }) + } + } + return nil +} + +// getRowChangeType should be called only when the event type is RowsEvent. +func getRowChangeType(t replication.EventType) rowChangeType { + switch t { + case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + return rowInsert + case replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + return rowUpdated + default: + // replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: + return rowDeleted + } +} + +func genRowKey(pkValues []string) string { + return strings.Join(pkValues, "-") +} + +func genColData(v interface{}) []byte { + switch dv := v.(type) { + case []byte: + return dv + case string: + return []byte(dv) + } + s := fmt.Sprintf("%v", v) + return []byte(s) +} diff --git a/dm/syncer/data_validator_test.go b/dm/syncer/data_validator_test.go new file mode 100644 index 00000000000..8656b3e9ff4 --- /dev/null +++ b/dm/syncer/data_validator_test.go @@ -0,0 +1,460 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncer + +import ( + "context" + "database/sql" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + "github.com/pingcap/errors" + "github.com/pingcap/tidb-tools/pkg/filter" + "github.com/shopspring/decimal" + "github.com/stretchr/testify/require" + + "github.com/pingcap/tiflow/dm/dm/config" + "github.com/pingcap/tiflow/dm/dm/pb" + "github.com/pingcap/tiflow/dm/pkg/binlog" + "github.com/pingcap/tiflow/dm/pkg/binlog/event" + "github.com/pingcap/tiflow/dm/pkg/conn" + "github.com/pingcap/tiflow/dm/pkg/gtid" + "github.com/pingcap/tiflow/dm/pkg/retry" + "github.com/pingcap/tiflow/dm/pkg/router" + "github.com/pingcap/tiflow/dm/pkg/schema" + "github.com/pingcap/tiflow/dm/pkg/utils" + "github.com/pingcap/tiflow/dm/syncer/dbconn" +) + +func genEventGenerator(t *testing.T) *event.Generator { + t.Helper() + previousGTIDSetStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14" + previousGTIDSet, err := gtid.ParserGTID(mysql.MySQLFlavor, previousGTIDSetStr) + require.NoError(t, err) + latestGTIDStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" + latestGTID, err := gtid.ParserGTID(mysql.MySQLFlavor, latestGTIDStr) + require.NoError(t, err) + eventsGenerator, err := event.NewGenerator(mysql.MySQLFlavor, 1, 0, latestGTID, previousGTIDSet, 0) + require.NoError(t, err) + require.NoError(t, err) + + return eventsGenerator +} + +func genSubtaskConfig(t *testing.T) *config.SubTaskConfig { + t.Helper() + loaderCfg := config.LoaderConfig{ + Dir: t.TempDir(), + } + cfg := &config.SubTaskConfig{ + From: config.GetDBConfigForTest(), + To: config.GetDBConfigForTest(), + Timezone: "UTC", + ServerID: 101, + Name: "validator_ut", + ShadowTableRules: []string{config.DefaultShadowTableRules}, + TrashTableRules: []string{config.DefaultTrashTableRules}, + Mode: config.ModeIncrement, + Flavor: mysql.MySQLFlavor, + LoaderConfig: loaderCfg, + SyncerConfig: config.SyncerConfig{ + EnableGTID: false, + }, + ValidatorCfg: config.ValidatorConfig{ + Mode: config.ModeFull, + WorkerCount: 1, + }, + } + cfg.Experimental.AsyncCheckpointFlush = true + cfg.From.Adjust() + cfg.To.Adjust() + + cfg.UseRelay = false + + return cfg +} + +//nolint +func genDBConn(t *testing.T, db *sql.DB, cfg *config.SubTaskConfig) *dbconn.DBConn { + t.Helper() + baseDB := conn.NewBaseDB(db, func() {}) + baseConn, err := baseDB.GetBaseConn(context.Background()) + require.NoError(t, err) + return &dbconn.DBConn{ + BaseConn: baseConn, + Cfg: cfg, + } +} + +func TestValidatorStartStop(t *testing.T) { + cfg := genSubtaskConfig(t) + syncerObj := NewSyncer(cfg, nil, nil) + + // validator already running + validator := NewContinuousDataValidator(cfg, syncerObj) + validator.stage = pb.Stage_Running + validator.Start(pb.Stage_InvalidStage) + // if validator already running, Start will return immediately, so we check validator.ctx which has not initialized. + require.Nil(t, validator.ctx) + + // failed to init + cfg.From = config.DBConfig{ + Host: "invalid host", + Port: 3306, + User: "root", + } + validator = NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Stopped) + require.Equal(t, pb.Stage_Stopped, validator.Stage()) + require.Len(t, validator.result.Errors, 1) + + // start with Stopped stage + _, _, err := conn.InitMockDBFull() + require.NoError(t, err) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + validator = NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Stopped) + require.Equal(t, pb.Stage_Stopped, validator.Stage()) + require.Len(t, validator.result.Errors, 0) + + // normal start & stop + validator = NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Running) + defer validator.Stop() // in case assert failed before Stop + require.Equal(t, pb.Stage_Running, validator.Stage()) + require.True(t, validator.Started()) + validator.Stop() + require.Equal(t, pb.Stage_Stopped, validator.Stage()) + + // stop before start, should not panic + validator = NewContinuousDataValidator(cfg, syncerObj) + validator.Stop() +} + +func TestValidatorFillResult(t *testing.T) { + cfg := genSubtaskConfig(t) + syncerObj := NewSyncer(cfg, nil, nil) + _, _, err := conn.InitMockDBFull() + require.NoError(t, err) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + + validator := NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Running) + defer validator.Stop() // in case assert failed before Stop + validator.fillResult(errors.New("test error"), false) + require.Len(t, validator.result.Errors, 1) + validator.fillResult(errors.New("test error"), true) + require.Len(t, validator.result.Errors, 2) + validator.Stop() + validator.fillResult(validator.ctx.Err(), true) + require.Len(t, validator.result.Errors, 2) +} + +func TestValidatorErrorProcessRoutine(t *testing.T) { + cfg := genSubtaskConfig(t) + syncerObj := NewSyncer(cfg, nil, nil) + _, _, err := conn.InitMockDBFull() + require.NoError(t, err) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + + validator := NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Running) + defer validator.Stop() + require.Equal(t, pb.Stage_Running, validator.Stage()) + validator.errChan <- errors.New("test error") + require.True(t, utils.WaitSomething(20, 100*time.Millisecond, func() bool { + return validator.Stage() == pb.Stage_Stopped + })) + require.Len(t, validator.result.Errors, 1) +} + +type mockedCheckPointForValidator struct { + CheckPoint + cnt int + currLoc binlog.Location + nextLoc binlog.Location +} + +func (c *mockedCheckPointForValidator) FlushedGlobalPoint() binlog.Location { + c.cnt++ + if c.cnt <= 2 { + return c.currLoc + } + return c.nextLoc +} + +func TestValidatorWaitSyncerSynced(t *testing.T) { + cfg := genSubtaskConfig(t) + syncerObj := NewSyncer(cfg, nil, nil) + _, _, err := conn.InitMockDBFull() + require.NoError(t, err) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + + currLoc := binlog.NewLocation(cfg.Flavor) + validator := NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Stopped) + require.NoError(t, validator.waitSyncerSynced(currLoc)) + + // cancelled + currLoc.Position = mysql.Position{ + Name: "mysql-bin.000001", + Pos: 100, + } + validator = NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Stopped) + validator.cancel() + require.ErrorIs(t, validator.waitSyncerSynced(currLoc), context.Canceled) + + currLoc.Position = mysql.Position{ + Name: "mysql-bin.000001", + Pos: 100, + } + syncerObj.checkpoint = &mockedCheckPointForValidator{ + currLoc: binlog.NewLocation(cfg.Flavor), + nextLoc: currLoc, + } + validator = NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Stopped) + require.NoError(t, validator.waitSyncerSynced(currLoc)) +} + +func TestValidatorWaitSyncerRunning(t *testing.T) { + cfg := genSubtaskConfig(t) + syncerObj := NewSyncer(cfg, nil, nil) + _, _, err := conn.InitMockDBFull() + require.NoError(t, err) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + + validator := NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Stopped) + validator.cancel() + require.Error(t, validator.waitSyncerRunning()) + + validator = NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Stopped) + syncerObj.running.Store(true) + require.NoError(t, validator.waitSyncerRunning()) + + validator = NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Stopped) + syncerObj.running.Store(false) + go func() { + time.Sleep(3 * time.Second) + syncerObj.running.Store(true) + }() + require.NoError(t, validator.waitSyncerRunning()) +} + +func TestValidatorDoValidate(t *testing.T) { + var ( + schemaName = "test" + tableName = "tbl" + tableName2 = "tbl2" + tableName3 = "tbl3" + tableName4 = "tbl4" + createTableSQL = "CREATE TABLE `" + tableName + "`(id int primary key, v varchar(100))" + createTableSQL2 = "CREATE TABLE `" + tableName2 + "`(id int primary key)" + createTableSQL3 = "CREATE TABLE `" + tableName3 + "`(id int, v varchar(100))" + ) + cfg := genSubtaskConfig(t) + _, _, err := conn.InitMockDBFull() + require.NoError(t, err) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + + syncerObj := NewSyncer(cfg, nil, nil) + syncerObj.running.Store(true) + syncerObj.tableRouter, err = router.NewRouter(cfg.CaseSensitive, []*router.TableRule{}) + require.NoError(t, err) + currLoc := binlog.NewLocation(cfg.Flavor) + currLoc.Position = mysql.Position{ + Name: "mysql-bin.000001", + Pos: 3000, + } + syncerObj.checkpoint = &mockedCheckPointForValidator{ + currLoc: binlog.NewLocation(cfg.Flavor), + nextLoc: currLoc, + cnt: 2, + } + db, mock, err := sqlmock.New() + require.NoError(t, err) + mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( + mock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""), + ) + dbConn, err := db.Conn(context.Background()) + require.NoError(t, err) + syncerObj.downstreamTrackConn = &dbconn.DBConn{Cfg: cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + syncerObj.schemaTracker, err = schema.NewTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn) + require.NoError(t, err) + require.NoError(t, syncerObj.schemaTracker.CreateSchemaIfNotExists(schemaName)) + require.NoError(t, syncerObj.schemaTracker.Exec(context.Background(), schemaName, createTableSQL)) + require.NoError(t, syncerObj.schemaTracker.Exec(context.Background(), schemaName, createTableSQL2)) + require.NoError(t, syncerObj.schemaTracker.Exec(context.Background(), schemaName, createTableSQL3)) + + generator := genEventGenerator(t) + rotateEvent, _, err := generator.Rotate("mysql-bin.000001", 0) + require.NoError(t, err) + insertData := []*event.DMLData{ + { + TableID: 11, + Schema: schemaName, + Table: tableName, + ColumnType: []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, + Rows: [][]interface{}{ + {int32(1), "a"}, + {int32(2), "b"}, + {int32(3), "c"}, + }, + }, + // 2 columns in binlog, but ddl of tbl2 only has one column + { + TableID: 12, + Schema: schemaName, + Table: tableName2, + ColumnType: []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, + Rows: [][]interface{}{ + {int32(1), "a"}, + {int32(2), "b"}, + {int32(3), "c"}, + }, + }, + // tbl3 has no primary key + { + TableID: 13, + Schema: schemaName, + Table: tableName3, + ColumnType: []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, + Rows: [][]interface{}{ + {int32(1), "a"}, + {int32(2), "b"}, + {int32(3), "c"}, + }, + }, + // tbl3 has no primary key, since we met it before, will return immediately + { + TableID: 13, + Schema: schemaName, + Table: tableName3, + ColumnType: []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, + Rows: [][]interface{}{ + {int32(4), "a"}, + }, + }, + } + updateData := []*event.DMLData{ + { + TableID: 11, + Schema: schemaName, + Table: tableName, + ColumnType: []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, + Rows: [][]interface{}{ + // update non-primary column + {int32(3), "c"}, + {int32(3), "d"}, + // update primary column and non-primary column + {int32(1), "a"}, + {int32(4), "b"}, + }, + }, + } + deleteData := []*event.DMLData{ + { + TableID: 11, + Schema: schemaName, + Table: tableName, + ColumnType: []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, + Rows: [][]interface{}{ + {int32(3), "c"}, + }, + }, + // no ddl for this table + { + TableID: 14, + Schema: schemaName, + Table: tableName4, + ColumnType: []byte{mysql.MYSQL_TYPE_LONG, mysql.MYSQL_TYPE_STRING}, + Rows: [][]interface{}{ + {int32(4), "c"}, + }, + }, + } + dmlEvents, _, err := generator.GenDMLEvents(replication.WRITE_ROWS_EVENTv2, insertData, 0) + require.NoError(t, err) + updateEvents, _, err := generator.GenDMLEvents(replication.UPDATE_ROWS_EVENTv2, updateData, 0) + require.NoError(t, err) + deleteEvents, _, err := generator.GenDMLEvents(replication.DELETE_ROWS_EVENTv2, deleteData, 0) + require.NoError(t, err) + allEvents := []*replication.BinlogEvent{rotateEvent} + allEvents = append(allEvents, dmlEvents...) + allEvents = append(allEvents, updateEvents...) + allEvents = append(allEvents, deleteEvents...) + mockStreamerProducer := &MockStreamProducer{events: allEvents} + mockStreamer, err := mockStreamerProducer.generateStreamer(binlog.NewLocation("")) + require.NoError(t, err) + + validator := NewContinuousDataValidator(cfg, syncerObj) + validator.Start(pb.Stage_Stopped) + validator.streamerController = &StreamerController{ + streamerProducer: mockStreamerProducer, + streamer: mockStreamer, + closed: false, + } + validator.doValidate() + require.Equal(t, int64(1), validator.changeEventCount[rowInsert].Load()) + require.Equal(t, int64(1), validator.changeEventCount[rowUpdated].Load()) + require.Equal(t, int64(1), validator.changeEventCount[rowDeleted].Load()) + ft := filter.Table{Schema: schemaName, Name: tableName2} + require.Contains(t, validator.unsupportedTable, ft.String()) + ft = filter.Table{Schema: schemaName, Name: tableName3} + require.Contains(t, validator.unsupportedTable, ft.String()) +} + +func TestValidatorGetRowChangeType(t *testing.T) { + require.Equal(t, rowInsert, getRowChangeType(replication.WRITE_ROWS_EVENTv0)) + require.Equal(t, rowInsert, getRowChangeType(replication.WRITE_ROWS_EVENTv1)) + require.Equal(t, rowInsert, getRowChangeType(replication.WRITE_ROWS_EVENTv2)) + require.Equal(t, rowUpdated, getRowChangeType(replication.UPDATE_ROWS_EVENTv0)) + require.Equal(t, rowUpdated, getRowChangeType(replication.UPDATE_ROWS_EVENTv1)) + require.Equal(t, rowUpdated, getRowChangeType(replication.UPDATE_ROWS_EVENTv2)) + require.Equal(t, rowDeleted, getRowChangeType(replication.DELETE_ROWS_EVENTv0)) + require.Equal(t, rowDeleted, getRowChangeType(replication.DELETE_ROWS_EVENTv1)) + require.Equal(t, rowDeleted, getRowChangeType(replication.DELETE_ROWS_EVENTv2)) +} + +func TestValidatorGenColData(t *testing.T) { + res := genColData(1) + require.Equal(t, "1", string(res)) + res = genColData(1.2) + require.Equal(t, "1.2", string(res)) + res = genColData("abc") + require.Equal(t, "abc", string(res)) + res = genColData([]byte{'\x01', '\x02', '\x03'}) + require.Equal(t, "\x01\x02\x03", string(res)) + res = genColData(decimal.NewFromInt(222123123)) + require.Equal(t, "222123123", string(res)) +} diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 599ad4ee588..bde89f5472f 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -172,7 +172,8 @@ type Syncer struct { exprFilterGroup *ExprFilterGroup sessCtx sessionctx.Context - closed atomic.Bool + closed atomic.Bool + running atomic.Bool start atomic.Time lastTime atomic.Time @@ -258,6 +259,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay rel syncer.waitXIDJob.Store(int64(noWait)) syncer.isTransactionEnd = true syncer.closed.Store(false) + syncer.running.Store(false) syncer.lastBinlogSizeCount.Store(0) syncer.binlogSizeCount.Store(0) syncer.lastCount.Store(0) @@ -691,6 +693,9 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { <-newCtx.Done() // ctx or newCtx }() + s.running.Store(true) + defer s.running.Store(false) + err := s.Run(newCtx) if err != nil { // returned error rather than sent to runFatalChan @@ -3468,6 +3473,10 @@ func (s *Syncer) route(table *filter.Table) *filter.Table { return &filter.Table{Schema: targetSchema, Name: targetTable} } +func (s *Syncer) IsRunning() bool { + return s.running.Load() +} + func (s *Syncer) isClosed() bool { return s.closed.Load() } @@ -3493,6 +3502,7 @@ func (s *Syncer) Close() { // when closing syncer by `stop-task`, remove active relay log from hub s.removeActiveRelayLog() metrics.RemoveLabelValuesWithTaskInMetrics(s.cfg.Name) + s.runWg.Wait() s.closed.Store(true) } diff --git a/dm/syncer/validator_cond.go b/dm/syncer/validator_cond.go new file mode 100644 index 00000000000..247c8a2e0fb --- /dev/null +++ b/dm/syncer/validator_cond.go @@ -0,0 +1,72 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncer + +import ( + "strings" +) + +type Cond struct { + Table *validateTableInfo + ColumnCnt int + PkValues [][]string +} + +func (c *Cond) GetArgs() []interface{} { + var res []interface{} + for _, v := range c.PkValues { + for _, val := range v { + res = append(res, val) + } + } + return res +} + +func (c *Cond) GetWhere() string { + var b strings.Builder + pk := c.Table.PrimaryKey + isOneKey := len(pk.Columns) == 1 + if isOneKey { + b.WriteString(pk.Columns[0].Name.O) + } else { + b.WriteString("(") + for i := 0; i < len(pk.Columns); i++ { + if i != 0 { + b.WriteString(",") + } + b.WriteString(pk.Columns[i].Name.O) + } + b.WriteString(")") + } + b.WriteString(" in (") + for i := range c.PkValues { + if i != 0 { + b.WriteString(",") + } + if !isOneKey { + b.WriteString("(") + for j := 0; j < len(pk.Columns); j++ { + if j != 0 { + b.WriteString(",") + } + b.WriteString("?") + } + b.WriteString(")") + } else { + b.WriteString("?") + } + } + b.WriteString(")") + return b.String() +} diff --git a/dm/syncer/validator_cond_test.go b/dm/syncer/validator_cond_test.go new file mode 100644 index 00000000000..7167d3f2957 --- /dev/null +++ b/dm/syncer/validator_cond_test.go @@ -0,0 +1,178 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncer + +import ( + "database/sql" + "fmt" + "strconv" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb-tools/pkg/filter" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/model" + "github.com/stretchr/testify/require" +) + +func genValidateTableInfo(t *testing.T, schemaName, tableName, creatSQL string) *validateTableInfo { + t.Helper() + var ( + err error + parser2 *parser.Parser + tableInfo *model.TableInfo + ) + parser2 = parser.New() + require.NoError(t, err) + tableInfo, err = dbutil.GetTableInfoBySQL(creatSQL, parser2) + require.NoError(t, err) + var primaryIdx *model.IndexInfo + for _, idx := range tableInfo.Indices { + if idx.Primary { + primaryIdx = idx + } + } + require.NotNil(t, primaryIdx) + columnMap := make(map[string]*model.ColumnInfo) + for _, col := range tableInfo.Columns { + columnMap[col.Name.O] = col + } + pkIndices := make([]int, len(primaryIdx.Columns)) + for i, col := range primaryIdx.Columns { + pkIndices[i] = columnMap[col.Name.O].Offset + } + tableDiff := &validateTableInfo{ + Source: &filter.Table{ + Schema: schemaName, + Name: tableName, + }, + Info: tableInfo, + PrimaryKey: primaryIdx, + Target: &filter.Table{ + Schema: schemaName, + Name: tableName, + }, + pkIndices: pkIndices, + } + return tableDiff +} + +func genValidationCond(t *testing.T, schemaName, tblName, creatSQL string, pkvs [][]string) *Cond { + t.Helper() + tblDiff := genValidateTableInfo(t, schemaName, tblName, creatSQL) + return &Cond{ + Table: tblDiff, + PkValues: pkvs, + } +} + +func Test_validatorCond_SelectMultiKey(t *testing.T) { + var res *sql.Rows + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + creatTbl := "create table if not exists `test_cond`.`test1`(" + + "a int," + + "b int," + + "c int," + + "primary key(a, b)" + + ");" + // get table diff + pkValues := make([][]string, 0) + for i := 0; i < 3; i++ { + // 3 primary key + key1, key2 := strconv.Itoa(i+1), strconv.Itoa(i+2) + pkValues = append(pkValues, []string{key1, key2}) + } + cond := genValidationCond(t, "test_cond", "test1", creatTbl, pkValues) + // format query string + rowsQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s;", "`test_cond`.`test1`", cond.GetWhere()) + mock.ExpectQuery( + "SELECT COUNT\\(\\*\\) FROM `test_cond`.`test1` WHERE \\(a,b\\) in \\(\\(\\?,\\?\\),\\(\\?,\\?\\),\\(\\?,\\?\\)\\);", + ).WithArgs( + "1", "2", "2", "3", "3", "4", + ).WillReturnRows(mock.NewRows([]string{"COUNT(*)"}).AddRow("3")) + require.NoError(t, err) + res, err = db.Query(rowsQuery, cond.GetArgs()...) + require.NoError(t, err) + defer res.Close() + var cnt int + if res.Next() { + err = res.Scan(&cnt) + } + require.NoError(t, err) + require.Equal(t, 3, cnt) + require.NoError(t, res.Err()) +} + +func Test_validatorCond_GetWhereArgs(t *testing.T) { + db, _, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + type testCase struct { + creatTbl string + pks [][]string + tblName string + schemaName string + args []string + where string + } + cases := []testCase{ + { + creatTbl: `create table if not exists test_cond.test2( + a char(10), + b int, + c int, + primary key(a) + );`, // single primary key, + pks: [][]string{ + {"10a0"}, {"200"}, {"abc"}, + }, + tblName: "test2", + schemaName: "test_cond", + where: "a in (?,?,?)", + args: []string{ + "10a0", "200", "abc", + }, + }, + { + creatTbl: `create table if not exists test_cond.test3( + a int, + b char(10), + c varchar(10), + primary key(a, b, c) + );`, // multi primary key + pks: [][]string{ + {"10", "abc", "ef"}, + {"9897", "afdkiefkjg", "acdee"}, + }, + tblName: "test3", + schemaName: "test_cond", + where: "(a,b,c) in ((?,?,?),(?,?,?))", + args: []string{ + "10", "abc", "ef", "9897", "afdkiefkjg", "acdee", + }, + }, + } + for i := 0; i < len(cases); i++ { + cond := genValidationCond(t, cases[i].schemaName, cases[i].tblName, cases[i].creatTbl, cases[i].pks) + require.Equal(t, cases[i].where, cond.GetWhere()) + rawArgs := cond.GetArgs() + for j := 0; j < 3; j++ { + curData := fmt.Sprintf("%v", rawArgs[j]) + require.Equal(t, cases[i].args[j], curData) + } + } +} diff --git a/dm/tests/dmctl_basic/conf/get_task.yaml b/dm/tests/dmctl_basic/conf/get_task.yaml index 3a431dbdf37..d76b328be86 100644 --- a/dm/tests/dmctl_basic/conf/get_task.yaml +++ b/dm/tests/dmctl_basic/conf/get_task.yaml @@ -164,6 +164,7 @@ syncers: validators: validator-01: mode: none + worker-count: 4 clean-dump-file: true ansi-quotes: false remove-meta: false diff --git a/dm/tests/import_v10x/conf/task.yaml b/dm/tests/import_v10x/conf/task.yaml index 10e8f111563..ac256843270 100644 --- a/dm/tests/import_v10x/conf/task.yaml +++ b/dm/tests/import_v10x/conf/task.yaml @@ -123,6 +123,7 @@ syncers: validators: validator-01: mode: none + worker-count: 4 clean-dump-file: false ansi-quotes: false remove-meta: false