Skip to content

Commit

Permalink
relay/meta(dm): fix potential data races after saving GTID (#4455)
Browse files Browse the repository at this point in the history
close #4166
  • Loading branch information
dsdashun authored Jan 26, 2022
1 parent f51e410 commit 72c5fab
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
4 changes: 2 additions & 2 deletions dm/relay/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (lm *LocalMeta) Save(pos mysql.Position, gset gtid.Set) error {
lm.BinlogGTID = ""
} else {
lm.BinlogGTID = gset.String()
lm.gset = gset
lm.gset = gset.Clone() // need to clone and set, in order to avoid the local meta's gset and the input gset referencing the same object, causing contentions later
}

lm.dirty = true
Expand Down Expand Up @@ -328,7 +328,7 @@ func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID g
}

if newGTID != nil {
lm.gset = newGTID
lm.gset = newGTID.Clone() // need to clone and set, in order to avoid the local meta's gset and the input newGTID referencing the same object, causing contentions later
lm.BinlogGTID = newGTID.String()
} // if newGTID == nil, keep GTID not changed

Expand Down
63 changes: 63 additions & 0 deletions dm/relay/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package relay

import (
"fmt"
"os"
"path"
"strings"
Expand Down Expand Up @@ -234,3 +235,65 @@ func (r *testMetaSuite) TestLocalMeta(c *C) {
currentDir := lm.Dir()
c.Assert(strings.HasSuffix(currentDir, cs.uuidWithSuffix), IsTrue)
}

func (r *testMetaSuite) TestLocalMetaPotentialDataRace(c *C) {
var err error
lm := NewLocalMeta("mysql", "/FAKE_DIR")
uuidStr := "85ab69d1-b21f-11e6-9c5e-64006a8978d2"
initGSet, _ := gtid.ParserGTID("mysql", fmt.Sprintf("%s:1", uuidStr))
lm.(*LocalMeta).currentUUID = uuidStr
err = lm.Save(
mysql.Position{Name: "mysql-bin.000001", Pos: 234},
initGSet,
)
c.Assert(err, IsNil)

ch1 := make(chan error)
ch2 := make(chan error)
pendingCh := make(chan struct{})
go func() {
<-pendingCh
var err error
defer func() {
ch1 <- err
}()
_, lastGTID := lm.GTID()
var theMGSet mysql.GTIDSet
for i := 2; i < 100; i++ {
theMGSet, err = mysql.ParseGTIDSet("mysql", fmt.Sprintf("%s:1-%d", uuidStr, i*10))
if err != nil {
return
}

err = lastGTID.Set(theMGSet)
if err != nil {
return
}
err = lm.Save(
mysql.Position{Name: fmt.Sprintf("mysql-bin.%06d", i), Pos: 123},
lastGTID,
)
if err != nil {
return
}
}
}()
var gtidString string
go func() {
<-pendingCh
var err error
defer func() {
ch2 <- err
}()
for i := 0; i < 100; i++ {
_, currentGTID := lm.GTID()
gtidString = currentGTID.String()
}
}()
close(pendingCh)
ch1Err := <-ch1
ch2Err := <-ch2
c.Assert(ch1Err, IsNil)
c.Assert(ch2Err, IsNil)
c.Logf("GTID string from the go routine: %s", gtidString)
}

0 comments on commit 72c5fab

Please sign in to comment.