Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: fix panic when concurrent call Status #2115

Merged
merged 5 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
"github.com/pingcap/dm/syncer/metrics"
)

// Status implements Unit.Status
// it returns status, but does not calc status.
// Status implements Unit.Status.
func (s *Syncer) Status(sourceStatus *binlog.SourceStatus) interface{} {
syncerLocation := s.checkpoint.FlushedGlobalPoint()
st := &pb.SyncStatus{
Expand Down Expand Up @@ -95,18 +94,16 @@ func (s *Syncer) printStatus(sourceStatus *binlog.SourceStatus) {
return
}
now := time.Now()
s.lastTime.RLock()
seconds := now.Unix() - s.lastTime.t.Unix()
s.lastTime.RUnlock()
totalSeconds := now.Unix() - s.start.Unix()
seconds := now.Unix() - s.lastTime.Load().Unix()
totalSeconds := now.Unix() - s.start.Load().Unix()
last := s.lastCount.Load()
total := s.count.Load()

totalBinlogSize := s.binlogSizeCount.Load()
lastBinlogSize := s.lastBinlogSizeCount.Load()

tps, totalTps := int64(0), int64(0)
if seconds > 0 {
if seconds > 0 && totalSeconds > 0 {
tps = (total - last) / seconds
totalTps = total / totalSeconds

Expand Down Expand Up @@ -149,9 +146,7 @@ func (s *Syncer) printStatus(sourceStatus *binlog.SourceStatus) {

s.lastCount.Store(total)
s.lastBinlogSizeCount.Store(totalBinlogSize)
s.lastTime.Lock()
s.lastTime.t = time.Now()
s.lastTime.Unlock()
s.lastTime.Store(time.Now())
s.totalTps.Store(totalTps)
s.tps.Store(tps)
}
66 changes: 66 additions & 0 deletions syncer/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package syncer

import (
"sync"

"github.com/go-mysql-org/go-mysql/mysql"
. "github.com/pingcap/check"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/binlog"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/syncer/shardddl"
)

var _ = Suite(&statusSuite{})

type statusSuite struct{}

func (t *statusSuite) TestStatusRace(c *C) {
s := &Syncer{}

l := log.With(zap.String("unit test", "TestStatueRace"))
s.tctx = tcontext.Background().WithLogger(l)
s.cfg = &config.SubTaskConfig{}
s.checkpoint = &mockCheckpoint{}
s.pessimist = shardddl.NewPessimist(&l, nil, "", "")

sourceStatus := &binlog.SourceStatus{
Location: binlog.Location{
Position: mysql.Position{
Name: "mysql-bin.000123",
Pos: 223,
},
},
Binlogs: binlog.FileSizes(nil),
}

wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
ret := s.Status(sourceStatus)
status := ret.(*pb.SyncStatus)
c.Assert(status.MasterBinlog, Equals, "(mysql-bin.000123, 223)")
c.Assert(status.SyncerBinlog, Equals, "(mysql-bin.000123, 123)")
}()
}
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
wg.Wait()
}

type mockCheckpoint struct {
CheckPoint
}

func (*mockCheckpoint) FlushedGlobalPoint() binlog.Location {
return binlog.Location{
Position: mysql.Position{
Name: "mysql-bin.000123",
Pos: 123,
},
}
}
15 changes: 6 additions & 9 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,9 @@ type Syncer struct {

closed atomic.Bool

start time.Time
lastTime struct {
sync.RWMutex
t time.Time
}
start atomic.Time
lastTime atomic.Time

// safeMode is used to track if we need to generate dml with safe-mode
// For each binlog event, we will set the current value into eventContext because
// the status of this track may change over time.
Expand Down Expand Up @@ -1700,10 +1698,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
}()

s.start = time.Now()
s.lastTime.Lock()
s.lastTime.t = s.start
s.lastTime.Unlock()
now := time.Now()
s.start.Store(now)
s.lastTime.Store(now)

tryReSync := true

Expand Down