Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
move ResetStart into interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Feb 2, 2021
1 parent 379151d commit 4de7e74
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 32 deletions.
9 changes: 8 additions & 1 deletion pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Set interface {
// should become `00c04543-f584-11e9-a765-0242ac120002:1-60`.
Truncate(end Set) error

// ResetStart reset the start of interval to 1 (only meaningful for MySQLGTIDSet), returns true if Set is changed
ResetStart() bool

String() string
}

Expand Down Expand Up @@ -260,7 +263,6 @@ func (g *MySQLGTIDSet) Truncate(end Set) error {
// ResetStart resets the start part of GTID sets,
// like `00c04543-f584-11e9-a765-0242ac120002:40-60` will be reset to `00c04543-f584-11e9-a765-0242ac120002:1-60`.
// return `true` if reset real happen.
// NOTE: for MariaDB GTID, no this function exists because its format is `domainID-serverID-SeqNum`.
func (g *MySQLGTIDSet) ResetStart() bool {
if g == nil || g.set == nil {
return false
Expand Down Expand Up @@ -442,6 +444,11 @@ func (m *MariadbGTIDSet) Truncate(end Set) error {
return nil
}

// ResetStart does nothing because for MariaDB its GTID set format is `domainID-serverID-SeqNum`.
func (m *MariadbGTIDSet) ResetStart() bool {
return false
}

func (m *MariadbGTIDSet) String() string {
if m.set == nil {
return ""
Expand Down
22 changes: 14 additions & 8 deletions pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,9 @@ func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) {
}
}

func (s *testGTIDSuite) TestMySQLGTIDResetStart(c *C) {
func (s *testGTIDSuite) TestGTIDSetResetStart(c *C) {
var (
gMaria, _ = ParserGTID("", "1-2-3")
flavor = "mysql"
gNil *MySQLGTIDSet
gEmpty, _ = ParserGTID(flavor, "")
Expand All @@ -403,24 +404,29 @@ func (s *testGTIDSuite) TestMySQLGTIDResetStart(c *C) {
g4, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100")
g5, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g6, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:40-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g7, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:10-20:30-100")
)

c.Assert(gMaria.ResetStart(), IsFalse)
c.Assert(gNil.ResetStart(), IsFalse)
c.Assert(gEmpty.(*MySQLGTIDSet).ResetStart(), IsFalse)
c.Assert(gEmpty.ResetStart(), IsFalse)

c.Assert(g1.(*MySQLGTIDSet).ResetStart(), IsFalse)
c.Assert(g1.ResetStart(), IsFalse)

c.Assert(g2.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g2.ResetStart(), IsTrue)
c.Assert(g2.Equal(g1), IsTrue)

c.Assert(g3.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g3.ResetStart(), IsTrue)
c.Assert(g3.Equal(g1), IsTrue)

c.Assert(g4.(*MySQLGTIDSet).ResetStart(), IsFalse)
c.Assert(g4.ResetStart(), IsFalse)

c.Assert(g5.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g5.ResetStart(), IsTrue)
c.Assert(g5.Equal(g4), IsTrue)

c.Assert(g6.(*MySQLGTIDSet).ResetStart(), IsTrue)
c.Assert(g6.ResetStart(), IsTrue)
c.Assert(g6.Equal(g4), IsTrue)

c.Assert(g7.ResetStart(), IsTrue)
// TODO: currently g7 will become "00c04543-f584-11e9-a765-0242ac120002:1-20:1-100", will fix soon
}
6 changes: 2 additions & 4 deletions pkg/v1dbschema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,8 @@ func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reade
defer func() {
if err == nil && gs != nil {
oldGs := gs.Clone()
if mysqlGs, ok := gs.(*gtid.MySQLGTIDSet); ok {
if mysqlGs.ResetStart() {
tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", mysqlGs))
}
if gs.ResetStart() {
tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", gs))
}
}
}()
Expand Down
34 changes: 15 additions & 19 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,21 +379,19 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser
zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs))

if result.LatestGTIDs != nil {
if mysqlGS, ok := result.LatestGTIDs.(*gtid.MySQLGTIDSet); ok {
// in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes,
// e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`,
// but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`.
// and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
oldGs1 := mysqlGS.Clone()
if mysqlGS.ResetStart() {
r.logger.Warn("force to reset the start part of recovered GTID sets", zap.Stringer("from GTID set", oldGs1), zap.Stringer("to GTID set", mysqlGS))
// also need to reset start for `latestGTID`.
oldGs2 := latestGTID.Clone()
if latestGTID.(*gtid.MySQLGTIDSet).ResetStart() {
r.logger.Warn("force to reset the start part of latest GTID sets", zap.Stringer("from GTID set", oldGs2), zap.Stringer("to GTID set", latestGTID))
}
gs := result.LatestGTIDs
oldGs1 := gs.Clone()
// in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes,
// e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`,
// but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`.
// and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
if gs.ResetStart() {
r.logger.Warn("force to reset the start part of recovered GTID sets", zap.Stringer("from GTID set", oldGs1), zap.Stringer("to GTID set", gs))
oldGs2 := latestGTID.Clone()
if latestGTID.ResetStart() {
r.logger.Warn("force to reset the start part of latest GTID sets", zap.Stringer("from GTID set", oldGs2), zap.Stringer("to GTID set", latestGTID))
}
}
}
Expand Down Expand Up @@ -1001,10 +999,8 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error)
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
oldGs := resultGs.Clone()
if mysqlGs, ok := resultGs.(*gtid.MySQLGTIDSet); ok {
if mysqlGs.ResetStart() {
r.logger.Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", mysqlGs))
}
if resultGs.ResetStart() {
r.logger.Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", resultGs))
}
return resultGs, nil
}

0 comments on commit 4de7e74

Please sign in to comment.