diff --git a/syncer/status.go b/syncer/status.go index 72c6de2e4a..acab8dfd45 100644 --- a/syncer/status.go +++ b/syncer/status.go @@ -27,8 +27,7 @@ import ( "github.com/pingcap/dm/syncer/metrics" ) -// Status implements Unit.Status -// it returns status, but does not calc status. +// Status implements Unit.Status. func (s *Syncer) Status(sourceStatus *binlog.SourceStatus) interface{} { syncerLocation := s.checkpoint.FlushedGlobalPoint() st := &pb.SyncStatus{ @@ -95,10 +94,8 @@ func (s *Syncer) printStatus(sourceStatus *binlog.SourceStatus) { return } now := time.Now() - s.lastTime.RLock() - seconds := now.Unix() - s.lastTime.t.Unix() - s.lastTime.RUnlock() - totalSeconds := now.Unix() - s.start.Unix() + seconds := now.Unix() - s.lastTime.Load().Unix() + totalSeconds := now.Unix() - s.start.Load().Unix() last := s.lastCount.Load() total := s.count.Load() @@ -106,7 +103,7 @@ func (s *Syncer) printStatus(sourceStatus *binlog.SourceStatus) { lastBinlogSize := s.lastBinlogSizeCount.Load() tps, totalTps := int64(0), int64(0) - if seconds > 0 { + if seconds > 0 && totalSeconds > 0 { tps = (total - last) / seconds totalTps = total / totalSeconds @@ -149,9 +146,7 @@ func (s *Syncer) printStatus(sourceStatus *binlog.SourceStatus) { s.lastCount.Store(total) s.lastBinlogSizeCount.Store(totalBinlogSize) - s.lastTime.Lock() - s.lastTime.t = time.Now() - s.lastTime.Unlock() + s.lastTime.Store(time.Now()) s.totalTps.Store(totalTps) s.tps.Store(tps) } diff --git a/syncer/status_test.go b/syncer/status_test.go new file mode 100644 index 0000000000..fa0ed4fa65 --- /dev/null +++ b/syncer/status_test.go @@ -0,0 +1,66 @@ +package syncer + +import ( + "sync" + + "github.com/go-mysql-org/go-mysql/mysql" + . "github.com/pingcap/check" + "go.uber.org/zap" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/binlog" + tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/syncer/shardddl" +) + +var _ = Suite(&statusSuite{}) + +type statusSuite struct{} + +func (t *statusSuite) TestStatusRace(c *C) { + s := &Syncer{} + + l := log.With(zap.String("unit test", "TestStatueRace")) + s.tctx = tcontext.Background().WithLogger(l) + s.cfg = &config.SubTaskConfig{} + s.checkpoint = &mockCheckpoint{} + s.pessimist = shardddl.NewPessimist(&l, nil, "", "") + + sourceStatus := &binlog.SourceStatus{ + Location: binlog.Location{ + Position: mysql.Position{ + Name: "mysql-bin.000123", + Pos: 223, + }, + }, + Binlogs: binlog.FileSizes(nil), + } + + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ret := s.Status(sourceStatus) + status := ret.(*pb.SyncStatus) + c.Assert(status.MasterBinlog, Equals, "(mysql-bin.000123, 223)") + c.Assert(status.SyncerBinlog, Equals, "(mysql-bin.000123, 123)") + }() + } + wg.Wait() +} + +type mockCheckpoint struct { + CheckPoint +} + +func (*mockCheckpoint) FlushedGlobalPoint() binlog.Location { + return binlog.Location{ + Position: mysql.Position{ + Name: "mysql-bin.000123", + Pos: 123, + }, + } +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 3527c63578..2978540a7c 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -162,11 +162,9 @@ type Syncer struct { closed atomic.Bool - start time.Time - lastTime struct { - sync.RWMutex - t time.Time - } + start atomic.Time + lastTime atomic.Time + // safeMode is used to track if we need to generate dml with safe-mode // For each binlog event, we will set the current value into eventContext because // the status of this track may change over time. @@ -1700,10 +1698,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } }() - s.start = time.Now() - s.lastTime.Lock() - s.lastTime.t = s.start - s.lastTime.Unlock() + now := time.Now() + s.start.Store(now) + s.lastTime.Store(now) tryReSync := true