From 9947467ce657452c2ef9ed3bd672f167583a5df1 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Mon, 29 Mar 2021 22:53:24 +0800 Subject: [PATCH] streamer: fix duplicate event when reparse relay using GTID (#1525) (#1532) --- pkg/streamer/reader.go | 46 +++++++---- pkg/streamer/reader_test.go | 160 +++++++++++++++++++++++++++++------- 2 files changed, 160 insertions(+), 46 deletions(-) diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index 2d27d4659f..3fce85cc1d 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -75,6 +75,7 @@ type BinlogReader struct { tctx *tcontext.Context + usingGTID bool prevGset, currGset mysql.GTIDSet } @@ -251,6 +252,7 @@ func (r *BinlogReader) StartSyncByPos(pos mysql.Position) (Streamer, error) { // StartSyncByGTID start sync by gtid func (r *BinlogReader) StartSyncByGTID(gset mysql.GTIDSet) (Streamer, error) { r.tctx.L().Info("begin to sync binlog", zap.Stringer("GTID Set", gset)) + r.usingGTID = true if r.running { return nil, terror.ErrReaderAlreadyRunning.Generate() @@ -400,6 +402,7 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer needReParse bool ) latestPos = offset + replaceWithHeartbeat := false r.tctx.L().Debug("start to parse relay log file", zap.String("file", relayLogFile), zap.Int64("position", latestPos), zap.String("directory", relayLogDir)) for { @@ -408,7 +411,7 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer return false, 0, "", "", ctx.Err() default: } - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast, replaceWithHeartbeat) firstParse = false // set to false to handle the `continue` below if err != nil { return false, 0, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, latestPos, relayLogDir) @@ -422,18 +425,25 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer } // parseFile parses single relay log file from specified offset +// TODO: move all stateful variables into a class, such as r.fileParser func (r *BinlogReader) parseFile( - ctx context.Context, s *LocalStreamer, relayLogFile string, offset int64, - relayLogDir string, firstParse bool, currentUUID string, possibleLast bool) ( - needSwitch, needReParse bool, latestPos int64, nextUUID string, nextBinlogName string, err error) { + ctx context.Context, + s *LocalStreamer, + relayLogFile string, + offset int64, + relayLogDir string, + firstParse bool, + currentUUID string, + possibleLast bool, + replaceWithHeartbeat bool, +) (needSwitch, needReParse bool, latestPos int64, nextUUID, nextBinlogName string, currentReplaceFlag bool, err error) { _, suffixInt, err := utils.ParseSuffixForUUID(currentUUID) if err != nil { - return false, false, 0, "", "", err + return false, false, 0, "", "", false, err } uuidSuffix := utils.SuffixIntToStr(suffixInt) // current UUID's suffix, which will be added to binlog name latestPos = offset // set to argument passed in - replaceWithHeartbeat := false onEventFunc := func(e *replication.BinlogEvent) error { r.tctx.L().Debug("read event", zap.Reflect("header", e.Header)) @@ -527,11 +537,11 @@ func (r *BinlogReader) parseFile( // ref: https://github.com/mysql/mysql-server/blob/4f1d7cf5fcb11a3f84cff27e37100d7295e7d5ca/sql/rpl_binlog_sender.cc#L248 e, err2 := utils.GenFakeRotateEvent(relayLogFile, uint64(offset), r.latestServerID) if err2 != nil { - return false, false, 0, "", "", terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset) + return false, false, 0, "", "", false, terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset) } err2 = onEventFunc(e) if err2 != nil { - return false, false, 0, "", "", terror.Annotatef(err2, "send event %+v", e.Header) + return false, false, 0, "", "", false, terror.Annotatef(err2, "send event %+v", e.Header) } r.tctx.L().Info("start parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset)) } else { @@ -545,7 +555,7 @@ func (r *BinlogReader) parseFile( r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err)) } else { r.tctx.L().Error("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err)) - return false, false, 0, "", "", terror.ErrParserParseRelayLog.Delegate(err, fullPath) + return false, false, 0, "", "", false, terror.ErrParserParseRelayLog.Delegate(err, fullPath) } } r.tctx.L().Debug("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", latestPos)) @@ -553,7 +563,7 @@ func (r *BinlogReader) parseFile( if !possibleLast { // there are more relay log files in current sub directory, continue to re-collect them r.tctx.L().Info("more relay log files need to parse", zap.String("directory", relayLogDir)) - return false, false, latestPos, "", "", nil + return false, false, latestPos, "", "", false, nil } switchCh := make(chan SwitchPath, 1) @@ -581,30 +591,30 @@ func (r *BinlogReader) parseFile( select { case <-ctx.Done(): - return false, false, 0, "", "", nil + return false, false, 0, "", "", false, nil case switchResp := <-switchCh: // wait to ensure old file not updated pathUpdated := utils.WaitSomething(3, watcherInterval, func() bool { return len(updatePathCh) > 0 }) if pathUpdated { // re-parse it - return false, true, latestPos, "", "", nil + return false, true, latestPos, "", "", replaceWithHeartbeat, nil } // update new uuid if err = r.updateUUIDs(); err != nil { - return false, false, 0, "", "", nil + return false, false, 0, "", "", false, nil } - return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, nil + return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, false, nil case updatePath := <-updatePathCh: if strings.HasSuffix(updatePath, relayLogFile) { // current relay log file updated, need to re-parse it - return false, true, latestPos, "", "", nil + return false, true, latestPos, "", "", replaceWithHeartbeat, nil } // need parse next relay log file or re-collect files - return false, false, latestPos, "", "", nil + return false, false, latestPos, "", "", false, nil case err := <-switchErrCh: - return false, false, 0, "", "", err + return false, false, 0, "", "", false, err case err := <-updateErrCh: - return false, false, 0, "", "", err + return false, false, 0, "", "", false, err } } diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 8b4ffd417d..5000b60cf9 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -89,14 +89,15 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { relayDir := filepath.Join(baseDir, currentUUID) cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} r := NewBinlogReader(log.L(), cfg) - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( + ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, ErrorMatches, ".*invalid-current-uuid.*") c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(0)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) // change to valid currentUUID currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" @@ -106,14 +107,15 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { r = NewBinlogReader(log.L(), cfg) // relay log file not exists, failed - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the path specified).*") c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(0)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) // empty relay log file, failed, got EOF err = os.MkdirAll(relayDir, 0700) @@ -121,14 +123,15 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY, 0600) c.Assert(err, IsNil) defer f.Close() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(errors.Cause(err), Equals, io.EOF) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(0)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) // write some events to binlog file _, err = f.Write(replication.BinLogFileHeader) @@ -141,14 +144,15 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { t.purgeStreamer(c, s) // base test with only one valid binlog file - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) // try get events back, firstParse should have fake RotateEvent var fakeRotateEventCount int @@ -173,14 +177,15 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { // try get events back, not firstParse should have no fake RotateEvent firstParse = false - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) fakeRotateEventCount = 0 i = 0 for { @@ -208,26 +213,28 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { c.Assert(err, IsNil) // latest is still the end_log_pos of the last event, not the next relay file log file's position - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(rotateEv.Header.LogPos)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) t.purgeStreamer(c, s) // parse from a non-zero offset offset = int64(rotateEv.Header.LogPos - rotateEv.Header.EventSize) - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(rotateEv.Header.LogPos)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) // should only got a RotateEvent and a FormatDescriptionEven i = 0 @@ -284,14 +291,15 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { // no valid update for relay sub dir, timeout, no error ctx1, cancel1 := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel1() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( - ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( + ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(0)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) t.purgeStreamer(c, s) // current relay log file updated, need to re-parse it @@ -306,14 +314,15 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { }() ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsTrue) c.Assert(latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) wg.Wait() t.purgeStreamer(c, s) @@ -327,14 +336,15 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) { }() ctx3, cancel3 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel3() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx3, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx3, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(extraEvents[0].Header.LogPos)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) wg.Wait() t.purgeStreamer(c, s) } @@ -377,14 +387,15 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { t.writeUUIDs(c, baseDir, r.uuids) ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel1() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( - ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( + ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, ErrorMatches, ".*not valid.*") c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(0)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) t.purgeStreamer(c, s) // next sub dir exits, need to switch @@ -398,14 +409,15 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { // has relay log file in next sub directory, need to switch ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsTrue) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(0)) c.Assert(nextUUID, Equals, switchedUUID) c.Assert(nextBinlogName, Equals, nextFilename) + c.Assert(replaceWithHeartbeat, Equals, false) t.purgeStreamer(c, s) // NOTE: if we want to test the returned `needReParse` of `needSwitchSubDir`, @@ -438,14 +450,15 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { // file has no data, meet io.EOF error (when reading file header) and ignore it. ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel1() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile( - ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( + ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsFalse) c.Assert(latestPos, Equals, int64(0)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) _, err = f.Write(replication.BinLogFileHeader) c.Assert(err, IsNil) @@ -459,14 +472,15 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { // meet `err EOF` error (when parsing binlog event) ignored ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile( - ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast) + needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( + ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) c.Assert(needReParse, IsTrue) c.Assert(latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") + c.Assert(replaceWithHeartbeat, Equals, false) } func (t *testReaderSuite) TestUpdateUUIDs(c *C) { @@ -1026,6 +1040,96 @@ func (t *testReaderSuite) TestAdvanceCurrentGTIDSet(c *C) { c.Assert(r.currGset.String(), Equals, "0-1-6") } +func (t *testReaderSuite) TestReParseUsingGTID(c *C) { + var ( + baseDir = c.MkDir() + cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: mysql.MySQLFlavor} + r = NewBinlogReader(log.L(), cfg) + uuid = "ba8f633f-1f15-11eb-b1c7-0242ac110002.000001" + gtidStr = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" + file = "mysql.000001" + latestPos uint32 + ) + + startGTID, err := gtid.ParserGTID(mysql.MySQLFlavor, "") + c.Assert(err, IsNil) + lastGTID, err := gtid.ParserGTID(mysql.MySQLFlavor, gtidStr) + c.Assert(err, IsNil) + + // prepare a minimal relay log file + c.Assert(ioutil.WriteFile(r.indexPath, []byte(uuid), 0600), IsNil) + + uuidDir := path.Join(baseDir, uuid) + c.Assert(os.MkdirAll(uuidDir, 0700), IsNil) + f, err := os.OpenFile(path.Join(uuidDir, file), os.O_CREATE|os.O_WRONLY, 0600) + c.Assert(err, IsNil) + _, err = f.Write(replication.BinLogFileHeader) + c.Assert(err, IsNil) + + meta := Meta{BinLogName: file, BinLogPos: latestPos, BinlogGTID: startGTID.String()} + metaFile, err := os.Create(path.Join(uuidDir, utils.MetaFilename)) + c.Assert(err, IsNil) + c.Assert(toml.NewEncoder(metaFile).Encode(meta), IsNil) + c.Assert(metaFile.Close(), IsNil) + + // prepare some regular events, + // FORMAT_DESC + PREVIOUS_GTIDS, some events generated from a DDL, some events generated from a DML + genType := []replication.EventType{ + replication.PREVIOUS_GTIDS_EVENT, + replication.QUERY_EVENT, + replication.XID_EVENT} + events, _, _, latestGTIDSet := t.genEvents(c, genType, 4, lastGTID, startGTID) + c.Assert(events, HasLen, 1+1+2+5) + + // write FORMAT_DESC + PREVIOUS_GTIDS + _, err = f.Write(events[0].RawData) + c.Assert(err, IsNil) + _, err = f.Write(events[1].RawData) + c.Assert(err, IsNil) + + // we use latestGTIDSet to start sync, which means we already received all binlog events, so expect no DML/DDL + s, err := r.StartSyncByGTID(latestGTIDSet.Origin()) + c.Assert(err, IsNil) + var wg sync.WaitGroup + wg.Add(1) + + go func() { + expected := map[uint32]replication.EventType{} + for _, e := range events { + switch e.Event.(type) { + // keeps same + case *replication.FormatDescriptionEvent, *replication.PreviousGTIDsEvent: + expected[e.Header.LogPos] = e.Header.EventType + default: + expected[e.Header.LogPos] = replication.HEARTBEAT_EVENT + } + } + // fake rotate + expected[0] = replication.ROTATE_EVENT + lastLogPos := events[len(events)-1].Header.LogPos + + ctx, cancel := context.WithCancel(context.Background()) + for { + ev, err2 := s.GetEvent(ctx) + c.Assert(err2, IsNil) + c.Assert(ev.Header.EventType, Equals, expected[ev.Header.LogPos]) + if ev.Header.LogPos == lastLogPos { + break + } + } + cancel() + wg.Done() + }() + + for i := 2; i < len(events); i++ { + // hope a second is enough to trigger needReParse + time.Sleep(time.Second) + _, err = f.Write(events[i].RawData) + c.Assert(err, IsNil) + } + wg.Wait() +} + func (t *testReaderSuite) genBinlogEvents(c *C, latestPos uint32, latestGTID gtid.Set) ([]*replication.BinlogEvent, uint32, gtid.Set) { var ( header = &replication.EventHeader{