Skip to content

Commit

Permalink
syncer/checkpoint(dm): update the meaning of IsOlderThanTablePoint (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jan 21, 2022
1 parent 09076bc commit e0eeac0
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 33 deletions.
18 changes: 8 additions & 10 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ type CheckPoint interface {
DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error

// IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint
IsOlderThanTablePoint(table *filter.Table, point binlog.Location, useLE bool) bool
IsOlderThanTablePoint(table *filter.Table, point binlog.Location, isDDL bool) bool

// SaveGlobalPoint saves the global binlog stream's checkpoint
// corresponding to Meta.Save
Expand Down Expand Up @@ -575,14 +575,12 @@ func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSche
}

// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint.
// For GTID replication, go-mysql will only update GTID set in a XID event after the rows event, for example, the binlog events are:
// - Query event e1, location is gset1
// - Rows event e2, location is gset1
// - XID event, location is gset2
// We should note that e1 is not older than e2
// For binlog position replication, currently DM will split rows changes of an event to jobs, so some job may has save position.
// if useLE is true, we use less than or equal.
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location binlog.Location, useLE bool) bool {
// This function is used to skip old binlog events. Table checkpoint is saved after dispatching a binlog event.
// - For GTID based and position based replication, DML handling is different. When using position based, each event has
// unique position so we have confident to skip event which is <= table checkpoint. When using GTID based, there may
// be more than one event with same GTID, so we can only skip event which is < table checkpoint.
// - DDL will not have unique position or GTID, so we can always skip events <= table checkpoint.
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location binlog.Location, isDDL bool) bool {
cp.RLock()
defer cp.RUnlock()
sourceSchema, sourceTable := table.Schema, table.Name
Expand All @@ -597,7 +595,7 @@ func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location
oldLocation := point.MySQLLocation()
cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation))

