Skip to content

Commit

Permalink
validator(dm): fix dmworker panic due to validator (#5243)
Browse files Browse the repository at this point in the history
close #5184
  • Loading branch information
buchuitoudegou authored Apr 25, 2022
1 parent 06f54d0 commit 83ecda9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ func NewContinuousDataValidator(cfg *config.SubTaskConfig, syncerObj *Syncer, st

v.workerCnt = cfg.ValidatorCfg.WorkerCount
v.processedRowCounts = make([]atomic.Int64, rowChangeTypeCount)
v.workers = make([]*validateWorker, v.workerCnt)
v.validateInterval = validationInterval
v.persistHelper = newValidatorCheckpointHelper(v)
v.tableStatus = make(map[string]*tableValidateStatus)
Expand Down Expand Up @@ -571,6 +570,7 @@ func (v *DataValidator) Stage() pb.Stage {

func (v *DataValidator) startValidateWorkers() {
v.wg.Add(v.workerCnt)
v.workers = make([]*validateWorker, v.workerCnt)
for i := 0; i < v.workerCnt; i++ {
worker := newValidateWorker(v, i)
v.workers[i] = worker
Expand Down
19 changes: 19 additions & 0 deletions dm/syncer/validator_checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/dm/pkg/schema"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
Expand Down Expand Up @@ -131,3 +132,21 @@ func TestValidatorCheckpointPersist(t *testing.T) {
testFunc("")
testFunc("failed")
}

func TestCheckpointNotPanic(t *testing.T) {
// validator will try persisting data before starting
// if it visits and persists workers, which are not intialized before starting,
// the program will panick.
// This issue is fixed by putting off initializing workers
var err error
cfg := genSubtaskConfig(t)
syncerObj := NewSyncer(cfg, nil, nil)
require.Equal(t, log.InitLogger(&log.Config{}), nil)
validator := NewContinuousDataValidator(cfg, syncerObj, false)
validator.ctx, validator.cancel = context.WithCancel(context.Background())
validator.tctx = tcontext.NewContext(validator.ctx, validator.L)
validator.persistHelper.tctx = validator.tctx
currLoc := binlog.NewLocation(cfg.Flavor)
err = validator.persistHelper.persist(currLoc) // persist nil worker
require.NotNil(t, err) // err not nil but program not panicks
}

0 comments on commit 83ecda9

Please sign in to comment.