Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

streamer: fix duplicate event when reparse relay using GTID #1525

Merged
merged 6 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions pkg/streamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type BinlogReader struct {

tctx *tcontext.Context

usingGTID bool
prevGset, currGset mysql.GTIDSet
}

Expand Down Expand Up @@ -251,6 +252,7 @@ func (r *BinlogReader) StartSyncByPos(pos mysql.Position) (Streamer, error) {
// StartSyncByGTID start sync by gtid
func (r *BinlogReader) StartSyncByGTID(gset mysql.GTIDSet) (Streamer, error) {
r.tctx.L().Info("begin to sync binlog", zap.Stringer("GTID Set", gset))
r.usingGTID = true

if r.running {
return nil, terror.ErrReaderAlreadyRunning.Generate()
Expand Down Expand Up @@ -400,6 +402,7 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
needReParse bool
)
latestPos = offset
replaceWithHeartbeat := false
r.tctx.L().Debug("start to parse relay log file", zap.String("file", relayLogFile), zap.Int64("position", latestPos), zap.String("directory", relayLogDir))

for {
Expand All @@ -408,7 +411,7 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
return false, 0, "", "", ctx.Err()
default:
}
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast)
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast, replaceWithHeartbeat)
firstParse = false // set to false to handle the `continue` below
if err != nil {
return false, 0, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, latestPos, relayLogDir)
Expand All @@ -422,18 +425,25 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
}

// parseFile parses single relay log file from specified offset
// TODO: move all stateful variables into a class, such as r.fileParser
func (r *BinlogReader) parseFile(
ctx context.Context, s *LocalStreamer, relayLogFile string, offset int64,
relayLogDir string, firstParse bool, currentUUID string, possibleLast bool) (
needSwitch, needReParse bool, latestPos int64, nextUUID string, nextBinlogName string, err error) {
ctx context.Context,
s *LocalStreamer,
relayLogFile string,
offset int64,
relayLogDir string,
firstParse bool,
currentUUID string,
possibleLast bool,
replaceWithHeartbeat bool,
) (needSwitch, needReParse bool, latestPos int64, nextUUID, nextBinlogName string, currentReplaceFlag bool, err error) {
_, suffixInt, err := utils.ParseSuffixForUUID(currentUUID)
if err != nil {
return false, false, 0, "", "", err
return false, false, 0, "", "", false, err
}

uuidSuffix := utils.SuffixIntToStr(suffixInt) // current UUID's suffix, which will be added to binlog name
latestPos = offset // set to argument passed in
replaceWithHeartbeat := false

onEventFunc := func(e *replication.BinlogEvent) error {
r.tctx.L().Debug("read event", zap.Reflect("header", e.Header))
Expand Down Expand Up @@ -527,11 +537,11 @@ func (r *BinlogReader) parseFile(
// ref: https://github.com/mysql/mysql-server/blob/4f1d7cf5fcb11a3f84cff27e37100d7295e7d5ca/sql/rpl_binlog_sender.cc#L248
e, err2 := utils.GenFakeRotateEvent(relayLogFile, uint64(offset), r.latestServerID)
if err2 != nil {
return false, false, 0, "", "", terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset)
return false, false, 0, "", "", false, terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset)
}
err2 = onEventFunc(e)
if err2 != nil {
return false, false, 0, "", "", terror.Annotatef(err2, "send event %+v", e.Header)
return false, false, 0, "", "", false, terror.Annotatef(err2, "send event %+v", e.Header)
}
r.tctx.L().Info("start parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset))
} else {
Expand All @@ -545,15 +555,15 @@ func (r *BinlogReader) parseFile(
r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
} else {
r.tctx.L().Error("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err))
return false, false, 0, "", "", terror.ErrParserParseRelayLog.Delegate(err, fullPath)
return false, false, 0, "", "", false, terror.ErrParserParseRelayLog.Delegate(err, fullPath)
}
}
r.tctx.L().Debug("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", latestPos))

if !possibleLast {
// there are more relay log files in current sub directory, continue to re-collect them
r.tctx.L().Info("more relay log files need to parse", zap.String("directory", relayLogDir))
return false, false, latestPos, "", "", nil
return false, false, latestPos, "", "", false, nil
}

switchCh := make(chan SwitchPath, 1)
Expand Down Expand Up @@ -581,30 +591,30 @@ func (r *BinlogReader) parseFile(

select {
case <-ctx.Done():
return false, false, 0, "", "", nil
return false, false, 0, "", "", false, nil
case switchResp := <-switchCh:
// wait to ensure old file not updated
pathUpdated := utils.WaitSomething(3, watcherInterval, func() bool { return len(updatePathCh) > 0 })
if pathUpdated {
// re-parse it
return false, true, latestPos, "", "", nil
return false, true, latestPos, "", "", replaceWithHeartbeat, nil
}
// update new uuid
if err = r.updateUUIDs(); err != nil {
return false, false, 0, "", "", nil
return false, false, 0, "", "", false, nil
}
return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, nil
return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, false, nil
case updatePath := <-updatePathCh:
if strings.HasSuffix(updatePath, relayLogFile) {
// current relay log file updated, need to re-parse it
return false, true, latestPos, "", "", nil
return false, true, latestPos, "", "", replaceWithHeartbeat, nil
}
// need parse next relay log file or re-collect files
return false, false, latestPos, "", "", nil
return false, false, latestPos, "", "", false, nil
case err := <-switchErrCh:
return false, false, 0, "", "", err
return false, false, 0, "", "", false, err
case err := <-updateErrCh:
return false, false, 0, "", "", err
return false, false, 0, "", "", false, err
}
}

Expand Down
Loading