Skip to content

Commit

Permalink
*: support recover relay log file with a subset of metadata (pingcap#530
Browse files Browse the repository at this point in the history
)
  • Loading branch information
csuzhangxc authored Mar 12, 2020
1 parent bd98e80 commit 9802a12
Show file tree
Hide file tree
Showing 14 changed files with 109 additions and 88 deletions.
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ ErrBinlogWriterGetFileStat,[code=11088:class=functional:scope=internal:level=hig
ErrBinlogWriterWriteDataLen,[code=11089:class=functional:scope=internal:level=high],"data length %d"
ErrBinlogWriterFileNotOpened,[code=11090:class=functional:scope=internal:level=high],"file %s not opened"
ErrBinlogWriterFileSync,[code=11091:class=functional:scope=internal:level=high],"sync file"
ErrBinlogPrevGTIDEvNotValid,[code=11092:class=functional:scope=internal:level=high],"PreviousGTIDsEvent should be a GenericEvent in go-mysql, but got %T"
ErrBinlogPrevGTIDEvNotValid,[code=11092:class=functional:scope=internal:level=high],"the event should be a PreviousGTIDsEvent in go-mysql, but got %T"
ErrBinlogDecodeMySQLGTIDSet,[code=11093:class=functional:scope=internal:level=high],"decode from % X"
ErrBinlogNeedMariaDBGTIDSet,[code=11094:class=functional:scope=internal:level=high],"the event should be a MariadbGTIDListEvent, but got %T"
ErrBinlogParseMariaDBGTIDSet,[code=11095:class=functional:scope=internal:level=high],"parse MariaDB GTID set"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/shopspring/decimal v0.0.0-20191125035519-b054a8dfd10d // indirect
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed // indirect
github.com/siddontang/go-mysql v0.0.0-20191009015310-f66c8b344478
github.com/siddontang/go-mysql v0.0.0-20200222075837-12e89848f047
github.com/soheilhy/cmux v0.1.4
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cobra v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4=
github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed h1:KMgQoLJGCq1IoZpLZE3AIffh9veYWoVlsvA4ib55TMM=
github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4=
github.com/siddontang/go-mysql v0.0.0-20191009015310-f66c8b344478 h1:pfx6pRovM/9/GGA3bAnJu3vlIqLuWwqEo56WXPrN6o4=
github.com/siddontang/go-mysql v0.0.0-20191009015310-f66c8b344478/go.mod h1:+W4RCzesQDI11HvIkaDjS8yM36SpAnGNQ7jmTLn5BnU=
github.com/siddontang/go-mysql v0.0.0-20200222075837-12e89848f047 h1:boyJ8EgQN/aC3grvx8QUoJrptt7RvneezSJSCbW25a4=
github.com/siddontang/go-mysql v0.0.0-20200222075837-12e89848f047/go.mod h1:+W4RCzesQDI11HvIkaDjS8yM36SpAnGNQ7jmTLn5BnU=
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl
Flags: defaultHeaderFlags,
}
latestPos = uint32(len(replication.BinLogFileHeader))
prevGTIDsEv *replication.BinlogEvent // for MySQL, this will be a GenericEvent
prevGTIDsEv *replication.BinlogEvent
)

formatDescEv, err := GenFormatDescriptionEvent(header, latestPos)
Expand Down
23 changes: 4 additions & 19 deletions pkg/binlog/event/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package event

