Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: fix GTID recover after relay log recovered (#335) #339

Merged
merged 7 commits into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ ErrTracingDataChecksum,[code=11101:class=functional:scope=internal:level=high],"
ErrTracingGetTSO,[code=11102:class=functional:scope=internal:level=high],"get tso"
ErrBackoffArgsNotValid,[code=11103:class=functional:scope=internal:level=medium],"backoff argument %s value %v not valid"
ErrInitLoggerFail,[code=11104:class=functional:scope=internal:level=medium],"init logger failed"
ErrGTIDTruncateInvalid,[code=11105:class=functional:scope=internal:level=high],"truncate GTID sets %v to %v not valid"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium],"checking item %s is not supported\n%s"
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium],"%s"
ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium],"%s"
Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var (
// user var name used in dummy USER_VAR_EVENT
dummyUserVarName = []byte("!dummyvar")
// dummy (commented) query in a QueryEvent
dummyQuery = []byte("# dummy query, often used to fill a hole in a binlog file")
dummyQuery = []byte("# dummy query generated by DM, often used to fill a hole in a binlog file")
)

// GenEventHeader generates a EventHeader's raw data according to a passed-in EventHeader struct.
Expand Down
62 changes: 62 additions & 0 deletions pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type Set interface {
Equal(other Set) bool
Contain(other Set) bool

// Truncate truncates the current GTID sets until the `end` in-place.
// NOTE: the original GTID sets should contain the end GTID sets, otherwise it's invalid.
// like truncating `00c04543-f584-11e9-a765-0242ac120002:1-100` with `00c04543-f584-11e9-a765-0242ac120002:40-60`
// should become `00c04543-f584-11e9-a765-0242ac120002:1-60`.
Truncate(end Set) error

String() string
}

Expand Down Expand Up @@ -173,6 +179,36 @@ func (g *mySQLGTIDSet) Contain(other Set) bool {
return g.set.Contain(other.Origin())
}

func (g *mySQLGTIDSet) Truncate(end Set) error {
if end == nil {
return nil // do nothing
}
if !g.Contain(end) {
return terror.ErrGTIDTruncateInvalid.Generate(g, end)
}
endGs := end.(*mySQLGTIDSet) // already verify the type is `*mySQLGTIDSet` in `Contain`.
if endGs == nil {
return nil // do nothing
}

for sid, setG := range g.set.Sets {
setE, ok := endGs.set.Sets[sid]
if !ok {
continue // no need to truncate for this SID
}
for i, interG := range setG.Intervals {
for _, interE := range setE.Intervals {
if interG.Start <= interE.Start && interG.Stop >= interE.Stop {
interG.Stop = interE.Stop // truncate the stop
}
}
setG.Intervals[i] = interG // overwrite the value (because it's not a pointer)
}
}

return nil
}

