Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
streamer: fix duplicate event when reparse relay using GTID (#1525) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Mar 29, 2021
1 parent 9108ad4 commit 9947467
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 46 deletions.
46 changes: 28 additions & 18 deletions pkg/streamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type BinlogReader struct {

tctx *tcontext.Context

usingGTID bool
prevGset, currGset mysql.GTIDSet
}

Expand Down Expand Up @@ -251,6 +252,7 @@ func (r *BinlogReader) StartSyncByPos(pos mysql.Position) (Streamer, error) {
// StartSyncByGTID start sync by gtid
func (r *BinlogReader) StartSyncByGTID(gset mysql.GTIDSet) (Streamer, error) {
r.tctx.L().Info("begin to sync binlog", zap.Stringer("GTID Set", gset))
r.usingGTID = true

if r.running {
return nil, terror.ErrReaderAlreadyRunning.Generate()
Expand Down Expand Up @@ -400,6 +402,7 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
needReParse bool
)
latestPos = offset
replaceWithHeartbeat := false
r.tctx.L().Debug("start to parse relay log file", zap.String("file", relayLogFile), zap.Int64("position", latestPos), zap.String("directory", relayLogDir))

for {
Expand All @@ -408,7 +411,7 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
return false, 0, "", "", ctx.Err()
default:
}
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast)
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast, replaceWithHeartbeat)
firstParse = false // set to false to handle the `continue` below
if err != nil {
return false, 0, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, latestPos, relayLogDir)
Expand All @@ -422,18 +425,25 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
}

// parseFile parses single relay log file from specified offset
// TODO: move all stateful variables into a class, such as r.fileParser
func (r *BinlogReader) parseFile(
ctx context.Context, s *LocalStreamer, relayLogFile string, offset int64,
relayLogDir string, firstParse bool, currentUUID string, possibleLast bool) (
needSwitch, needReParse bool, latestPos int64, nextUUID string, nextBinlogName string, err error) {
ctx context.Context,
s *LocalStreamer,
relayLogFile string,
offset int64,
relayLogDir string,
firstParse bool,
currentUUID string,
possibleLast bool,
replaceWithHeartbeat bool,
) (needSwitch, needReParse bool, latestPos int64, nextUUID, nextBinlogName string, currentReplaceFlag bool, err error) {
_, suffixInt, err := utils.ParseSuffixForUUID(currentUUID)
if err != nil {
return false, false, 0, "", "", err
return false, false, 0, "", "", false, err
}

uuidSuffix := utils.SuffixIntToStr(suffixInt) // current UUID's suffix, which will be added to binlog name
latestPos = offset // set to argument passed in
replaceWithHeartbeat := false

onEventFunc := func(e *replication.BinlogEvent) error {
r.tctx.L().Debug("read event", zap.Reflect("header", e.Header))
Expand Down Expand Up @@ -527,11 +537,11 @@ func (r *BinlogReader) parseFile(
// ref: https://github.com/mysql/mysql-server/blob/4f1d7cf5fcb11a3f84cff27e37100d7295e7d5ca/sql/rpl_binlog_sender.cc#L248
e, err2 := utils.GenFakeRotateEvent(relayLogFile, uint64(offset), r.latestServerID)
if err2 != nil {
return false, false, 0, "", "", terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset)
return false, false, 0, "", "", false, terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset)
}
err2 = onEventFunc(e)
if err2 != nil {
return false, false, 0, "", "", terror.Annotatef(err2, "send event %+v", e.Header)
return false, false, 0, "", "", false, terror.Annotatef(err2, "send event %+v", e.Header)
}
r.tctx.L().Info("start parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset))
} else {
Expand All @@ -545,15 +555,15 @@ func (r *BinlogReader) parseFile(
r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
} else {
r.tctx.L().Error("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
return false, false, 0, "", "", terror.ErrParserParseRelayLog.Delegate(err, fullPath)
return false, false, 0, "", "", false, terror.ErrParserParseRelayLog.Delegate(err, fullPath)
}
}
r.tctx.L().Debug("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", latestPos))

if !possibleLast {
// there are more relay log files in current sub directory, continue to re-collect them
r.tctx.L().Info("more relay log files need to parse", zap.String("directory", relayLogDir))
return false, false, latestPos, "", "", nil
return false, false, latestPos, "", "", false, nil
}

switchCh := make(chan SwitchPath, 1)
Expand Down Expand Up @@ -581,30 +591,30 @@ func (r *BinlogReader) parseFile(

select {
case <-ctx.Done():
return false, false, 0, "", "", nil
return false, false, 0, "", "", false, nil
case switchResp := <-switchCh:
// wait to ensure old file not updated
pathUpdated := utils.WaitSomething(3, watcherInterval, func() bool { return len(updatePathCh) > 0 })
if pathUpdated {
// re-parse it
return false, true, latestPos, "", "", nil
return false, true, latestPos, "", "", replaceWithHeartbeat, nil
}
// update new uuid
if err = r.updateUUIDs(); err != nil {
return false, false, 0, "", "", nil
return false, false, 0, "", "", false, nil
}
return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, nil
return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, false, nil
case updatePath := <-updatePathCh:
if strings.HasSuffix(updatePath, relayLogFile) {
// current relay log file updated, need to re-parse it
return false, true, latestPos, "", "", nil
return false, true, latestPos, "", "", replaceWithHeartbeat, nil
}
// need parse next relay log file or re-collect files
return false, false, latestPos, "", "", nil
return false, false, latestPos, "", "", false, nil
case err := <-switchErrCh:
return false, false, 0, "", "", err
return false, false, 0, "", "", false, err
case err := <-updateErrCh:
return false, false, 0, "", "", err
return false, false, 0, "", "", false, err
}
}

Expand Down
Loading

0 comments on commit 9947467

Please sign in to comment.