diff --git a/dm/syncer/data_validator.go b/dm/syncer/data_validator.go index 2b353f7ceb3..5e5be8e7e63 100644 --- a/dm/syncer/data_validator.go +++ b/dm/syncer/data_validator.go @@ -171,7 +171,6 @@ func NewContinuousDataValidator(cfg *config.SubTaskConfig, syncerObj *Syncer, st v.workerCnt = cfg.ValidatorCfg.WorkerCount v.processedRowCounts = make([]atomic.Int64, rowChangeTypeCount) - v.workers = make([]*validateWorker, v.workerCnt) v.validateInterval = validationInterval v.persistHelper = newValidatorCheckpointHelper(v) v.tableStatus = make(map[string]*tableValidateStatus) @@ -571,6 +570,7 @@ func (v *DataValidator) Stage() pb.Stage { func (v *DataValidator) startValidateWorkers() { v.wg.Add(v.workerCnt) + v.workers = make([]*validateWorker, v.workerCnt) for i := 0; i < v.workerCnt; i++ { worker := newValidateWorker(v, i) v.workers[i] = worker diff --git a/dm/syncer/validator_checkpoint_test.go b/dm/syncer/validator_checkpoint_test.go index ce73febfd25..2d9151a9506 100644 --- a/dm/syncer/validator_checkpoint_test.go +++ b/dm/syncer/validator_checkpoint_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tiflow/dm/pkg/binlog" "github.com/pingcap/tiflow/dm/pkg/conn" tcontext "github.com/pingcap/tiflow/dm/pkg/context" + "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/retry" "github.com/pingcap/tiflow/dm/pkg/schema" "github.com/pingcap/tiflow/dm/syncer/dbconn" @@ -131,3 +132,21 @@ func TestValidatorCheckpointPersist(t *testing.T) { testFunc("") testFunc("failed") } + +func TestCheckpointNotPanic(t *testing.T) { + // validator will try persisting data before starting + // if it visits and persists workers, which are not intialized before starting, + // the program will panick. + // This issue is fixed by putting off initializing workers + var err error + cfg := genSubtaskConfig(t) + syncerObj := NewSyncer(cfg, nil, nil) + require.Equal(t, log.InitLogger(&log.Config{}), nil) + validator := NewContinuousDataValidator(cfg, syncerObj, false) + validator.ctx, validator.cancel = context.WithCancel(context.Background()) + validator.tctx = tcontext.NewContext(validator.ctx, validator.L) + validator.persistHelper.tctx = validator.tctx + currLoc := binlog.NewLocation(cfg.Flavor) + err = validator.persistHelper.persist(currLoc) // persist nil worker + require.NotNil(t, err) // err not nil but program not panicks +}