func (g *mySQLGTIDSet) String() string {
if g.set == nil {
return ""
Expand Down Expand Up @@ -288,6 +324,32 @@ func (m *mariadbGTIDSet) Contain(other Set) bool {
return m.set.Contain(other.Origin())
}

func (m *mariadbGTIDSet) Truncate(end Set) error {
if end == nil {
return nil // do nothing
}
if !m.Contain(end) {
return terror.ErrGTIDTruncateInvalid.Generate(m, end)
}
endGs := end.(*mariadbGTIDSet) // already verify the type is `*mariadbGTIDSet` in `Contain`.
if endGs == nil {
return nil // do nothing
}

for did, mGTID := range m.set.Sets {
eGTID, ok := endGs.set.Sets[did]
if !ok {
continue // no need to truncate for this domain ID
}
if mGTID.SequenceNumber > eGTID.SequenceNumber {
mGTID.SequenceNumber = eGTID.SequenceNumber // truncate the seqNO
mGTID.ServerID = eGTID.ServerID // also update server-id to match the seqNO
}
}

return nil
}

func (m *mariadbGTIDSet) String() string {
if m.set == nil {
return ""
Expand Down
189 changes: 189 additions & 0 deletions pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"testing"

. "github.com/pingcap/check"

"github.com/pingcap/dm/pkg/terror"
)

var _ = Suite(&testGTIDSuite{})
Expand Down Expand Up @@ -178,3 +180,190 @@ func (s *testGTIDSuite) TestMairaGTIDContain(c *C) {
c.Assert(g1.Contain(g2), IsFalse)
c.Assert(g2.Contain(g1), IsFalse)
}

func (s *testGTIDSuite) TestMySQLGTIDTruncate(c *C) {
var (
flavor = "mysql"
g1, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100")
g2, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100")
gNil *mySQLGTIDSet
gEmpty, _ = ParserGTID(flavor, "")
gMariaDBNil *mariadbGTIDSet
)
// truncate to nil or empty GTID sets has no effect
c.Assert(g1.Truncate(nil), IsNil)
c.Assert(g1, DeepEquals, g2)
c.Assert(g1.Truncate(gNil), IsNil)
c.Assert(g1, DeepEquals, g2)
c.Assert(g1.Truncate(gEmpty), IsNil)
c.Assert(g1, DeepEquals, g2)

// nil truncate to nil has no effect
c.Assert(gNil.Truncate(nil), IsNil)
c.Assert(gNil.Truncate(gNil), IsNil)

// nil truncate to not nil report an error
c.Assert(terror.ErrGTIDTruncateInvalid.Equal(gNil.Truncate(g1)), IsTrue)

// truncate with invalid MySQL GTID sets report an error
c.Assert(terror.ErrGTIDTruncateInvalid.Equal(g1.Truncate(gMariaDBNil)), IsTrue)

cases := []struct {
before string
end string
after string
hasError bool
}{
// before not contain end
{
before: "00c04543-f584-11e9-a765-0242ac120002:100",
end: "00c04543-f584-11e9-a765-0242ac120002:99",
hasError: true,
},
{
before: "00c04543-f584-11e9-a765-0242ac120002:40-60",
end: "00c04543-f584-11e9-a765-0242ac120002:50-70",
hasError: true,
},
{
before: "00c04543-f584-11e9-a765-0242ac120002:40-60",
end: "00c04543-f584-11e9-a765-0242ac120002:30-50",
hasError: true,
},
// truncate take effect
{
before: "00c04543-f584-11e9-a765-0242ac120002:100",
end: "00c04543-f584-11e9-a765-0242ac120002:100",
after: "00c04543-f584-11e9-a765-0242ac120002:100",
},
{
before: "00c04543-f584-11e9-a765-0242ac120002:40-60",
end: "00c04543-f584-11e9-a765-0242ac120002:45-55",
after: "00c04543-f584-11e9-a765-0242ac120002:40-55",
},
{
before: "00c04543-f584-11e9-a765-0242ac120002:40-60:70:80-100",
end: "00c04543-f584-11e9-a765-0242ac120002:45-55:85-95",
after: "00c04543-f584-11e9-a765-0242ac120002:40-55:70:80-95",
},
{
before: "00c04543-f584-11e9-a765-0242ac120002:40-60:70:80-100",
end: "00c04543-f584-11e9-a765-0242ac120002:45-55:70:85-95",
after: "00c04543-f584-11e9-a765-0242ac120002:40-55:70:80-95",
},
{
before: "00c04543-f584-11e9-a765-0242ac120002:40-60,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100",
end: "00c04543-f584-11e9-a765-0242ac120002:45-55",
after: "00c04543-f584-11e9-a765-0242ac120002:40-55,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100",
},
{
before: "00c04543-f584-11e9-a765-0242ac120002:40-60,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100",
end: "00c04543-f584-11e9-a765-0242ac120002:45-55,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-80",
after: "00c04543-f584-11e9-a765-0242ac120002:40-55,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-80",
},
}

for _, cs := range cases {
bg, err := ParserGTID(flavor, cs.before)
c.Assert(err, IsNil)
eg, err := ParserGTID(flavor, cs.end)
c.Assert(err, IsNil)
ag, err := ParserGTID(flavor, cs.after)
c.Assert(err, IsNil)
err = bg.Truncate(eg)
if cs.hasError {
c.Assert(terror.ErrGTIDTruncateInvalid.Equal(err), IsTrue)
} else {
c.Assert(bg, DeepEquals, ag)
}
}
}

func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) {
var (
flavor = "mariadb"
g1, _ = ParserGTID(flavor, "1-2-3")
g2, _ = ParserGTID(flavor, "1-2-3")
gNil *mariadbGTIDSet
gEmpty, _ = ParserGTID(flavor, "")
gMySQLNil *mySQLGTIDSet
)
// truncate to nil or empty GTID sets has no effect
c.Assert(g1.Truncate(nil), IsNil)
c.Assert(g1, DeepEquals, g2)
c.Assert(g1.Truncate(gNil), IsNil)
c.Assert(g1, DeepEquals, g2)
c.Assert(g1.Truncate(gEmpty), IsNil)
c.Assert(g1, DeepEquals, g2)

// nil truncate to nil has no effect
c.Assert(gNil.Truncate(nil), IsNil)
c.Assert(gNil.Truncate(gNil), IsNil)

// nil truncate to not nil report an error
c.Assert(terror.ErrGTIDTruncateInvalid.Equal(gNil.Truncate(g1)), IsTrue)

// truncate with invalid MariaDB GTID sets report an error
c.Assert(terror.ErrGTIDTruncateInvalid.Equal(g1.Truncate(gMySQLNil)), IsTrue)

cases := []struct {
before string
end string
after string
hasError bool
}{
// before not contain end
{
before: "1-2-3",
end: "2-2-3",
hasError: true,
},
{
before: "1-2-3",
end: "1-2-4",
hasError: true,
},

// truncate take effect
{
before: "1-2-3",
end: "1-2-3",
after: "1-2-3",
},
{
before: "1-2-10",
end: "1-2-8",
after: "1-2-8",
},
{
before: "1-2-10",
end: "1-3-8",
after: "1-3-8",
},
{
before: "1-2-10,2-2-10",
end: "1-2-8",
after: "1-2-8,2-2-10",
},
{
before: "1-2-10,2-2-10",
end: "1-3-8,2-2-6",
after: "1-3-8,2-2-6",
},
}

for _, cs := range cases {
bg, err := ParserGTID(flavor, cs.before)
c.Assert(err, IsNil)
eg, err := ParserGTID(flavor, cs.end)
c.Assert(err, IsNil)
ag, err := ParserGTID(flavor, cs.after)
c.Assert(err, IsNil)
err = bg.Truncate(eg)
if cs.hasError {
c.Assert(terror.ErrGTIDTruncateInvalid.Equal(err), IsTrue)
} else {
c.Assert(bg, DeepEquals, ag)
}
}
}
5 changes: 5 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ const (
codeBackoffArgsNotValid

codeInitLoggerFail

// pkg/gtid
codeGTIDTruncateInvalid
)

// Config related error code list
Expand Down Expand Up @@ -589,6 +592,8 @@ var (
ErrBackoffArgsNotValid = New(codeBackoffArgsNotValid, ClassFunctional, ScopeInternal, LevelMedium, "backoff argument %s value %v not valid")
// pkg
ErrInitLoggerFail = New(codeInitLoggerFail, ClassFunctional, ScopeInternal, LevelMedium, "init logger failed")
// pkg/gtid
ErrGTIDTruncateInvalid = New(codeGTIDTruncateInvalid, ClassFunctional, ScopeInternal, LevelHigh, "truncate GTID sets %v to %v not valid")

// Config related error
ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s")
Expand Down
10 changes: 9 additions & 1 deletion relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,10 @@ func (r *Relay) tryRecoverLatestFile(parser2 *parser.Parser) error {
if result.Recovered {
r.tctx.L().Warn("relay log file recovered",
zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs))
err = r.meta.Save(result.LatestPos, result.LatestGTIDs)
if err = latestGTID.Truncate(result.LatestGTIDs); err != nil {
return err
}
err = r.meta.Save(result.LatestPos, latestGTID)
if err != nil {
return terror.Annotatef(err, "save position %s, GTID sets %v after recovered", result.LatestPos, result.LatestGTIDs)
}
Expand Down Expand Up @@ -387,6 +390,11 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
cfg := r.cfg.From
r.tctx.L().Error("the requested binlog files have purged in the master server or the master server have switched, currently DM do no support to handle this error",
zap.String("db host", cfg.Host), zap.Int("db port", cfg.Port), log.ShortError(err))
// log the status for debug
pos, gs, err2 := utils.GetMasterStatus(r.db, r.cfg.Flavor)
if err2 == nil {
r.tctx.L().Info("current master status", zap.Stringer("position", pos), log.WrapStringerField("GTID sets", gs))
}
}
binlogReadErrorCounter.Inc()
}
Expand Down
24 changes: 22 additions & 2 deletions relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay/reader"
"github.com/pingcap/dm/relay/retry"
Expand Down Expand Up @@ -140,6 +141,8 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) {
previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14"
latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495"
genGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
greaterGITDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-20,53bfca22-690d-11e7-8a62-18ded7a37b78:1-510,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
filename = "mysql-bin.000001"
startPos = gmysql.Position{Name: filename, Pos: 123}

Expand Down Expand Up @@ -182,16 +185,33 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) {
// write some invalid data into the relay log file
f, err := os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600)
c.Assert(err, IsNil)
defer f.Close()
_, err = f.Write([]byte("invalid event data"))
c.Assert(err, IsNil)
f.Close()

// GTID sets in meta data does not contain the GTID sets in relay log, invalid
c.Assert(terror.ErrGTIDTruncateInvalid.Equal(r.tryRecoverLatestFile(parser2)), IsTrue)

// write some invalid data into the relay log file again
f, err = os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600)
c.Assert(err, IsNil)
_, err = f.Write([]byte("invalid event data"))
c.Assert(err, IsNil)
f.Close()

// write a greater GTID sets in meta
greaterGITDSet, err := gtid.ParserGTID(relayCfg.Flavor, greaterGITDSetStr)
c.Assert(err, IsNil)
c.Assert(r.meta.Save(startPos, greaterGITDSet), IsNil)

// invalid data truncated, meta updated
c.Assert(r.tryRecoverLatestFile(parser2), IsNil)
_, latestPos := r.meta.Pos()
c.Assert(latestPos, DeepEquals, gmysql.Position{Name: filename, Pos: g.LatestPos})
_, latestGTIDs := r.meta.GTID()
c.Assert(latestGTIDs.Contain(g.LatestGTID), IsTrue) // verifyMetadata is not enough
genGTIDSet, err := gtid.ParserGTID(relayCfg.Flavor, genGTIDSetStr)
c.Assert(err, IsNil)
c.Assert(latestGTIDs.Equal(genGTIDSet), IsTrue) // verifyMetadata is not enough

// no relay log file need to recover
c.Assert(r.meta.Save(minCheckpoint, latestGTIDs), IsNil)
Expand Down