diff --git a/dm/relay/meta.go b/dm/relay/meta.go index c48f2c5ae2b..38e50110940 100644 --- a/dm/relay/meta.go +++ b/dm/relay/meta.go @@ -212,7 +212,7 @@ func (lm *LocalMeta) Save(pos mysql.Position, gset gtid.Set) error { lm.BinlogGTID = "" } else { lm.BinlogGTID = gset.String() - lm.gset = gset + lm.gset = gset.Clone() // need to clone and set, in order to avoid the local meta's gset and the input gset referencing the same object, causing contentions later } lm.dirty = true @@ -328,7 +328,7 @@ func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID g } if newGTID != nil { - lm.gset = newGTID + lm.gset = newGTID.Clone() // need to clone and set, in order to avoid the local meta's gset and the input newGTID referencing the same object, causing contentions later lm.BinlogGTID = newGTID.String() } // if newGTID == nil, keep GTID not changed diff --git a/dm/relay/meta_test.go b/dm/relay/meta_test.go index 56c25f69875..558e812b67d 100644 --- a/dm/relay/meta_test.go +++ b/dm/relay/meta_test.go @@ -14,6 +14,7 @@ package relay import ( + "fmt" "os" "path" "strings" @@ -234,3 +235,65 @@ func (r *testMetaSuite) TestLocalMeta(c *C) { currentDir := lm.Dir() c.Assert(strings.HasSuffix(currentDir, cs.uuidWithSuffix), IsTrue) } + +func (r *testMetaSuite) TestLocalMetaPotentialDataRace(c *C) { + var err error + lm := NewLocalMeta("mysql", "/FAKE_DIR") + uuidStr := "85ab69d1-b21f-11e6-9c5e-64006a8978d2" + initGSet, _ := gtid.ParserGTID("mysql", fmt.Sprintf("%s:1", uuidStr)) + lm.(*LocalMeta).currentUUID = uuidStr + err = lm.Save( + mysql.Position{Name: "mysql-bin.000001", Pos: 234}, + initGSet, + ) + c.Assert(err, IsNil) + + ch1 := make(chan error) + ch2 := make(chan error) + pendingCh := make(chan struct{}) + go func() { + <-pendingCh + var err error + defer func() { + ch1 <- err + }() + _, lastGTID := lm.GTID() + var theMGSet mysql.GTIDSet + for i := 2; i < 100; i++ { + theMGSet, err = mysql.ParseGTIDSet("mysql", fmt.Sprintf("%s:1-%d", uuidStr, i*10)) + if err != nil { + return + } + + err = lastGTID.Set(theMGSet) + if err != nil { + return + } + err = lm.Save( + mysql.Position{Name: fmt.Sprintf("mysql-bin.%06d", i), Pos: 123}, + lastGTID, + ) + if err != nil { + return + } + } + }() + var gtidString string + go func() { + <-pendingCh + var err error + defer func() { + ch2 <- err + }() + for i := 0; i < 100; i++ { + _, currentGTID := lm.GTID() + gtidString = currentGTID.String() + } + }() + close(pendingCh) + ch1Err := <-ch1 + ch2Err := <-ch2 + c.Assert(ch1Err, IsNil) + c.Assert(ch2Err, IsNil) + c.Logf("GTID string from the go routine: %s", gtidString) +}