Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
buchuitoudegou committed Mar 14, 2022
1 parent 0028a5a commit ceca0b3
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 9 deletions.
1 change: 0 additions & 1 deletion dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func (st *SubTask) StartValidator(expect pb.Stage) {
}
st.Lock()
defer st.Unlock()
// refetch the validation config

if st.cfg.ValidatorCfg.Mode != config.ValidationFast && st.cfg.ValidatorCfg.Mode != config.ValidationFull {
return
Expand Down
3 changes: 1 addition & 2 deletions dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ type DataValidator struct {

// such as table without primary key
unsupportedTable map[string]string
mode string
}

func NewContinuousDataValidator(cfg *config.SubTaskConfig, syncerObj *Syncer) *DataValidator {
Expand Down Expand Up @@ -416,7 +415,7 @@ func (v *DataValidator) Stage() pb.Stage {
func (v *DataValidator) startValidateWorkers() {
v.wg.Add(v.workerCnt)
for i := 0; i < v.workerCnt; i++ {
worker := newValidateWorker(v, i, v.mode)
worker := newValidateWorker(v, i)
v.workers[i] = worker
go func() {
v.wg.Done()
Expand Down
7 changes: 2 additions & 5 deletions dm/syncer/validate_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,9 @@ type validateWorker struct {
pendingRowCount atomic.Int64
batchSize int
sync.Mutex

mode string
}

func newValidateWorker(v *DataValidator, id int, mode string) *validateWorker {
func newValidateWorker(v *DataValidator, id int) *validateWorker {
workerLog := v.L.WithFields(zap.Int("id", id))
return &validateWorker{
cfg: v.cfg.ValidatorCfg,
Expand All @@ -84,7 +82,6 @@ func newValidateWorker(v *DataValidator, id int, mode string) *validateWorker {
rowChangeCh: make(chan *rowChange, workerChannelSize),
pendingChangesMap: make(map[string]*tableChange),
batchSize: maxBatchSize,
mode: mode,
}
}

Expand Down Expand Up @@ -260,7 +257,7 @@ func (vw *validateWorker) validateInsertAndUpdateRows(rows []*rowChange, cond *C
failedRows[key] = &validateFailedRow{tp: rowNotExist}
continue
}
if vw.mode == config.ValidationFull {
if vw.cfg.Mode == config.ValidationFull {
// only compare the whole row in full mode
eq, err := vw.compareData(sourceRow, targetRow, tableInfo.Columns[:cond.ColumnCnt])
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion dm/syncer/validate_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestValidatorWorkerRunInsertUpdate(t *testing.T) {
"create table tbl3(a varchar(100) primary key, b varchar(100))")

cfg := genSubtaskConfig(t)
cfg.ValidatorCfg.Mode = mode
_, mock, err := conn.InitMockDBFull()
mock.MatchExpectationsInOrder(false)
require.NoError(t, err)
Expand All @@ -60,7 +61,7 @@ func TestValidatorWorkerRunInsertUpdate(t *testing.T) {
defer validator.cancel()

// insert & update same table, both row are validated failed
worker := newValidateWorker(validator, 0, mode)
worker := newValidateWorker(validator, 0)
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "1",
Expand Down

0 comments on commit ceca0b3

Please sign in to comment.