Skip to content

Commit

Permalink
syncer: fix the gtid purged error when switching pos replication to g…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 9, 2021
1 parent 72c08d3 commit 497f57f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 4 deletions.
68 changes: 64 additions & 4 deletions pkg/utils/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import (
"strconv"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/parser/mysql"

"github.com/DATA-DOG/go-sqlmock"
"github.com/coreos/go-semver/semver"
gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-sql-driver/mysql"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/parser/mysql"

"github.com/pingcap/dm/pkg/gtid"
)

var _ = Suite(&testDBSuite{})
Expand Down Expand Up @@ -364,3 +365,62 @@ func (t *testDBSuite) TestTiDBVersion(c *C) {
}
}
}

func getGSetFromString(c *C, s string) gtid.Set {
gSet, err := gtid.ParserGTID("mysql", s)
c.Assert(err, IsNil)
return gSet
}

func (t *testDBSuite) TestAddGSetWithPurged(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
mariaGTID, err := gtid.ParserGTID("mariadb", "1-2-100")
c.Assert(err, IsNil)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := db.Conn(ctx)
c.Assert(err, IsNil)

testCases := []struct {
originGSet gtid.Set
purgedSet gtid.Set
expectedSet gtid.Set
err error
}{
{
getGSetFromString(c, "3ccc475b-2343-11e7-be21-6c0b84d59f30:6-14"),
getGSetFromString(c, "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-5"),
getGSetFromString(c, "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14"),
nil,
}, {

getGSetFromString(c, "3ccc475b-2343-11e7-be21-6c0b84d59f30:2-6"),
getGSetFromString(c, "3ccc475b-2343-11e7-be21-6c0b84d59f30:1"),
getGSetFromString(c, "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-6"),
nil,
}, {

getGSetFromString(c, "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-6"),
getGSetFromString(c, "53bfca22-690d-11e7-8a62-18ded7a37b78:1-495"),
getGSetFromString(c, "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-6,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495"),
nil,
}, {
getGSetFromString(c, "3ccc475b-2343-11e7-be21-6c0b84d59f30:6-14"),
mariaGTID,
nil,
errors.New("invalid GTID format, must UUID:interval[:interval]"),
},
}

for _, tc := range testCases {
mock.ExpectQuery("select @@GLOBAL.gtid_purged").WillReturnRows(
sqlmock.NewRows([]string{"@@GLOBAL.gtid_purged"}).AddRow(tc.purgedSet.String()))
originSet := tc.originGSet.Clone()
newSet, err := AddGSetWithPurged(ctx, originSet, conn)
c.Assert(errors.ErrorEqual(err, tc.err), IsTrue)
c.Assert(newSet, DeepEquals, tc.expectedSet)
// make sure origin gSet hasn't changed
c.Assert(originSet, DeepEquals, tc.originGSet)
}
}
10 changes: 10 additions & 0 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3618,6 +3618,16 @@ func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
s.tctx.L().Warn("fail to get gtids for global location", zap.Stringer("pos", location), zap.Error(err))
return false, err
}
dbConn, err := s.fromDB.BaseDB.GetBaseConn(tctx.Context())
if err != nil {
s.tctx.L().Warn("fail to build connection", zap.Stringer("pos", location), zap.Error(err))
return false, err
}
gs, err = utils.AddGSetWithPurged(tctx.Context(), gs, dbConn.DBConn)
if err != nil {
s.tctx.L().Warn("fail to merge purged gtidSet", zap.Stringer("pos", location), zap.Error(err))
return false, err
}
err = location.SetGTID(gs.Origin())
if err != nil {
s.tctx.L().Warn("fail to set gtid for global location", zap.Stringer("pos", location),
Expand Down

0 comments on commit 497f57f

Please sign in to comment.