if useLE {
if isDDL || !cp.cfg.EnableGTID {
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) <= 0
}
return binlog.CompareLocation(location, oldLocation, cp.cfg.EnableGTID) < 0
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *Syncer) handleQueryEventOptimistic(qec *queryEventContext) error {

qec.shardingDDLInfo = trackInfos[0]
job := newDDLJob(qec)
err = s.handleJobFunc(job)
_, err = s.handleJobFunc(job)
if err != nil {
return err
}
Expand Down
46 changes: 26 additions & 20 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ type Syncer struct {
isQueryEvent bool
}

handleJobFunc func(*job) error
handleJobFunc func(*job) (bool, error)
flushSeq int64

// `lower_case_table_names` setting of upstream db
Expand Down Expand Up @@ -991,7 +991,7 @@ func (s *Syncer) checkShouldFlush() error {

// TODO: move to syncer/job.go
// handleJob will do many actions based on job type.
func (s *Syncer) handleJob(job *job) (err error) {
func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) {
skipCheckFlush := false
defer func() {
if !skipCheckFlush && err == nil {
Expand All @@ -1012,23 +1012,24 @@ func (s *Syncer) handleJob(job *job) (err error) {

if waitXIDStatus(s.waitXIDJob.Load()) == waitComplete && job.tp != flush {
s.tctx.L().Info("All jobs is completed before syncer close, the coming job will be reject", zap.Any("job", job))
return nil
return
}

switch job.tp {
case xid:
s.waitXIDJob.CAS(int64(waiting), int64(waitComplete))
s.saveGlobalPoint(job.location)
s.isTransactionEnd = true
return nil
return
case skip:
s.updateReplicationJobTS(job, skipJobIdx)
return nil
return
}

// 2. send the job to queue

s.addJob(job)
added2Queue = true

// 3. after job is sent to queue

Expand All @@ -1038,14 +1039,14 @@ func (s *Syncer) handleJob(job *job) (err error) {
// caller
s.isTransactionEnd = false
skipCheckFlush = true
return nil
return
case ddl:
s.jobWg.Wait()

// skip rest logic when downstream error
if s.execError.Load() != nil {
// nolint:nilerr
return nil
return
}

s.updateReplicationJobTS(job, ddlJobIdx)
Expand All @@ -1058,7 +1059,7 @@ func (s *Syncer) handleJob(job *job) (err error) {
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err = handleFlushCheckpointStage(3, val.(int), "before save checkpoint")
if err != nil {
failpoint.Return(err)
failpoint.Return()
}
})
// save global checkpoint for DDL
Expand All @@ -1078,19 +1079,22 @@ func (s *Syncer) handleJob(job *job) (err error) {
failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) {
err = handleFlushCheckpointStage(4, val.(int), "before flush checkpoint")
if err != nil {
failpoint.Return(err)
failpoint.Return()
}
})
skipCheckFlush = true
return s.flushCheckPoints()
err = s.flushCheckPoints()
return
case flush:
s.jobWg.Wait()
skipCheckFlush = true
return s.flushCheckPoints()
err = s.flushCheckPoints()
return
case asyncFlush:
skipCheckFlush = true
}
return err
// nolint:nakedret
return
}

func (s *Syncer) saveGlobalPoint(globalLocation binlog.Location) {
Expand Down Expand Up @@ -2053,7 +2057,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}

job := newXIDJob(currentLocation, startLocation, currentLocation)
err2 = s.handleJobFunc(job)
_, err2 = s.handleJobFunc(job)
case *replication.GenericEvent:
if e.Header.EventType == replication.HEARTBEAT_EVENT {
// flush checkpoint even if there are no real binlog events
Expand Down Expand Up @@ -2325,9 +2329,9 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
startTime := time.Now()
for i := range dmls {
job := newDMLJob(jobType, sourceTable, targetTable, dmls[i], &ec)
err = s.handleJobFunc(job)
if err != nil {
return err
added2Queue, err2 := s.handleJobFunc(job)
if err2 != nil || !added2Queue {
return err2
}
}
metrics.DispatchBinlogDurationHistogram.WithLabelValues(jobType.String(), s.cfg.Name, s.cfg.SourceID).Observe(time.Since(startTime).Seconds())
Expand Down Expand Up @@ -2692,7 +2696,7 @@ func (s *Syncer) handleQueryEventNoSharding(qec *queryEventContext) error {
})

job := newDDLJob(qec)
err := s.handleJobFunc(job)
_, err := s.handleJobFunc(job)
if err != nil {
return err
}
Expand Down Expand Up @@ -2914,7 +2918,7 @@ func (s *Syncer) handleQueryEventPessimistic(qec *queryEventContext) error {
})

job := newDDLJob(qec)
err = s.handleJobFunc(job)
_, err = s.handleJobFunc(job)
if err != nil {
return err
}
Expand Down Expand Up @@ -3299,7 +3303,8 @@ func (s *Syncer) closeDBs() {
// make newJob's sql argument empty to distinguish normal sql and skips sql.
func (s *Syncer) recordSkipSQLsLocation(ec *eventContext) error {
job := newSkipJob(ec)
return s.handleJobFunc(job)
_, err := s.handleJobFunc(job)
return err
}

// flushJobs add a flush job and wait for all jobs finished.
Expand All @@ -3308,7 +3313,8 @@ func (s *Syncer) flushJobs() error {
flushJobSeq := s.getFlushSeq()
s.tctx.L().Info("flush all jobs", zap.Stringer("global checkpoint", s.checkpoint), zap.Int64("flush job seq", flushJobSeq))
job := newFlushJob(s.cfg.WorkerCount, flushJobSeq)
return s.handleJobFunc(job)
_, err := s.handleJobFunc(job)
return err
}

func (s *Syncer) reSyncBinlog(tctx tcontext.Context, location binlog.Location) error {
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,7 @@ func (s *Syncer) mockFinishJob(jobs []*expectJob) {
}
}

func (s *Syncer) addJobToMemory(job *job) error {
func (s *Syncer) addJobToMemory(job *job) (bool, error) {
log.L().Info("add job to memory", zap.Stringer("job", job))

switch job.tp {
Expand Down Expand Up @@ -1512,7 +1512,7 @@ func (s *Syncer) addJobToMemory(job *job) error {
}
}

return nil
return true, nil
}

func (s *Syncer) setupMockCheckpoint(c *C, checkPointDBConn *sql.Conn, checkPointMock sqlmock.Sqlmock) {
Expand Down

0 comments on commit e0eeac0

Please sign in to comment.