Skip to content

Commit

Permalink
feat: fast mode impl
Browse files Browse the repository at this point in the history
  • Loading branch information
buchuitoudegou committed Mar 10, 2022
1 parent 19a1fcf commit 24b52e6
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 167 deletions.
2 changes: 2 additions & 0 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ func (st *SubTask) StartValidator(expect pb.Stage) {
}
st.Lock()
defer st.Unlock()
// refetch the validation config

if st.cfg.ValidatorCfg.Mode != config.ValidationFast && st.cfg.ValidatorCfg.Mode != config.ValidationFull {
return
}
Expand Down
6 changes: 4 additions & 2 deletions dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ type DataValidator struct {

// such as table without primary key
unsupportedTable map[string]string
waitSyncerTimer *time.Timer
mode string
}

func NewContinuousDataValidator(cfg *config.SubTaskConfig, syncerObj *Syncer) *DataValidator {
Expand All @@ -136,9 +138,9 @@ func NewContinuousDataValidator(cfg *config.SubTaskConfig, syncerObj *Syncer) *D
v.validateInterval = validationInterval

v.unsupportedTable = make(map[string]string)
v.waitSyncerTimer = utils.NewStoppedTimer()

return v
}

func (v *DataValidator) initialize() error {
v.ctx, v.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -415,7 +417,7 @@ func (v *DataValidator) Stage() pb.Stage {
func (v *DataValidator) startValidateWorkers() {
v.wg.Add(v.workerCnt)
for i := 0; i < v.workerCnt; i++ {
worker := newValidateWorker(v, i)
worker := newValidateWorker(v, i, v.mode)
v.workers[i] = worker
go func() {
v.wg.Done()
Expand Down
22 changes: 13 additions & 9 deletions dm/syncer/validate_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ type validateWorker struct {
pendingRowCount atomic.Int64
batchSize int
sync.Mutex

mode string
}

func newValidateWorker(v *DataValidator, id int) *validateWorker {
workerLog := v.L.WithFields(zap.Int("id", id))
func newValidateWorker(v *DataValidator, id int, mode string) *validateWorker {
return &validateWorker{
cfg: v.cfg.ValidatorCfg,
ctx: v.ctx,
Expand All @@ -82,6 +83,7 @@ func newValidateWorker(v *DataValidator, id int) *validateWorker {
rowChangeCh: make(chan *rowChange, workerChannelSize),
pendingChangesMap: make(map[string]*tableChange),
batchSize: maxBatchSize,
mode: mode,
}
}

Expand Down Expand Up @@ -257,13 +259,15 @@ func (vw *validateWorker) validateInsertAndUpdateRows(rows []*rowChange, cond *C
failedRows[key] = &validateFailedRow{tp: rowNotExist}
continue
}

eq, err := vw.compareData(sourceRow, targetRow, tableInfo.Columns[:cond.ColumnCnt])
if err != nil {
return nil, err
}
if !eq {
failedRows[key] = &validateFailedRow{tp: rowDifferent, dstData: targetRow}
if vw.mode == config.ValidationFull {
// only compare the whole row in full mode
eq, err := vw.compareData(sourceRow, targetRow, tableInfo.Columns[:cond.ColumnCnt])
if err != nil {
return nil, err
}
if !eq {
failedRows[key] = &validateFailedRow{tp: rowDifferent, dstData: targetRow}
}
}
}
return failedRows, nil
Expand Down
Loading

0 comments on commit 24b52e6

Please sign in to comment.