import (
"bytes"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -60,26 +59,12 @@ func (t *testCommonSuite) TestGenCommonFileHeader(c *C) {
var count = 0
onEventFunc := func(e *replication.BinlogEvent) error {
count++
switch count {
case 1: // FormatDescriptionEvent
c.Assert(e.Header, DeepEquals, events[0].Header)
c.Assert(e.Event, DeepEquals, events[0].Event)
c.Assert(e.RawData, DeepEquals, events[0].RawData)
case 2: // PreviousGTIDsEvent
switch flavor {
case gmysql.MySQLFlavor:
c.Assert(e.Header.EventType, Equals, replication.PREVIOUS_GTIDS_EVENT)
c.Assert(bytes.Contains(data, e.RawData), IsTrue)
case gmysql.MariaDBFlavor:
c.Assert(e.Header, DeepEquals, events[1].Header)
c.Assert(e.Event, DeepEquals, events[1].Event)
c.Assert(e.RawData, DeepEquals, events[1].RawData)
default:
c.Fatalf("invalid flavor %s", flavor)
}
default:
if count > 2 {
c.Fatalf("too many binlog events got, current is %+v", e.Header)
}
c.Assert(e.Header, DeepEquals, events[count-1].Header)
c.Assert(e.Event, DeepEquals, events[count-1].Event)
c.Assert(e.RawData, DeepEquals, events[count-1].RawData)
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogNa
}

// GenPreviousGTIDsEvent generates a PreviousGTIDsEvent.
// go-mysql has no PreviousGTIDsEvent struct defined, so return a GenericEvent instead.
// MySQL has no internal doc for PREVIOUS_GTIDS_EVENT.
// we ref:
// a. https://github.com/vitessio/vitess/blob/28e7e5503a6c3d3b18d4925d95f23ebcb6f25c8e/go/mysql/binlog_event_mysql56.go#L56
Expand All @@ -215,7 +214,7 @@ func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gS
payload := origin.Encode()

buf := new(bytes.Buffer)
event := &replication.GenericEvent{} // no PreviousGTIDsEvent struct defined, so use a GenericEvent instead.
event := &replication.PreviousGTIDsEvent{}
ev, err := assembleEvent(buf, event, false, *header, replication.PREVIOUS_GTIDS_EVENT, latestPos, nil, payload)
return ev, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func (t *testEventSuite) TestGenPreviousGTIDsEvent(c *C) {
str = "9f61c5f9-1eef-11e9-b6cf-0242ac140003:1-5"
)

// go-mysql has no PreviousGTIDsEvent struct defined, so we try to parse a binlog file.
// always needing a FormatDescriptionEvent in the binlog file.
formatDescEv, err := GenFormatDescriptionEvent(header, latestPos)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -202,7 +201,8 @@ func (t *testEventSuite) TestGenPreviousGTIDsEvent(c *C) {
c.Assert(e.Event, DeepEquals, formatDescEv.Event)
c.Assert(e.RawData, DeepEquals, formatDescEv.RawData)
case 2: // PreviousGTIDsEvent
c.Assert(e.Header.EventType, Equals, replication.PREVIOUS_GTIDS_EVENT)
c.Assert(e.Header, DeepEquals, previousGTIDsEv.Header)
c.Assert(e.Event, DeepEquals, previousGTIDsEv.Event)
c.Assert(e.RawData, DeepEquals, previousGTIDsEv.RawData)
default:
c.Fatalf("too many binlog events got, current is %+v", e.Header)
Expand Down Expand Up @@ -319,7 +319,8 @@ func (t *testEventSuite) TestGenGTIDEvent(c *C) {
c.Assert(e.Event, DeepEquals, formatDescEv.Event)
c.Assert(e.RawData, DeepEquals, formatDescEv.RawData)
case 2: // PreviousGTIDsEvent
c.Assert(e.Header.EventType, Equals, replication.PREVIOUS_GTIDS_EVENT)
c.Assert(e.Header, DeepEquals, previousGTIDsEv.Header)
c.Assert(e.Event, DeepEquals, previousGTIDsEv.Event)
c.Assert(e.RawData, DeepEquals, previousGTIDsEv.RawData)
case 3: // GTIDEvent
c.Assert(e.Header.EventType, Equals, replication.GTID_EVENT)
Expand Down
4 changes: 1 addition & 3 deletions pkg/binlog/event/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,7 @@ func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32,
var count = 0
onEventFunc := func(e *replication.BinlogEvent) error {
c.Assert(e.Header.EventType, Equals, allEventTypes[count])
if count != 1 { // allEvents[1] is nil for PreviousGTIDsEvent
c.Assert(e.RawData, DeepEquals, allEvents[count].RawData)
}
c.Assert(e.RawData, DeepEquals, allEvents[count].RawData)
count++
return nil
}
Expand Down
27 changes: 4 additions & 23 deletions pkg/binlog/event/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,15 @@ import (

// GTIDsFromPreviousGTIDsEvent get GTID set from a PreviousGTIDsEvent.
func GTIDsFromPreviousGTIDsEvent(e *replication.BinlogEvent) (gtid.Set, error) {
var payload []byte
var gSetStr string
switch ev := e.Event.(type) {
case *replication.GenericEvent:
payload = ev.Data
case *replication.PreviousGTIDsEvent:
gSetStr = ev.GTIDSets
default:
return nil, terror.ErrBinlogPrevGTIDEvNotValid.Generate(e.Event)
}

if e.Header.EventType != replication.PREVIOUS_GTIDS_EVENT {
return nil, terror.ErrBinlogEventTypeNotValid.Generatef("invalid event type %d, expect %d", e.Header.EventType, replication.PREVIOUS_GTIDS_EVENT)
}

set, err := gmysql.DecodeMysqlGTIDSet(payload)
if err != nil {
return nil, terror.ErrBinlogDecodeMySQLGTIDSet.Delegate(err, payload)
}

// always MySQL for PreviousGTIDsEvent
gSet, err := gtid.ParserGTID(gmysql.MySQLFlavor, "")
if err != nil {
return nil, terror.ErrBinlogEmptyGTID.Delegate(err)
}
err = gSet.Set(set)
if err != nil {
return nil, terror.Annotatef(err, "replace GTID set with set %v", set)
}

return gSet, nil
return gtid.ParserGTID(gmysql.MySQLFlavor, gSetStr)
}

// GTIDsFromMariaDBGTIDListEvent get GTID set from a MariaDBGTIDListEvent.
Expand Down
10 changes: 2 additions & 8 deletions pkg/binlog/event/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/siddontang/go-mysql/replication"

"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/terror"
)

var _ = Suite(&testHelperSuite{})
Expand All @@ -41,14 +42,7 @@ func (t *testHelperSuite) TestGTIDsFromPreviousGTIDsEvent(c *C) {
queryEv, err := GenQueryEvent(header, latestPos, 0, 0, 0, nil, []byte("schema"), []byte("BEGIN"))
c.Assert(err, IsNil)
gSet, err := GTIDsFromPreviousGTIDsEvent(queryEv)
c.Assert(err, ErrorMatches, ".*should be a GenericEvent in go-mysql.*")
c.Assert(gSet, IsNil)

// invalid binlog type, USER_VAR_EVENT
userVarEv, err := GenDummyEvent(header, latestPos, MinUserVarEventLen)
c.Assert(err, IsNil)
gSet, err = GTIDsFromPreviousGTIDsEvent(userVarEv)
c.Assert(err, ErrorMatches, ".*invalid event type.*")
c.Assert(terror.ErrBinlogPrevGTIDEvNotValid.Equal(err), IsTrue)
c.Assert(gSet, IsNil)

// valid MySQL GTIDs
Expand Down
2 changes: 1 addition & 1 deletion pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ var (
ErrBinlogWriterWriteDataLen = New(codeBinlogWriterWriteDataLen, ClassFunctional, ScopeInternal, LevelHigh, "data length %d")
ErrBinlogWriterFileNotOpened = New(codeBinlogWriterFileNotOpened, ClassFunctional, ScopeInternal, LevelHigh, "file %s not opened")
ErrBinlogWriterFileSync = New(codeBinlogWriterFileSync, ClassFunctional, ScopeInternal, LevelHigh, "sync file")
ErrBinlogPrevGTIDEvNotValid = New(codeBinlogPrevGTIDEvNotValid, ClassFunctional, ScopeInternal, LevelHigh, "PreviousGTIDsEvent should be a GenericEvent in go-mysql, but got %T")
ErrBinlogPrevGTIDEvNotValid = New(codeBinlogPrevGTIDEvNotValid, ClassFunctional, ScopeInternal, LevelHigh, "the event should be a PreviousGTIDsEvent in go-mysql, but got %T")
ErrBinlogDecodeMySQLGTIDSet = New(codeBinlogDecodeMySQLGTIDSet, ClassFunctional, ScopeInternal, LevelHigh, "decode from % X")
ErrBinlogNeedMariaDBGTIDSet = New(codeBinlogNeedMariaDBGTIDSet, ClassFunctional, ScopeInternal, LevelHigh, "the event should be a MariadbGTIDListEvent, but got %T")
ErrBinlogParseMariaDBGTIDSet = New(codeBinlogParseMariaDBGTIDSet, ClassFunctional, ScopeInternal, LevelHigh, "parse MariaDB GTID set")
Expand Down
6 changes: 5 additions & 1 deletion relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,11 @@ func (r *Relay) tryRecoverLatestFile(parser2 *parser.Parser) error {
if result.Recovered {
r.tctx.L().Warn("relay log file recovered",
zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs))
if err = latestGTID.Truncate(result.LatestGTIDs); err != nil {
if !result.LatestGTIDs.Equal(latestGTID) && result.LatestGTIDs.Contain(latestGTID) {
r.tctx.L().Warn("some GTIDs are missing in the meta data, this is usually due to the process was interrupted while writing the meta data. force to update GTIDs",
log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs))
latestGTID = result.LatestGTIDs.Clone()
} else if err = latestGTID.Truncate(result.LatestGTIDs); err != nil {
return err
}
err = r.meta.Save(result.LatestPos, latestGTID)
Expand Down
82 changes: 71 additions & 11 deletions relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay/reader"
"github.com/pingcap/dm/relay/retry"
Expand All @@ -54,6 +54,10 @@ func TestSuite(t *testing.T) {
type testRelaySuite struct {
}

func (t *testRelaySuite) SetUpSuite(c *C) {
log.InitLogger(&log.Config{})
}

func getDBConfigForTest() config.DBConfig {
host := os.Getenv("MYSQL_HOST")
if host == "" {
Expand Down Expand Up @@ -198,16 +202,6 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) {
c.Assert(err, IsNil)
f.Close()

// GTID sets in meta data does not contain the GTID sets in relay log, invalid
c.Assert(terror.ErrGTIDTruncateInvalid.Equal(r.tryRecoverLatestFile(parser2)), IsTrue)

// write some invalid data into the relay log file again
f, err = os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600)
c.Assert(err, IsNil)
_, err = f.Write([]byte("invalid event data"))
c.Assert(err, IsNil)
f.Close()

// write a greater GTID sets in meta
greaterGITDSet, err := gtid.ParserGTID(relayCfg.Flavor, greaterGITDSetStr)
c.Assert(err, IsNil)
Expand All @@ -231,6 +225,72 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) {
c.Assert(latestGTIDs.Contain(g.LatestGTID), IsTrue)
}

func (t *testRelaySuite) TestTryRecoverMeta(c *C) {
var (
uuid = "24ecd093-8cec-11e9-aa0d-0242ac170002"
previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14"
latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495"
genGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
filename = "mysql-bin.000001"
startPos = gmysql.Position{Name: filename, Pos: 123}

parser2 = parser.New()
relayCfg = &Config{
RelayDir: c.MkDir(),
Flavor: gmysql.MySQLFlavor,
}
r = NewRelay(relayCfg).(*Relay)
)

genGTIDSet, err := gtid.ParserGTID(relayCfg.Flavor, genGTIDSetStr)
c.Assert(err, IsNil)

c.Assert(r.meta.AddDir(uuid, &startPos, nil), IsNil)
c.Assert(r.meta.Load(), IsNil)

// use a generator to generate some binlog events
previousGTIDSet, err := gtid.ParserGTID(relayCfg.Flavor, previousGTIDSetStr)
c.Assert(err, IsNil)
latestGTID1, err := gtid.ParserGTID(relayCfg.Flavor, latestGTIDStr1)
c.Assert(err, IsNil)
latestGTID2, err := gtid.ParserGTID(relayCfg.Flavor, latestGTIDStr2)
c.Assert(err, IsNil)
g, _, data := genBinlogEventsWithGTIDs(c, relayCfg.Flavor, previousGTIDSet, latestGTID1, latestGTID2)

// write events into relay log file
err = ioutil.WriteFile(filepath.Join(r.meta.Dir(), filename), data, 0600)
c.Assert(err, IsNil)
// write some invalid data into the relay log file to trigger a recover.
f, err := os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600)
c.Assert(err, IsNil)
_, err = f.Write([]byte("invalid event data"))
c.Assert(err, IsNil)
f.Close()

// recover with empty GTIDs.
c.Assert(r.tryRecoverLatestFile(parser2), IsNil)
_, latestPos := r.meta.Pos()
c.Assert(latestPos, DeepEquals, gmysql.Position{Name: filename, Pos: g.LatestPos})
_, latestGTIDs := r.meta.GTID()
c.Assert(latestGTIDs.Equal(genGTIDSet), IsTrue)

// write some invalid data into the relay log file again.
f, err = os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600)
c.Assert(err, IsNil)
_, err = f.Write([]byte("invalid event data"))
c.Assert(err, IsNil)
f.Close()

// recover with the subset of GTIDs (previous GTID set).
c.Assert(r.meta.Save(startPos, previousGTIDSet), IsNil)
c.Assert(r.tryRecoverLatestFile(parser2), IsNil)
_, latestPos = r.meta.Pos()
c.Assert(latestPos, DeepEquals, gmysql.Position{Name: filename, Pos: g.LatestPos})
_, latestGTIDs = r.meta.GTID()
c.Assert(latestGTIDs.Equal(genGTIDSet), IsTrue)
}

// genBinlogEventsWithGTIDs generates some binlog events used by testFileUtilSuite and testFileWriterSuite.
// now, its generated events including 3 DDL and 10 DML.
func genBinlogEventsWithGTIDs(c *C, flavor string, previousGTIDSet, latestGTID1, latestGTID2 gtid.Set) (*event.Generator, []*replication.BinlogEvent, []byte) {
Expand Down
23 changes: 11 additions & 12 deletions relay/writer/file_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,17 @@ func getTxnPosGTIDs(filename string, p *parser.Parser) (int64, gtid.Set, error)
// learn from: https://github.com/siddontang/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L745
GTID := ev.GTID
nextGTIDStr = fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)
case *replication.PreviousGTIDsEvent:
// if GTID enabled, we can get a PreviousGTIDEvent after the FormatDescriptionEvent
// ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L4549
// ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L5161
var gSet gtid.Set
gSet, err = gtid.ParserGTID(gmysql.MySQLFlavor, ev.GTIDSets)
if err != nil {
return 0, nil, err
}
latestGSet = gSet.Origin()
flavor = gmysql.MySQLFlavor
case *replication.MariadbGTIDListEvent:
// a MariadbGTIDListEvent logged in every binlog to record the current replication state if GTID enabled
// ref: https://mariadb.com/kb/en/library/gtid_list_event/
Expand All @@ -236,18 +247,6 @@ func getTxnPosGTIDs(filename string, p *parser.Parser) (int64, gtid.Set, error)
}
latestGSet = gSet.Origin()
flavor = gmysql.MariaDBFlavor
case *replication.GenericEvent:
if e.Header.EventType == replication.PREVIOUS_GTIDS_EVENT {
// if GTID enabled, we can get a PreviousGTIDEvent after the FormatDescriptionEvent
// ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L4549
// ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L5161
gSet, err2 := event.GTIDsFromPreviousGTIDsEvent(e)
if err2 != nil {
return 0, nil, terror.Annotatef(err2, "get GTID set from PreviousGTIDsEvent %+v", e.Header)
}
latestGSet = gSet.Origin()
flavor = gmysql.MySQLFlavor
}
}
}

Expand Down

0 comments on commit 9802a12

Please sign in to comment.