Skip to content

Commit

Permalink
syncer(dm): fix startGTID is equal to endGTID (pingcap#4386)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored and zhaoxinyu committed Feb 16, 2022
1 parent 777712f commit 97cc9f3
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 33 deletions.
13 changes: 11 additions & 2 deletions dm/pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,18 @@ func (l *Location) ResetSuffix() {
// SetGTID set new gtid for location
// Use this func instead of GITSet.Set to avoid change other location.
func (l *Location) SetGTID(gset gmysql.GTIDSet) error {
flavor := gmysql.MySQLFlavor
if _, ok := l.gtidSet.(*gtid.MariadbGTIDSet); ok {
var flavor string

switch gset.(type) {
case *gmysql.MysqlGTIDSet:
flavor = gmysql.MySQLFlavor
case *gmysql.MariadbGTIDSet:
flavor = gmysql.MariaDBFlavor
case nil:
l.gtidSet = nil
return nil
default:
return fmt.Errorf("unknown GTIDSet type: %T", gset)
}

newGTID := gtid.MinGTIDSet(flavor)
Expand Down
24 changes: 24 additions & 0 deletions dm/pkg/binlog/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,30 @@ func (t *testPositionSuite) TestSetGTID(c *C) {
c.Assert(loc.gtidSet.String(), Equals, GTIDSetStr2)
c.Assert(loc2.gtidSet.String(), Equals, GTIDSetStr)
c.Assert(CompareLocation(loc, loc2, true), Equals, 1)

loc2.gtidSet = nil
err = loc2.SetGTID(mysqlSet)
c.Assert(err, IsNil)
c.Assert(loc2.gtidSet.String(), Equals, GTIDSetStr)
}

func (t *testPositionSuite) TestSetGTIDMariaDB(c *C) {
gSetStr := "1-1-1,2-2-2"
gSet, err := gtid.ParserGTID("mariadb", gSetStr)
c.Assert(err, IsNil)
gSetOrigin := gSet.Origin()

loc := Location{
Position: gmysql.Position{
Name: "mysql-bin.00002",
Pos: 2333,
},
gtidSet: nil,
Suffix: 0,
}
err = loc.SetGTID(gSetOrigin)
c.Assert(err, IsNil)
c.Assert(loc.gtidSet.String(), Equals, gSetStr)
}

func (t *testPositionSuite) TestExtractSuffix(c *C) {
Expand Down
85 changes: 70 additions & 15 deletions dm/syncer/binlog_locations.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"strings"
"sync"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/binlog/event"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/log"
)

Expand All @@ -33,7 +34,8 @@ type locationRecorder struct {
// | |
// curStartLocation curEndLocation
// there may be more events between curStartLocation and curEndLocation due to the limitation of binlog or
// implementation of DM, but those events should always belong to one transaction.
// implementation of DM, but in such scenario, those events should always belong to one transaction.
// When curStartLocation is equal to curEndLocation, it means current event is not a data change.
//
// curStartLocation is used when
// - display a meaningful location
Expand All @@ -44,7 +46,7 @@ type locationRecorder struct {
curStartLocation binlog.Location
curEndLocation binlog.Location

// txnEndLocation is the end location of last transaction. If current event is the last event of a txn,
// txnEndLocation is the end location of last seen transaction. If current event is the last event of a txn,
// txnEndLocation will be assigned from curEndLocation
// it is used when
// - reset binlog replication for a finer granularity
Expand All @@ -55,15 +57,20 @@ type locationRecorder struct {
// distinguish DML query event.
inDML bool

// we assign startGTID := endGTID after COMMIT, so at COMMIT we turn on the flag.
needUpdateStartGTID bool

mu sync.Mutex // guard curEndLocation because Syncer.printStatus is reading it from another goroutine.
}

func (l *locationRecorder) reset(loc binlog.Location) {
l.mu.Lock()
defer l.mu.Unlock()
l.curStartLocation = loc
l.curEndLocation = loc
l.txnEndLocation = loc
// need to clone location to avoid the modification leaking outside
clone := loc.Clone()
l.curStartLocation = clone
l.curEndLocation = clone
l.txnEndLocation = clone
}

//nolint:unused
Expand Down Expand Up @@ -102,15 +109,54 @@ func shouldUpdatePos(e *replication.BinlogEvent) bool {
return true
}

func (l *locationRecorder) setCurrentGTID(gset mysql.GTIDSet) {
err := l.curEndLocation.SetGTID(gset)
func (l *locationRecorder) updateCurStartGTID() {
gsetWrapped := l.curEndLocation.GetGTID()
if gsetWrapped == nil {
return
}
gset := gsetWrapped.Origin()
err := l.curStartLocation.SetGTID(gset)
if err != nil {
log.L().DPanic("failed to set GTID set",
zap.Any("GTID set", gset),
zap.Error(err))
}
}

func (l *locationRecorder) setCurEndGTID(e *replication.BinlogEvent) {
gtidStr, err := event.GetGTIDStr(e)
if err != nil {
log.L().DPanic("failed to get GTID from event",
zap.Any("event", e),
zap.Error(err))
return
}

gset := l.curEndLocation.GetGTID()

if gset == nil {
gset, _ = gtid.ParserGTID("", gtidStr)
_ = l.curEndLocation.SetGTID(gset.Origin())
return
}

clone := gset.Clone()
err = clone.Update(gtidStr)
if err != nil {
log.L().DPanic("failed to update GTID set",
zap.String("GTID", gtidStr),
zap.Error(err))
return
}

err = l.curEndLocation.SetGTID(clone.Origin())
if err != nil {
log.L().DPanic("failed to set GTID set",
zap.String("GTID", gtidStr),
zap.Error(err))
}
}

// update maintains the member of locationRecorder as their definitions.
// - curStartLocation is assigned to curEndLocation
// - curEndLocation is tried to be updated in-place
Expand All @@ -119,7 +165,13 @@ func (l *locationRecorder) update(e *replication.BinlogEvent) {
l.mu.Lock()
defer l.mu.Unlock()

l.curStartLocation = l.curEndLocation
// GTID part is maintained separately
l.curStartLocation.Position = l.curEndLocation.Position

if l.needUpdateStartGTID {
l.updateCurStartGTID()
l.needUpdateStartGTID = false
}

if !shouldUpdatePos(e) {
return
Expand All @@ -138,11 +190,18 @@ func (l *locationRecorder) update(e *replication.BinlogEvent) {
l.curEndLocation.Position.Pos = e.Header.LogPos

switch ev := e.Event.(type) {
case *replication.GTIDEvent:
l.setCurEndGTID(e)
case *replication.MariadbGTIDEvent:
l.setCurEndGTID(e)
if !ev.IsDDL() {
l.inDML = true
}
case *replication.XIDEvent:
// for transactional engines like InnoDB, COMMIT is xid event
l.setCurrentGTID(ev.GSet)
l.saveTxnEndLocation()
l.inDML = false
l.needUpdateStartGTID = true
case *replication.QueryEvent:
query := strings.TrimSpace(string(ev.Query))
switch query {
Expand All @@ -159,13 +218,9 @@ func (l *locationRecorder) update(e *replication.BinlogEvent) {
if l.inDML {
return
}
l.needUpdateStartGTID = true

l.setCurrentGTID(ev.GSet)
l.saveTxnEndLocation()
case *replication.MariadbGTIDEvent:
if !ev.IsDDL() {
l.inDML = true
}
}
}

Expand Down
31 changes: 21 additions & 10 deletions dm/syncer/binlog_locations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ type testLocationSuite struct {
currGSet gtid.Set
}

func (s *testLocationSuite) SetUpSuite(c *C) {
func (s *testLocationSuite) SetUpTest(c *C) {
s.serverID = 101
s.binlogFile = "mysql-bin.000001"
s.nextBinlogFile = "mysql-bin.000002"
s.binlogPos = 123
s.flavor = mysql.MySQLFlavor
s.prevGSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14"
s.lastGTIDStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14"
s.currGSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14"
s.currGSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-15"

var err error
s.prevGSet, err = gtid.ParserGTID(s.flavor, s.prevGSetStr)
Expand Down Expand Up @@ -179,10 +179,20 @@ func (s *testLocationSuite) checkOneTxnEvents(c *C, events []*replication.Binlog
c.Assert(r.curEndLocation, DeepEquals, expected[0])
c.Assert(r.txnEndLocation, DeepEquals, expected[0])

seenGTID := false
for i, e := range events {
r.update(e)
c.Assert(r.curStartLocation, DeepEquals, expected[i])
c.Assert(r.curEndLocation, DeepEquals, expected[i+1])

if e.Header.EventType == replication.GTID_EVENT || e.Header.EventType == replication.MARIADB_GTID_EVENT {
seenGTID = true
}
if seenGTID {
c.Assert(r.curEndLocation.Position, DeepEquals, expected[i+1].Position)
c.Assert(r.curEndLocation.GetGTID(), DeepEquals, expected[len(expected)-1].GetGTID())
} else {
c.Assert(r.curEndLocation, DeepEquals, expected[i+1])
}

if i == len(events)-1 {
switch e.Header.EventType {
Expand All @@ -197,7 +207,8 @@ func (s *testLocationSuite) checkOneTxnEvents(c *C, events []*replication.Binlog
}
}

func (s *testLocationSuite) generateExpectLocations(
// generateExpectedLocations generates binlog position part of location from given event.
func (s *testLocationSuite) generateExpectedLocations(
initLoc binlog.Location,
events []*replication.BinlogEvent,
) []binlog.Location {
Expand All @@ -224,7 +235,7 @@ func (s *testLocationSuite) TestDMLUpdateLocationsGTID(c *C) {
// we have 8 events
c.Assert(events, HasLen, 8)

expected := s.generateExpectLocations(s.loc, events)
expected := s.generateExpectedLocations(s.loc, events)

c.Assert(expected[8].SetGTID(s.currGSet.Origin()), IsNil)

Expand All @@ -246,7 +257,7 @@ func (s *testLocationSuite) TestDMLUpdateLocationsPos(c *C) {
events[7].Event.(*replication.XIDEvent).GSet = nil
events = append(events[:2], events[4:]...)

expected := s.generateExpectLocations(loc, events)
expected := s.generateExpectedLocations(loc, events)

s.checkOneTxnEvents(c, events, expected)
}
Expand All @@ -257,7 +268,7 @@ func (s *testLocationSuite) TestDDLUpdateLocationsGTID(c *C) {
// we have 5 events
c.Assert(events, HasLen, 5)

expected := s.generateExpectLocations(s.loc, events)
expected := s.generateExpectedLocations(s.loc, events)

c.Assert(expected[5].SetGTID(s.currGSet.Origin()), IsNil)

Expand All @@ -281,7 +292,7 @@ func (s *testLocationSuite) TestDDLUpdateLocationsPos(c *C) {
events = append(events[:2], events[4:]...)

// now we have 3 events, test about their 4 locations
expected := s.generateExpectLocations(loc, events)
expected := s.generateExpectedLocations(loc, events)

s.checkOneTxnEvents(c, events, expected)
}
Expand All @@ -305,7 +316,7 @@ func (s *testLocationSuite) TestDMLQueryUpdateLocationsGTID(c *C) {
// we have 7 events
c.Assert(events, HasLen, 7)

expected := s.generateExpectLocations(s.loc, events)
expected := s.generateExpectedLocations(s.loc, events)

c.Assert(expected[7].SetGTID(s.currGSet.Origin()), IsNil)

Expand Down Expand Up @@ -334,7 +345,7 @@ func (s *testLocationSuite) TestRotateEvent(c *C) {

nextLoc := s.loc
nextLoc.Position.Name = s.nextBinlogFile
expected := s.generateExpectLocations(nextLoc, events)
expected := s.generateExpectedLocations(nextLoc, events)

// reset events of first binlog file
expected[0].Position.Name = s.binlogFile
Expand Down
6 changes: 0 additions & 6 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,10 +1794,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

startTime := time.Now()
e, err = s.getEvent(tctx, currentLocation)
s.tctx.L().Debug("location refactor",
zap.Stringer("current", currentLocation),
zap.Stringer("start", startLocation),
zap.Stringer("last", lastLocation))

failpoint.Inject("SafeModeExit", func(val failpoint.Value) {
if intVal, ok := val.(int); ok && intVal == 1 {
Expand Down Expand Up @@ -3684,8 +3680,6 @@ func (s *Syncer) getEvent(tctx *tcontext.Context, startLocation binlog.Location)
e, err := s.streamerController.GetEvent(tctx)
if err == nil {
s.locations.update(e)
// TODO: observe below log in integration test
s.tctx.L().Debug("location refactor", zap.Stringer("locations", s.locations))
}
return e, err
}
Expand Down

0 comments on commit 97cc9f3

Please sign in to comment.