diff --git a/cdc/model/sink.go b/cdc/model/sink.go index adeb04e5149..6ead16d3240 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -263,8 +263,9 @@ type RedoRowChangedEvent struct { // RedoDDLEvent represents DDL event used in redo log persistent type RedoDDLEvent struct { - DDL *DDLEvent `msg:"ddl"` - Type byte `msg:"type"` + DDL *DDLEvent `msg:"ddl"` + Type byte `msg:"type"` + TableName TableName `msg:"table-name"` } // ToRedoLog converts row changed event to redo log diff --git a/cdc/model/sink_gen.go b/cdc/model/sink_gen.go index f56a1d6bc56..618d1ba72dd 100644 --- a/cdc/model/sink_gen.go +++ b/cdc/model/sink_gen.go @@ -667,6 +667,12 @@ func (z *RedoDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Type") return } + case "table-name": + err = z.TableName.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "TableName") + return + } default: err = dc.Skip() if err != nil { @@ -680,9 +686,9 @@ func (z *RedoDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *RedoDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 + // map header, size 3 // write "ddl" - err = en.Append(0x82, 0xa3, 0x64, 0x64, 0x6c) + err = en.Append(0x83, 0xa3, 0x64, 0x64, 0x6c) if err != nil { return } @@ -734,15 +740,25 @@ func (z *RedoDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Type") return } + // write "table-name" + err = en.Append(0xaa, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2d, 0x6e, 0x61, 0x6d, 0x65) + if err != nil { + return + } + err = z.TableName.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "TableName") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *RedoDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 2 + // map header, size 3 // string "ddl" - o = append(o, 0x82, 0xa3, 0x64, 0x64, 0x6c) + o = append(o, 0x83, 0xa3, 0x64, 0x64, 0x6c) if z.DDL == nil { o = msgp.AppendNil(o) } else { @@ -760,6 +776,13 @@ func (z *RedoDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "type" o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65) o = msgp.AppendByte(o, z.Type) + // string "table-name" + o = append(o, 0xaa, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2d, 0x6e, 0x61, 0x6d, 0x65) + o, err = z.TableName.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "TableName") + return + } return } @@ -839,6 +862,12 @@ func (z *RedoDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Type") return } + case "table-name": + bts, err = z.TableName.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "TableName") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -859,7 +888,7 @@ func (z *RedoDDLEvent) Msgsize() (s int) { } else { s += 1 + 9 + msgp.Uint64Size + 10 + msgp.Uint64Size + 6 + msgp.StringPrefixSize + len(z.DDL.Query) } - s += 5 + msgp.ByteSize + s += 5 + msgp.ByteSize + 11 + z.TableName.Msgsize() return } diff --git a/cdc/redo/reader/blackhole_reader.go b/cdc/redo/reader/blackhole_reader.go index 370c137f863..224ab8ec4ab 100644 --- a/cdc/redo/reader/blackhole_reader.go +++ b/cdc/redo/reader/blackhole_reader.go @@ -27,18 +27,18 @@ func newBlackHoleReader() *BlackHoleReader { return &BlackHoleReader{} } -// ResetReader implements LogReader.ReadLog -func (br *BlackHoleReader) ResetReader(ctx context.Context, startTs, endTs uint64) error { +// Run implements LogReader.Run +func (br *BlackHoleReader) Run(ctx context.Context) error 
{
 	return nil
 }
 
-// ReadNextLog implements LogReader.ReadNextLog
-func (br *BlackHoleReader) ReadNextLog(ctx context.Context, maxNumberOfEvents uint64) ([]*model.RedoRowChangedEvent, error) {
+// ReadNextRow implements LogReader.ReadNextRow
+func (br *BlackHoleReader) ReadNextRow(ctx context.Context) (*model.RowChangedEvent, error) {
 	return nil, nil
 }
 
 // ReadNextDDL implements LogReader.ReadNextDDL
-func (br *BlackHoleReader) ReadNextDDL(ctx context.Context, maxNumberOfEvents uint64) ([]*model.RedoDDLEvent, error) {
+func (br *BlackHoleReader) ReadNextDDL(ctx context.Context) (*model.DDLEvent, error) {
 	return nil, nil
 }
diff --git a/cdc/redo/reader/file.go b/cdc/redo/reader/file.go
index 6ed7e7b36d3..c0b343629e1 100644
--- a/cdc/redo/reader/file.go
+++ b/cdc/redo/reader/file.go
@@ -25,6 +25,7 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
@@ -53,7 +54,7 @@ const (
 type fileReader interface {
 	io.Closer
 	// Read return the log from log file
-	Read(log *model.RedoLog) error
+	Read() (*model.RedoLog, error)
 }
 
 type readerConfig struct {
@@ -84,6 +85,7 @@ func newReader(ctx context.Context, cfg *readerConfig) ([]fileReader, error) {
 	if cfg.workerNums == 0 {
 		cfg.workerNums = defaultWorkerNum
 	}
+	start := time.Now()
 
 	if cfg.useExternalStorage {
 		extStorage, err := redo.InitExternalStorage(ctx, cfg.uri)
@@ -113,6 +115,9 @@ func newReader(ctx context.Context, cfg *readerConfig) ([]fileReader, error) {
 		})
 	}
 
+	log.Info("succeeded to download and sort redo logs",
+		zap.String("type", cfg.fileType),
+		zap.Duration("duration", time.Since(start)))
 	return readers, nil
 }
 
@@ -249,8 +254,7 @@ func readFile(file *os.File) (logHeap, error) {
 
 	h := logHeap{}
 	for {
-		rl := &model.RedoLog{}
-		err := r.Read(rl)
+		rl, err := r.Read()
 		if err != nil {
 			if err != io.EOF {
 				return nil, err
@@ -383,16 +387,16 @@ func shouldOpen(startTs uint64, name, fixedType string) (bool, error) {
 
 // Read implement Read interface.
// TODO: more general reader pair with writer in writer pkg -func (r *reader) Read(redoLog *model.RedoLog) error { +func (r *reader) Read() (*model.RedoLog, error) { r.mu.Lock() defer r.mu.Unlock() lenField, err := readInt64(r.br) if err != nil { if err == io.EOF { - return err + return nil, err } - return cerror.WrapError(cerror.ErrRedoFileOp, err) + return nil, cerror.WrapError(cerror.ErrRedoFileOp, err) } recBytes, padBytes := decodeFrameSize(lenField) @@ -403,23 +407,24 @@ func (r *reader) Read(redoLog *model.RedoLog) error { log.Warn("read redo log have unexpected io error", zap.String("fileName", r.fileName), zap.Error(err)) - return io.EOF + return nil, io.EOF } - return cerror.WrapError(cerror.ErrRedoFileOp, err) + return nil, cerror.WrapError(cerror.ErrRedoFileOp, err) } + redoLog := new(model.RedoLog) _, err = redoLog.UnmarshalMsg(data[:recBytes]) if err != nil { if r.isTornEntry(data) { // just return io.EOF, since if torn write it is the last redoLog entry - return io.EOF + return nil, io.EOF } - return cerror.WrapError(cerror.ErrUnmarshalFailed, err) + return nil, cerror.WrapError(cerror.ErrUnmarshalFailed, err) } // point last valid offset to the end of redoLog r.lastValidOff += frameSizeBytes + recBytes + padBytes - return nil + return redoLog, nil } func readInt64(r io.Reader) (int64, error) { diff --git a/cdc/redo/reader/file_test.go b/cdc/redo/reader/file_test.go index 602c684c74d..614117c888a 100644 --- a/cdc/redo/reader/file_test.go +++ b/cdc/redo/reader/file_test.go @@ -86,8 +86,7 @@ func TestReaderRead(t *testing.T) { require.Nil(t, err) require.Equal(t, 1, len(r)) defer r[0].Close() //nolint:errcheck - log = &model.RedoLog{} - err = r[0].Read(log) + log, err = r[0].Read() require.Nil(t, err) require.EqualValues(t, 1123, log.RedoRow.Row.CommitTs) time.Sleep(1001 * time.Millisecond) @@ -236,8 +235,7 @@ func TestReaderOpenSelectedFiles(t *testing.T) { closer: r, } for { - rl := &model.RedoLog{} - err := r.Read(rl) + rl, err := r.Read() if err == io.EOF { break } diff --git a/cdc/redo/reader/reader.go b/cdc/redo/reader/reader.go index f33ae48d189..63074174929 100644 --- a/cdc/redo/reader/reader.go +++ b/cdc/redo/reader/reader.go @@ -22,30 +22,30 @@ import ( "path/filepath" "sync" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/common" - cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" + "github.com/pingcap/tiflow/pkg/sink/mysql" "go.uber.org/multierr" "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + emitBatch = mysql.DefaultMaxTxnRow + defaultReaderChanSize = mysql.DefaultWorkerCount * emitBatch ) // RedoLogReader is a reader abstraction for redo log storage layer type RedoLogReader interface { - io.Closer - - // ResetReader setup the reader boundary - ResetReader(ctx context.Context, startTs, endTs uint64) error - - // ReadNextLog reads up to `maxNumberOfMessages` messages from current cursor. - // The returned redo logs sorted by commit-ts - ReadNextLog(ctx context.Context, maxNumberOfEvents uint64) ([]*model.RedoRowChangedEvent, error) - - // ReadNextDDL reads `maxNumberOfDDLs` ddl events from redo logs from current cursor - ReadNextDDL(ctx context.Context, maxNumberOfEvents uint64) ([]*model.RedoDDLEvent, error) - + // Run read and decode redo logs in background. + Run(ctx context.Context) error + // ReadNextRow read one row event from redo logs. 
+ ReadNextRow(ctx context.Context) (*model.RowChangedEvent, error) + // ReadNextDDL read one ddl event from redo logs. + ReadNextDDL(ctx context.Context) (*model.DDLEvent, error) // ReadMeta reads meta from redo logs and returns the latest checkpointTs and resolvedTs ReadMeta(ctx context.Context) (checkpointTs, resolvedTs uint64, err error) } @@ -55,7 +55,7 @@ func NewRedoLogReader( ctx context.Context, storageType string, cfg *LogReaderConfig, ) (rd RedoLogReader, err error) { if !redo.IsValidConsistentStorage(storageType) { - return nil, cerror.ErrConsistentStorage.GenWithStackByArgs(storageType) + return nil, errors.ErrConsistentStorage.GenWithStackByArgs(storageType) } if redo.IsBlackholeStorage(storageType) { return newBlackHoleReader(), nil @@ -65,9 +65,6 @@ func NewRedoLogReader( // LogReaderConfig is the config for LogReader type LogReaderConfig struct { - startTs uint64 - endTs uint64 - // Dir is the folder contains the redo logs need to apply when OP environment or // the folder used to download redo logs to if using external storage, such as s3 // and gcs. @@ -85,15 +82,11 @@ type LogReaderConfig struct { // LogReader implement RedoLogReader interface type LogReader struct { - cfg *LogReaderConfig - rowReader []fileReader - ddlReader []fileReader - rowHeap logHeap - ddlHeap logHeap - meta *common.LogMeta - rowLock sync.Mutex - ddlLock sync.Mutex - metaLock sync.Mutex + cfg *LogReaderConfig + meta *common.LogMeta + rowCh chan *model.RowChangedEvent + ddlCh chan *model.DDLEvent + metaLock sync.Mutex sync.Mutex } @@ -103,14 +96,17 @@ type LogReader struct { // if s3 will download logs first, if OP environment need fetch the redo logs to local dir first func newLogReader(ctx context.Context, cfg *LogReaderConfig) (*LogReader, error) { if cfg == nil { - return nil, cerror.WrapError(cerror.ErrRedoConfigInvalid, errors.New("LogReaderConfig can not be nil")) + err := errors.New("LogReaderConfig can not be nil") + return nil, errors.WrapError(errors.ErrRedoConfigInvalid, err) } if cfg.WorkerNums == 0 { cfg.WorkerNums = defaultWorkerNum } logReader := &LogReader{ - cfg: cfg, + cfg: cfg, + rowCh: make(chan *model.RowChangedEvent, defaultReaderChanSize), + ddlCh: make(chan *model.DDLEvent, defaultReaderChanSize), } if cfg.UseExternalStorage { extStorage, err := redo.InitExternalStorage(ctx, cfg.URI) @@ -120,18 +116,18 @@ func newLogReader(ctx context.Context, cfg *LogReaderConfig) (*LogReader, error) // remove logs in local dir first, if have logs left belongs to previous changefeed with the same name may have error when apply logs err = os.RemoveAll(cfg.Dir) if err != nil { - return nil, cerror.WrapError(cerror.ErrRedoFileOp, err) + return nil, errors.WrapError(errors.ErrRedoFileOp, err) } err = downLoadToLocal(ctx, cfg.Dir, extStorage, redo.RedoMetaFileType, cfg.WorkerNums) if err != nil { - return nil, cerror.WrapError(cerror.ErrRedoDownloadFailed, err) + return nil, errors.WrapError(errors.ErrRedoDownloadFailed, err) } } return logReader, nil } -// ResetReader implement ResetReader interface -func (l *LogReader) ResetReader(ctx context.Context, startTs, endTs uint64) error { +// Run implements the `RedoLogReader` interface. 
+func (l *LogReader) Run(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) @@ -145,207 +141,142 @@ func (l *LogReader) ResetReader(ctx context.Context, startTs, endTs uint64) erro } } - if startTs > endTs || startTs > l.meta.ResolvedTs || endTs <= l.meta.CheckpointTs { - return errors.Errorf( - "startTs, endTs (%d, %d] should match the boundary: (%d, %d]", - startTs, endTs, l.meta.CheckpointTs, l.meta.ResolvedTs) - } - return l.setUpReader(ctx, startTs, endTs) + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + return l.runRowReader(egCtx) + }) + eg.Go(func() error { + return l.runDDLReader(egCtx) + }) + return eg.Wait() } -func (l *LogReader) setUpReader(ctx context.Context, startTs, endTs uint64) error { - l.Lock() - defer l.Unlock() - - var errs error - errs = multierr.Append(errs, l.setUpRowReader(ctx, startTs, endTs)) - errs = multierr.Append(errs, l.setUpDDLReader(ctx, startTs, endTs)) - - return errs -} - -func (l *LogReader) setUpRowReader(ctx context.Context, startTs, endTs uint64) error { - l.rowLock.Lock() - defer l.rowLock.Unlock() - - err := l.closeRowReader() - if err != nil { - return err - } - +func (l *LogReader) runRowReader(egCtx context.Context) error { + defer close(l.rowCh) rowCfg := &readerConfig{ - startTs: startTs, - endTs: endTs, + startTs: l.meta.CheckpointTs, + endTs: l.meta.ResolvedTs, dir: l.cfg.Dir, fileType: redo.RedoRowLogFileType, uri: l.cfg.URI, useExternalStorage: l.cfg.UseExternalStorage, workerNums: l.cfg.WorkerNums, } - l.rowReader, err = newReader(ctx, rowCfg) - if err != nil { - return err - } - - l.rowHeap = logHeap{} - l.cfg.startTs = startTs - l.cfg.endTs = endTs - return nil + return l.runReader(egCtx, rowCfg) } -func (l *LogReader) setUpDDLReader(ctx context.Context, startTs, endTs uint64) error { - l.ddlLock.Lock() - defer l.ddlLock.Unlock() - - err := l.closeDDLReader() - if err != nil { - return err - } - +func (l *LogReader) runDDLReader(egCtx context.Context) error { + defer close(l.ddlCh) ddlCfg := &readerConfig{ - startTs: startTs, - endTs: endTs, + startTs: l.meta.CheckpointTs - 1, + endTs: l.meta.ResolvedTs, dir: l.cfg.Dir, fileType: redo.RedoDDLLogFileType, uri: l.cfg.URI, useExternalStorage: l.cfg.UseExternalStorage, workerNums: l.cfg.WorkerNums, } - l.ddlReader, err = newReader(ctx, ddlCfg) - if err != nil { - return err - } - - l.ddlHeap = logHeap{} - l.cfg.startTs = startTs - l.cfg.endTs = endTs - return nil + return l.runReader(egCtx, ddlCfg) } -// ReadNextLog implement ReadNextLog interface -func (l *LogReader) ReadNextLog(ctx context.Context, maxNumberOfEvents uint64) ([]*model.RedoRowChangedEvent, error) { - select { - case <-ctx.Done(): - return nil, errors.Trace(ctx.Err()) - default: +func (l *LogReader) runReader(egCtx context.Context, cfg *readerConfig) error { + fileReaders, err := newReader(egCtx, cfg) + if err != nil { + return errors.Trace(err) } - - l.rowLock.Lock() - defer l.rowLock.Unlock() - - // init heap - if l.rowHeap.Len() == 0 { - for i := 0; i < len(l.rowReader); i++ { - rl := &model.RedoLog{} - err := l.rowReader[i].Read(rl) - if err != nil { - if err != io.EOF { - return nil, err - } - continue - } - - ld := &logWithIdx{ - data: rl, - idx: i, - } - l.rowHeap = append(l.rowHeap, ld) + defer func() { + var errs error + for _, r := range fileReaders { + errs = multierr.Append(errs, r.Close()) } - heap.Init(&l.rowHeap) - } - - ret := []*model.RedoRowChangedEvent{} - var i uint64 - for l.rowHeap.Len() != 0 && i < maxNumberOfEvents { - item := 
heap.Pop(&l.rowHeap).(*logWithIdx)
-		if item.data.RedoRow != nil && item.data.RedoRow.Row != nil &&
-			// by design only data (startTs,endTs] is needed, so filter out data may beyond the boundary
-			item.data.RedoRow.Row.CommitTs > l.cfg.startTs &&
-			item.data.RedoRow.Row.CommitTs <= l.cfg.endTs {
-			ret = append(ret, item.data.RedoRow)
-			i++
+		if errs != nil {
+			log.Error("close redo log reader failed", zap.Error(errs))
 		}
+	}()
 
-		rl := &model.RedoLog{}
-		err := l.rowReader[item.idx].Read(rl)
+	// init heap
+	redoLogHeap, err := newLogHeap(fileReaders)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	for i := 0; i < len(fileReaders); i++ {
+		rl, err := fileReaders[i].Read()
 		if err != nil {
 			if err != io.EOF {
-				return nil, err
+				return errors.Trace(err)
 			}
 			continue
 		}
 
 		ld := &logWithIdx{
 			data: rl,
-			idx:  item.idx,
+			idx:  i,
 		}
-		heap.Push(&l.rowHeap, ld)
-	}
-
-	return ret, nil
-}
-
-// ReadNextDDL implement ReadNextDDL interface
-func (l *LogReader) ReadNextDDL(ctx context.Context, maxNumberOfEvents uint64) ([]*model.RedoDDLEvent, error) {
-	select {
-	case <-ctx.Done():
-		return nil, errors.Trace(ctx.Err())
-	default:
-	}
-
-	l.ddlLock.Lock()
-	defer l.ddlLock.Unlock()
-
-	// init heap
-	if l.ddlHeap.Len() == 0 {
-		for i := 0; i < len(l.ddlReader); i++ {
-			rl := &model.RedoLog{}
-			err := l.ddlReader[i].Read(rl)
-			if err != nil {
-				if err != io.EOF {
-					return nil, err
+		redoLogHeap = append(redoLogHeap, ld)
+	}
+	heap.Init(&redoLogHeap)
+
+	for redoLogHeap.Len() != 0 {
+		item := heap.Pop(&redoLogHeap).(*logWithIdx)
+
+		switch cfg.fileType {
+		case redo.RedoRowLogFileType:
+			row := item.data.RedoRow.Row
+			// By design only data (startTs,endTs] is needed,
+			// so filter out data that may be beyond the boundary.
+			if row != nil && row.CommitTs > cfg.startTs && row.CommitTs <= cfg.endTs {
+				select {
+				case <-egCtx.Done():
+					return errors.Trace(egCtx.Err())
+				case l.rowCh <- row:
 				}
-				continue
 			}
-
-			ld := &logWithIdx{
-				data: rl,
-				idx:  i,
+		case redo.RedoDDLLogFileType:
+			ddl := item.data.RedoDDL.DDL
+			if ddl != nil && ddl.CommitTs > cfg.startTs && ddl.CommitTs <= cfg.endTs {
+				select {
+				case <-egCtx.Done():
+					return errors.Trace(egCtx.Err())
+				case l.ddlCh <- ddl:
+				}
 			}
-			l.ddlHeap = append(l.ddlHeap, ld)
-		}
-		heap.Init(&l.ddlHeap)
-	}
-
-	ret := []*model.RedoDDLEvent{}
-	var i uint64
-	for l.ddlHeap.Len() != 0 && i < maxNumberOfEvents {
-		item := heap.Pop(&l.ddlHeap).(*logWithIdx)
-		if item.data.RedoDDL != nil && item.data.RedoDDL.DDL != nil &&
-			// by design only data (startTs,endTs] is needed, so filter out data may beyond the boundary
-			item.data.RedoDDL.DDL.CommitTs > l.cfg.startTs &&
-			item.data.RedoDDL.DDL.CommitTs <= l.cfg.endTs {
-			ret = append(ret, item.data.RedoDDL)
-			i++
 		}
 
-		rl := &model.RedoLog{}
-		err := l.ddlReader[item.idx].Read(rl)
+		// read next and push again
+		rl, err := fileReaders[item.idx].Read()
 		if err != nil {
 			if err != io.EOF {
-				return nil, err
+				return errors.Trace(err)
 			}
 			continue
 		}
-
 		ld := &logWithIdx{
 			data: rl,
 			idx:  item.idx,
 		}
-		heap.Push(&l.ddlHeap, ld)
+		heap.Push(&redoLogHeap, ld)
 	}
+	return nil
+}
 
-	return ret, nil
+// ReadNextRow implements the `RedoLogReader` interface.
+func (l *LogReader) ReadNextRow(ctx context.Context) (*model.RowChangedEvent, error) {
+	select {
+	case <-ctx.Done():
+		return nil, errors.Trace(ctx.Err())
+	case row := <-l.rowCh:
+		return row, nil
+	}
+}
+
+// ReadNextDDL implements the `RedoLogReader` interface.
+func (l *LogReader) ReadNextDDL(ctx context.Context) (*model.DDLEvent, error) { + select { + case <-ctx.Done(): + return nil, errors.Trace(ctx.Err()) + case ddl := <-l.ddlCh: + return ddl, nil + } } // ReadMeta implement ReadMeta interface @@ -365,7 +296,8 @@ func (l *LogReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs uint files, err := os.ReadDir(l.cfg.Dir) if err != nil { - return 0, 0, cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't read log file directory")) + err = errors.Annotate(err, "can't read log file directory") + return 0, 0, errors.WrapError(errors.ErrRedoFileOp, err) } metas := make([]*common.LogMeta, 0, 64) @@ -374,21 +306,21 @@ func (l *LogReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs uint path := filepath.Join(l.cfg.Dir, file.Name()) fileData, err := os.ReadFile(path) if err != nil { - return 0, 0, cerror.WrapError(cerror.ErrRedoFileOp, err) + return 0, 0, errors.WrapError(errors.ErrRedoFileOp, err) } log.Debug("unmarshal redo meta", zap.Int("size", len(fileData))) meta := &common.LogMeta{} _, err = meta.UnmarshalMsg(fileData) if err != nil { - return 0, 0, cerror.WrapError(cerror.ErrRedoFileOp, err) + return 0, 0, errors.WrapError(errors.ErrRedoFileOp, err) } metas = append(metas, meta) } } if len(metas) == 0 { - return 0, 0, cerror.ErrRedoMetaFileNotFound.GenWithStackByArgs(l.cfg.Dir) + return 0, 0, errors.ErrRedoMetaFileNotFound.GenWithStackByArgs(l.cfg.Dir) } common.ParseMeta(metas, &checkpointTs, &resolvedTs) @@ -401,40 +333,6 @@ func (l *LogReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs uint return } -func (l *LogReader) closeRowReader() error { - var errs error - for _, r := range l.rowReader { - errs = multierr.Append(errs, r.Close()) - } - return errs -} - -func (l *LogReader) closeDDLReader() error { - var errs error - for _, r := range l.ddlReader { - errs = multierr.Append(errs, r.Close()) - } - return errs -} - -// Close the backing file readers -func (l *LogReader) Close() error { - if l == nil { - return nil - } - - var errs error - - l.rowLock.Lock() - errs = multierr.Append(errs, l.closeRowReader()) - l.rowLock.Unlock() - - l.ddlLock.Lock() - errs = multierr.Append(errs, l.closeDDLReader()) - l.ddlLock.Unlock() - return errs -} - type logWithIdx struct { idx int data *model.RedoLog @@ -442,6 +340,27 @@ type logWithIdx struct { type logHeap []*logWithIdx +func newLogHeap(fileReaders []fileReader) (logHeap, error) { + h := logHeap{} + for i := 0; i < len(fileReaders); i++ { + rl, err := fileReaders[i].Read() + if err != nil { + if err != io.EOF { + return nil, err + } + continue + } + + ld := &logWithIdx{ + data: rl, + idx: i, + } + h = append(h, ld) + } + heap.Init(&h) + return h, nil +} + func (h logHeap) Len() int { return len(h) } diff --git a/cdc/redo/reader/reader_test.go b/cdc/redo/reader/reader_test.go index 263364734ae..930a9abc4ef 100644 --- a/cdc/redo/reader/reader_test.go +++ b/cdc/redo/reader/reader_test.go @@ -16,7 +16,6 @@ package reader import ( "context" "fmt" - "io" "net/url" "os" "path/filepath" @@ -25,7 +24,6 @@ import ( "github.com/golang/mock/gomock" "github.com/google/uuid" - "github.com/pingcap/errors" mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" @@ -33,12 +31,13 @@ import ( "github.com/pingcap/tiflow/cdc/redo/writer" "github.com/pingcap/tiflow/cdc/redo/writer/file" "github.com/pingcap/tiflow/pkg/redo" - "github.com/stretchr/testify/mock" 
"github.com/stretchr/testify/require" - "go.uber.org/multierr" + "golang.org/x/sync/errgroup" ) func TestNewLogReader(t *testing.T) { + t.Parallel() + _, err := newLogReader(context.Background(), nil) require.NotNil(t, err) @@ -75,169 +74,116 @@ func TestNewLogReader(t *testing.T) { require.True(t, os.IsNotExist(err)) } -func TestLogReaderResetReader(t *testing.T) { - dir := t.TempDir() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func genLogFile( + ctx context.Context, t *testing.T, + dir string, logType string, maxCommitTs uint64, +) { cfg := &writer.LogWriterConfig{ MaxLogSizeInBytes: 100000, Dir: dir, } - fileName := fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", - "default", "test-cf100", - redo.RedoDDLLogFileType, 100, uuid.NewString(), redo.LogEXT) + fileName := fmt.Sprintf(redo.RedoLogFileFormatV2, "capture", "default", + "changefeed", logType, maxCommitTs, uuid.NewString(), redo.LogEXT) w, err := file.NewFileWriter(ctx, cfg, writer.WithLogFileName(func() string { return fileName })) require.Nil(t, err) log := &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{Row: &model.RowChangedEvent{CommitTs: 11}}, + RedoRow: &model.RedoRowChangedEvent{}, + RedoDDL: &model.RedoDDLEvent{}, + } + if logType == redo.RedoRowLogFileType { + log.RedoRow.Row = &model.RowChangedEvent{CommitTs: maxCommitTs} + } else if logType == redo.RedoDDLLogFileType { + log.RedoDDL.DDL = &model.DDLEvent{ + CommitTs: maxCommitTs, + TableInfo: &model.TableInfo{}, + } + log.Type = model.RedoLogTypeDDL } - data, err := log.MarshalMsg(nil) + rawData, err := log.MarshalMsg(nil) require.Nil(t, err) - _, err = w.Write(data) + _, err = w.Write(rawData) require.Nil(t, err) err = w.Close() require.Nil(t, err) +} - path := filepath.Join(dir, fileName) - f, err := os.Open(path) - require.Nil(t, err) +func TestReadLogs(t *testing.T) { + t.Parallel() - fileName = fmt.Sprintf(redo.RedoLogFileFormatV2, "cp", - "default", "test-cf10", - redo.RedoRowLogFileType, 10, uuid.NewString(), redo.LogEXT) - w, err = file.NewFileWriter(ctx, cfg, writer.WithLogFileName(func() string { - return fileName - })) - require.Nil(t, err) - log = &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{Row: &model.RowChangedEvent{CommitTs: 11}}, + dir := t.TempDir() + ctx, cancel := context.WithCancel(context.Background()) + + meta := &common.LogMeta{ + CheckpointTs: 11, + ResolvedTs: 100, } - data, err = log.MarshalMsg(nil) - require.Nil(t, err) - _, err = w.Write(data) - require.Nil(t, err) - err = w.Close() - require.Nil(t, err) - path = filepath.Join(dir, fileName) - f1, err := os.Open(path) - require.Nil(t, err) + for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} { + genLogFile(ctx, t, dir, logType, meta.CheckpointTs) + genLogFile(ctx, t, dir, logType, meta.CheckpointTs) + genLogFile(ctx, t, dir, logType, 12) + genLogFile(ctx, t, dir, logType, meta.ResolvedTs) + } + expectedRows := []uint64{12, meta.ResolvedTs} + expectedDDLs := []uint64{meta.CheckpointTs, meta.CheckpointTs, 12, meta.ResolvedTs} - type arg struct { - ctx context.Context - startTs, endTs uint64 - resolvedTs, checkPointTs uint64 + r := &LogReader{ + cfg: &LogReaderConfig{Dir: dir}, + meta: meta, + rowCh: make(chan *model.RowChangedEvent, defaultReaderChanSize), + ddlCh: make(chan *model.DDLEvent, defaultReaderChanSize), } - tests := []struct { - name string - args arg - readerErr error - wantErr string - wantStartTs, wantEndTs uint64 - rowFleName string - ddlFleName string - }{ - { - name: "happy", - args: arg{ - ctx: 
context.Background(), - startTs: 1, - endTs: 101, - checkPointTs: 0, - resolvedTs: 200, - }, - wantStartTs: 1, - wantEndTs: 101, - rowFleName: f1.Name(), - ddlFleName: f.Name(), - }, - { - name: "context cancel", - args: arg{ - ctx: context.Background(), - startTs: 1, - endTs: 101, - checkPointTs: 0, - resolvedTs: 200, - }, - wantErr: context.Canceled.Error(), - }, - { - name: "invalid ts", - args: arg{ - ctx: context.Background(), - startTs: 1, - endTs: 0, - checkPointTs: 0, - resolvedTs: 200, - }, - wantErr: ".*should match the boundary*.", - }, - { - name: "invalid ts", - args: arg{ - ctx: context.Background(), - startTs: 201, - endTs: 10, - checkPointTs: 0, - resolvedTs: 200, - }, - wantErr: ".*should match the boundary*.", - }, - { - name: "reader close err", - args: arg{ - ctx: context.Background(), - startTs: 1, - endTs: 10, - checkPointTs: 0, - resolvedTs: 200, - }, - wantErr: "err", - readerErr: errors.New("err"), - }, + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + return r.Run(egCtx) + }) + + for _, ts := range expectedRows { + row, err := r.ReadNextRow(egCtx) + require.NoError(t, err) + require.Equal(t, ts, row.CommitTs) + } + for _, ts := range expectedDDLs { + ddl, err := r.ReadNextDDL(egCtx) + require.NoError(t, err) + require.Equal(t, ts, ddl.CommitTs) } - for _, tt := range tests { - mockReader := &mockFileReader{} - mockReader.On("Close").Return(tt.readerErr) - r := &LogReader{ - cfg: &LogReaderConfig{Dir: dir}, - rowReader: []fileReader{mockReader}, - ddlReader: []fileReader{mockReader}, - meta: &common.LogMeta{ - CheckpointTs: tt.args.checkPointTs, - ResolvedTs: tt.args.resolvedTs, - }, - } + cancel() + require.ErrorIs(t, eg.Wait(), nil) +} - if tt.name == "context cancel" { - ctx, cancel := context.WithCancel(context.Background()) - cancel() - tt.args.ctx = ctx - } else { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - tt.args.ctx = ctx - } - err := r.ResetReader(tt.args.ctx, tt.args.startTs, tt.args.endTs) - if tt.wantErr != "" { - require.Regexp(t, tt.wantErr, err, tt.name) - } else { - require.Nil(t, err, tt.name) - mockReader.AssertNumberOfCalls(t, "Close", 2) - require.Equal(t, tt.rowFleName+redo.SortLogEXT, - r.rowReader[0].(*reader).fileName, tt.name) - require.Equal(t, tt.ddlFleName+redo.SortLogEXT, - r.ddlReader[0].(*reader).fileName, tt.name) - require.Equal(t, tt.wantStartTs, r.cfg.startTs, tt.name) - require.Equal(t, tt.wantEndTs, r.cfg.endTs, tt.name) +func TestLogReaderClose(t *testing.T) { + t.Parallel() - } + dir := t.TempDir() + ctx, cancel := context.WithCancel(context.Background()) + + meta := &common.LogMeta{ + CheckpointTs: 11, + ResolvedTs: 100, + } + for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} { + genLogFile(ctx, t, dir, logType, meta.CheckpointTs) + genLogFile(ctx, t, dir, logType, meta.CheckpointTs) + genLogFile(ctx, t, dir, logType, 12) + genLogFile(ctx, t, dir, logType, meta.ResolvedTs) } - time.Sleep(1001 * time.Millisecond) + + r := &LogReader{ + cfg: &LogReaderConfig{Dir: dir}, + meta: meta, + rowCh: make(chan *model.RowChangedEvent, 1), + ddlCh: make(chan *model.DDLEvent, 1), + } + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(func() error { + return r.Run(egCtx) + }) + + cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) } func TestLogReaderReadMeta(t *testing.T) { @@ -327,427 +273,3 @@ func TestLogReaderReadMeta(t *testing.T) { } } } - -func TestLogReaderReadNextLog(t *testing.T) { - type arg struct { - ctx context.Context - maxNum uint64 - } 
- tests := []struct { - name string - args arg - wantErr error - readerErr error - readerErr1 error - readerRet *model.RedoLog - readerRet1 *model.RedoLog - }{ - { - name: "happy", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 15, - RowID: 1, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 6, - RowID: 2, - }, - }, - }, - }, - { - name: "context cancel", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 5, - RowID: 1, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 6, - RowID: 2, - }, - }, - }, - wantErr: context.Canceled, - }, - { - name: "happy1", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 2, - RowID: 1, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 6, - RowID: 2, - }, - }, - }, - }, - { - name: "sameCommitTs", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 2, - StartTs: 2, - RowID: 1, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 2, - StartTs: 1, - RowID: 2, - }, - }, - }, - }, - { - name: "io.EOF err", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 5, - RowID: 1, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 6, - RowID: 2, - }, - }, - }, - readerErr: io.EOF, - }, - { - name: "err", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 5, - RowID: 1, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoRow: &model.RedoRowChangedEvent{ - Row: &model.RowChangedEvent{ - CommitTs: 6, - RowID: 2, - }, - }, - }, - readerErr: errors.New("xx"), - readerErr1: errors.New("xx"), - wantErr: errors.New("xx"), - }, - } - - for _, tt := range tests { - mockReader := &mockFileReader{} - mockReader.On("Read", mock.Anything).Return(tt.readerErr).Run(func(args mock.Arguments) { - arg := args.Get(0).(*model.RedoLog) - arg.RedoRow = tt.readerRet.RedoRow - arg.Type = model.RedoLogTypeRow - }).Times(int(tt.args.maxNum)) - mockReader.On("Read", mock.Anything).Return(io.EOF).Once() - - mockReader1 := &mockFileReader{} - mockReader1.On("Read", mock.Anything).Return(tt.readerErr1).Run(func(args mock.Arguments) { - arg := args.Get(0).(*model.RedoLog) - arg.RedoRow = tt.readerRet1.RedoRow - arg.Type = model.RedoLogTypeRow - }) - - l := &LogReader{ - rowReader: []fileReader{mockReader1, mockReader}, - rowHeap: logHeap{}, - cfg: &LogReaderConfig{ - startTs: 1, - endTs: 10, - }, - } - if tt.name == "context cancel" { - ctx1, cancel := context.WithCancel(context.Background()) - cancel() - tt.args.ctx = ctx1 - } - ret, err := l.ReadNextLog(tt.args.ctx, tt.args.maxNum) - if tt.wantErr != nil { - 
require.True(t, errors.ErrorEqual(tt.wantErr, err), tt.name) - require.Equal(t, 0, len(ret), tt.name) - } else { - require.Nil(t, err, tt.name) - require.EqualValues(t, tt.args.maxNum, len(ret), tt.name) - for i := 0; i < int(tt.args.maxNum); i++ { - if tt.name == "io.EOF err" { - require.Equal(t, ret[i].Row.CommitTs, - tt.readerRet1.RedoRow.Row.CommitTs, tt.name) - continue - } - if tt.name == "happy1" { - require.Equal(t, ret[i].Row.CommitTs, - tt.readerRet.RedoRow.Row.CommitTs, tt.name) - continue - } - require.Equal(t, ret[i].Row.CommitTs, tt.readerRet1.RedoRow.Row.CommitTs, tt.name) - require.Equal(t, ret[i].Row.StartTs, tt.readerRet1.RedoRow.Row.StartTs, tt.name) - } - } - } -} - -func TestLogReaderReadNexDDL(t *testing.T) { - type arg struct { - ctx context.Context - maxNum uint64 - } - tests := []struct { - name string - args arg - wantErr error - readerErr error - readerErr1 error - readerRet *model.RedoLog - readerRet1 *model.RedoLog - }{ - { - name: "happy", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 15, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 6, - }, - }, - }, - }, - { - name: "context cancel", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 5, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 6, - }, - }, - }, - wantErr: context.Canceled, - }, - { - name: "happy1", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 1, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 6, - }, - }, - }, - }, - { - name: "io.EOF err", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 5, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 6, - }, - }, - }, - readerErr: io.EOF, - }, - { - name: "err", - args: arg{ - ctx: context.Background(), - maxNum: 3, - }, - readerRet: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 5, - }, - }, - }, - readerRet1: &model.RedoLog{ - RedoDDL: &model.RedoDDLEvent{ - DDL: &model.DDLEvent{ - CommitTs: 6, - }, - }, - }, - readerErr: errors.New("xx"), - readerErr1: errors.New("xx"), - wantErr: errors.New("xx"), - }, - } - - for _, tt := range tests { - mockReader := &mockFileReader{} - mockReader.On("Read", mock.Anything).Return(tt.readerErr).Run(func(args mock.Arguments) { - arg := args.Get(0).(*model.RedoLog) - arg.RedoDDL = tt.readerRet.RedoDDL - arg.Type = model.RedoLogTypeDDL - }).Times(int(tt.args.maxNum)) - mockReader.On("Read", mock.Anything).Return(io.EOF).Once() - mockReader1 := &mockFileReader{} - mockReader1.On("Read", mock.Anything).Return(tt.readerErr1).Run(func(args mock.Arguments) { - arg := args.Get(0).(*model.RedoLog) - arg.RedoDDL = tt.readerRet1.RedoDDL - arg.Type = model.RedoLogTypeDDL - }) - - l := &LogReader{ - ddlReader: []fileReader{mockReader1, mockReader}, - ddlHeap: logHeap{}, - cfg: &LogReaderConfig{ - startTs: 1, - endTs: 10, - }, - } - if tt.name == "context cancel" { - ctx1, cancel := 
context.WithCancel(context.Background()) - cancel() - tt.args.ctx = ctx1 - } - ret, err := l.ReadNextDDL(tt.args.ctx, tt.args.maxNum) - if tt.wantErr != nil { - require.True(t, errors.ErrorEqual(tt.wantErr, err), tt.name) - require.Equal(t, 0, len(ret), tt.name) - } else { - require.Nil(t, err, tt.name) - require.EqualValues(t, tt.args.maxNum, len(ret), tt.name) - for i := 0; i < int(tt.args.maxNum); i++ { - if tt.name == "io.EOF err" { - require.Equal(t, ret[i].DDL.CommitTs, tt.readerRet1.RedoDDL.DDL.CommitTs, tt.name) - continue - } - if tt.name == "happy1" { - require.Equal(t, ret[i].DDL.CommitTs, tt.readerRet1.RedoDDL.DDL.CommitTs, tt.name) - continue - } - require.Equal(t, ret[i].DDL.CommitTs, tt.readerRet1.RedoDDL.DDL.CommitTs, tt.name) - } - } - } -} - -func TestLogReaderClose(t *testing.T) { - tests := []struct { - name string - wantErr error - err error - }{ - { - name: "happy", - }, - { - name: "err", - err: errors.New("xx"), - wantErr: multierr.Append(errors.New("xx"), errors.New("xx")), - }, - } - - for _, tt := range tests { - mockReader := &mockFileReader{} - mockReader.On("Close").Return(tt.err) - l := &LogReader{ - rowReader: []fileReader{mockReader}, - ddlReader: []fileReader{mockReader}, - } - err := l.Close() - mockReader.AssertNumberOfCalls(t, "Close", 2) - if tt.wantErr != nil { - require.True(t, errors.ErrorEqual(tt.wantErr, err), tt.name) - } else { - require.Nil(t, err, tt.name) - } - } -} diff --git a/cdc/sinkv2/ddlsink/factory/factory.go b/cdc/sinkv2/ddlsink/factory/factory.go index ef04a72ae61..701301aeb89 100644 --- a/cdc/sinkv2/ddlsink/factory/factory.go +++ b/cdc/sinkv2/ddlsink/factory/factory.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" - pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" ) // New creates a new ddlsink.DDLEventSink by schema. @@ -48,7 +47,7 @@ func New( case sink.BlackHoleScheme: return blackhole.New(), nil case sink.MySQLSSLScheme, sink.MySQLScheme, sink.TiDBScheme, sink.TiDBSSLScheme: - return mysql.NewMySQLDDLSink(ctx, sinkURI, cfg, pmysql.CreateMySQLDBConn) + return mysql.NewMySQLDDLSink(ctx, sinkURI, cfg) case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: return cloudstorage.NewCloudStorageDDLSink(ctx, sinkURI) default: diff --git a/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go b/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go index b239f12817e..f6c83d7c4a7 100644 --- a/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go @@ -44,7 +44,11 @@ const ( networkDriftDuration = 5 * time.Second ) -// Assert DDLEventSink implementation +// GetDBConnImpl is the implementation of pmysql.Factory. +// Exported for testing. 
+var GetDBConnImpl pmysql.Factory = pmysql.CreateMySQLDBConn + +// Assert Sink implementation var _ ddlsink.DDLEventSink = (*mysqlDDLSink)(nil) type mysqlDDLSink struct { @@ -63,7 +67,6 @@ func NewMySQLDDLSink( ctx context.Context, sinkURI *url.URL, replicaConfig *config.ReplicaConfig, - dbConnFactory pmysql.Factory, ) (*mysqlDDLSink, error) { changefeedID := contextutil.ChangefeedIDFromCtx(ctx) cfg := pmysql.NewConfig() @@ -72,12 +75,12 @@ func NewMySQLDDLSink( return nil, err } - dsnStr, err := pmysql.GenerateDSN(ctx, sinkURI, cfg, dbConnFactory) + dsnStr, err := pmysql.GenerateDSN(ctx, sinkURI, cfg, GetDBConnImpl) if err != nil { return nil, err } - db, err := dbConnFactory(ctx, dsnStr) + db, err := GetDBConnImpl(ctx, dsnStr) if err != nil { return nil, err } diff --git a/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink_test.go b/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink_test.go index d1f23bdcd03..a293c48f2e0 100644 --- a/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink_test.go +++ b/cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink_test.go @@ -34,7 +34,7 @@ func TestWriteDDLEvent(t *testing.T) { t.Parallel() dbIndex := 0 - mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { + GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) { defer func() { dbIndex++ }() @@ -69,8 +69,7 @@ func TestWriteDDLEvent(t *testing.T) { sinkURI, err := url.Parse("mysql://127.0.0.1:4000") require.Nil(t, err) rc := config.GetDefaultReplicaConfig() - sink, err := NewMySQLDDLSink(ctx, - sinkURI, rc, mockGetDBConn) + sink, err := NewMySQLDDLSink(ctx, sinkURI, rc) require.Nil(t, err) diff --git a/pkg/applier/main_test.go b/pkg/applier/main_test.go new file mode 100644 index 00000000000..9515dee7951 --- /dev/null +++ b/pkg/applier/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package applier + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index 3a5363d06d2..5a961e762ab 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -18,18 +18,19 @@ import ( "net/url" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/reader" - "github.com/pingcap/tiflow/cdc/sink/mysql" - "github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory" + "github.com/pingcap/tiflow/cdc/sinkv2/ddlsink" + ddlfactory "github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/factory" + dmlfactory "github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink" "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" + "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -38,11 +39,18 @@ import ( const ( applierChangefeed = "redo-applier" - emitBatch = mysql.DefaultMaxTxnRow - readBatch = mysql.DefaultWorkerCount * emitBatch + warnDuration = 3 * time.Minute + flushWaitDuration = 200 * time.Millisecond ) -var errApplyFinished = errors.New("apply finished, can exit safely") +var ( + // In the boundary case, non-idempotent DDLs will not be executed. + // TODO(CharlesCheung96): fix this + unsupportedDDL = map[timodel.ActionType]struct{}{ + timodel.ActionExchangeTablePartition: {}, + } + errApplyFinished = errors.New("apply finished, can exit safely") +) // RedoApplierConfig is the configuration used by a redo log applier type RedoApplierConfig struct { @@ -54,22 +62,27 @@ type RedoApplierConfig struct { // RedoApplier implements a redo log applier type RedoApplier struct { cfg *RedoApplierConfig + rd reader.RedoLogReader + + ddlSink ddlsink.DDLEventSink + appliedDDLCount uint64 - rd reader.RedoLogReader // sinkFactory is used to create table sinks. - sinkFactory *factory.SinkFactory + sinkFactory *dmlfactory.SinkFactory // tableSinks is a map from tableID to table sink. // We create it when we need it, and close it after we finish applying the redo logs. 
- tableSinks map[model.TableID]tablesink.TableSink - errCh chan error + tableSinks map[model.TableID]tablesink.TableSink + tableResolvedTsMap map[model.TableID]model.ResolvedTs + appliedLogCount uint64 + + errCh chan error } // NewRedoApplier creates a new RedoApplier instance func NewRedoApplier(cfg *RedoApplierConfig) *RedoApplier { return &RedoApplier{ - cfg: cfg, - tableSinks: make(map[model.TableID]tablesink.TableSink), - errCh: make(chan error, 1024), + cfg: cfg, + errCh: make(chan error, 1024), } } @@ -78,7 +91,7 @@ func NewRedoApplier(cfg *RedoApplierConfig) *RedoApplier { func (rac *RedoApplierConfig) toLogReaderConfig() (string, *reader.LogReaderConfig, error) { uri, err := url.Parse(rac.Storage) if err != nil { - return "", nil, cerror.WrapError(cerror.ErrConsistentStorage, err) + return "", nil, errors.WrapError(errors.ErrConsistentStorage, err) } cfg := &reader.LogReaderConfig{ Dir: uri.Path, @@ -103,102 +116,202 @@ func (ra *RedoApplier) catchError(ctx context.Context) error { } } +func (ra *RedoApplier) initSink(ctx context.Context) (err error) { + replicaConfig := config.GetDefaultReplicaConfig() + ra.sinkFactory, err = dmlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig, ra.errCh) + if err != nil { + return err + } + ra.ddlSink, err = ddlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig) + if err != nil { + return err + } + + ra.tableSinks = make(map[model.TableID]tablesink.TableSink) + ra.tableResolvedTsMap = make(map[model.TableID]model.ResolvedTs) + return nil +} + func (ra *RedoApplier) consumeLogs(ctx context.Context) error { checkpointTs, resolvedTs, err := ra.rd.ReadMeta(ctx) if err != nil { return err } - if checkpointTs == resolvedTs { - log.Info("apply redo log suncceed: checkpointTs == resolvedTs", - zap.Uint64("checkpointTs", checkpointTs), - zap.Uint64("resolvedTs", resolvedTs)) - return errApplyFinished + log.Info("apply redo log starts", + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("resolvedTs", resolvedTs)) + if err := ra.initSink(ctx); err != nil { + return err } + defer ra.sinkFactory.Close() - err = ra.rd.ResetReader(ctx, checkpointTs, resolvedTs) + shouldApplyDDL := func(row *model.RowChangedEvent, ddl *model.DDLEvent) bool { + if ddl == nil { + return false + } else if row == nil { + // no more rows to apply + return true + } + // If all rows before the DDL (which means row.CommitTs <= ddl.CommitTs) + // are applied, we should apply this DDL. 
+		return row.CommitTs > ddl.CommitTs
+	}
+
+	row, err := ra.rd.ReadNextRow(ctx)
+	if err != nil {
+		return err
+	}
+	ddl, err := ra.rd.ReadNextDDL(ctx)
 	if err != nil {
 		return err
 	}
-	log.Info("apply redo log starts", zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", resolvedTs))
-
-	tableResolvedTsMap := make(map[model.TableID]model.ResolvedTs)
-	appliedLogCount := 0
 	for {
-		redoLogs, err := ra.rd.ReadNextLog(ctx, readBatch)
-		if err != nil {
-			return err
-		}
-		if len(redoLogs) == 0 {
+		if row == nil && ddl == nil {
 			break
 		}
-		appliedLogCount += len(redoLogs)
-
-		for _, redoLog := range redoLogs {
-			row := common.LogToRow(redoLog)
-			tableID := row.Table.TableID
-			if _, ok := ra.tableSinks[tableID]; !ok {
-				tableSink := ra.sinkFactory.CreateTableSink(
-					model.DefaultChangeFeedID(applierChangefeed),
-					tableID,
-					prometheus.NewCounter(prometheus.CounterOpts{}),
-				)
-				ra.tableSinks[tableID] = tableSink
+		if shouldApplyDDL(row, ddl) {
+			if err := ra.applyDDL(ctx, ddl, checkpointTs, resolvedTs); err != nil {
+				return err
 			}
-			if _, ok := tableResolvedTsMap[tableID]; !ok {
-				tableResolvedTsMap[tableID] = model.NewResolvedTs(checkpointTs)
+			if ddl, err = ra.rd.ReadNextDDL(ctx); err != nil {
+				return err
+			}
+		} else {
+			if err := ra.applyRow(row, checkpointTs); err != nil {
+				return err
 			}
-			ra.tableSinks[tableID].AppendRowChangedEvents(row)
-			if redoLog.Row.CommitTs > tableResolvedTsMap[tableID].Ts {
-				// Use batch resolvedTs to flush data as quickly as possible.
-				tableResolvedTsMap[tableID] = model.ResolvedTs{
-					Mode:    model.BatchResolvedMode,
-					Ts:      row.CommitTs,
-					BatchID: 1,
-				}
+			if row, err = ra.rd.ReadNextRow(ctx); err != nil {
+				return err
 			}
 		}
+	}
+	// Wait for all tables to flush data.
+	for tableID := range ra.tableResolvedTsMap {
+		if err := ra.waitTableFlush(ctx, tableID, resolvedTs); err != nil {
+			return err
+		}
+		ra.tableSinks[tableID].Close()
+	}
 
-		for tableID, tableResolvedTs := range tableResolvedTsMap {
+	log.Info("apply redo log finishes",
+		zap.Uint64("appliedLogCount", ra.appliedLogCount),
+		zap.Uint64("appliedDDLCount", ra.appliedDDLCount),
+		zap.Uint64("currentCheckpoint", resolvedTs))
+	return errApplyFinished
+}
+
+func (ra *RedoApplier) applyDDL(
+	ctx context.Context, ddl *model.DDLEvent, checkpointTs, resolvedTs uint64,
+) error {
+	shouldSkip := func() bool {
+		if ddl.CommitTs == checkpointTs {
+			if _, ok := unsupportedDDL[ddl.Type]; ok {
+				log.Error("ignore unsupported DDL", zap.Any("ddl", ddl))
+				return true
+			}
+		}
+		if ddl.TableInfo == nil {
+			// Note this could only happen when using an old version of cdc, and the commit ts
+			// of the DDL should be equal to checkpoint ts or resolved ts.
+			log.Warn("ignore DDL without table info", zap.Any("ddl", ddl))
+			return true
+		}
+		return false
+	}
+	if ddl.CommitTs != checkpointTs && ddl.CommitTs != resolvedTs {
+		// TODO: move this panic to shouldSkip after redo log supports cross DDL events.
+		log.Panic("ddl commit ts is not equal to checkpoint ts or resolved ts")
+	}
+	if shouldSkip() {
+		return nil
+	}
+	log.Warn("apply DDL", zap.Any("ddl", ddl))
+	// Wait for all tables to flush data before applying the DDL.
+	// TODO: only block tables that are affected by this DDL.
+ for tableID := range ra.tableSinks { + if err := ra.waitTableFlush(ctx, tableID, ddl.CommitTs); err != nil { + return err + } + } + if err := ra.ddlSink.WriteDDLEvent(ctx, ddl); err != nil { + return err + } + ra.appliedDDLCount++ + return nil +} + +func (ra *RedoApplier) applyRow( + row *model.RowChangedEvent, checkpointTs model.Ts, +) error { + tableID := row.Table.TableID + if _, ok := ra.tableSinks[tableID]; !ok { + tableSink := ra.sinkFactory.CreateTableSink( + model.DefaultChangeFeedID(applierChangefeed), + tableID, + prometheus.NewCounter(prometheus.CounterOpts{}), + ) + ra.tableSinks[tableID] = tableSink + } + if _, ok := ra.tableResolvedTsMap[tableID]; !ok { + ra.tableResolvedTsMap[tableID] = model.NewResolvedTs(checkpointTs) + } + ra.tableSinks[tableID].AppendRowChangedEvents(row) + if row.CommitTs > ra.tableResolvedTsMap[tableID].Ts { + // Use batch resolvedTs to flush data as quickly as possible. + ra.tableResolvedTsMap[tableID] = model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: row.CommitTs, + BatchID: 1, + } + } else if row.CommitTs < ra.tableResolvedTsMap[tableID].Ts { + log.Panic("commit ts of redo log regressed", + zap.Int64("tableID", tableID), + zap.Uint64("commitTs", row.CommitTs), + zap.Any("resolvedTs", ra.tableResolvedTsMap[tableID])) + } + + ra.appliedLogCount++ + if ra.appliedLogCount%mysql.DefaultMaxTxnRow == 0 { + for tableID, tableResolvedTs := range ra.tableResolvedTsMap { if err := ra.tableSinks[tableID].UpdateResolvedTs(tableResolvedTs); err != nil { return err } if tableResolvedTs.IsBatchMode() { - tableResolvedTsMap[tableID] = tableResolvedTs.AdvanceBatch() + ra.tableResolvedTsMap[tableID] = tableResolvedTs.AdvanceBatch() } } } + return nil +} - const warnDuration = 3 * time.Minute - const flushWaitDuration = 200 * time.Millisecond +func (ra *RedoApplier) waitTableFlush( + ctx context.Context, tableID model.TableID, rts model.Ts, +) error { ticker := time.NewTicker(warnDuration) defer ticker.Stop() - for tableID := range tableResolvedTsMap { - resolvedTs := model.NewResolvedTs(resolvedTs) - if err := ra.tableSinks[tableID].UpdateResolvedTs(resolvedTs); err != nil { - return err - } - // Make sure all events are flushed to downstream. - for !ra.tableSinks[tableID].GetCheckpointTs().EqualOrGreater(resolvedTs) { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case <-ticker.C: - log.Warn( - "Table sink is not catching up with resolved ts for a long time", - zap.Int64("tableID", tableID), - zap.Any("resolvedTs", resolvedTs), - zap.Any("checkpointTs", ra.tableSinks[tableID].GetCheckpointTs()), - ) - default: - time.Sleep(flushWaitDuration) - } + resolvedTs := model.NewResolvedTs(rts) + ra.tableResolvedTsMap[tableID] = resolvedTs + if err := ra.tableSinks[tableID].UpdateResolvedTs(resolvedTs); err != nil { + return err + } + // Make sure all events are flushed to downstream. 
+ for !ra.tableSinks[tableID].GetCheckpointTs().EqualOrGreater(resolvedTs) { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case <-ticker.C: + log.Warn( + "Table sink is not catching up with resolved ts for a long time", + zap.Int64("tableID", tableID), + zap.Any("resolvedTs", resolvedTs), + zap.Any("checkpointTs", ra.tableSinks[tableID].GetCheckpointTs()), + ) + default: + time.Sleep(flushWaitDuration) } - ra.tableSinks[tableID].Close() } - - log.Info("apply redo log finishes", zap.Int("appliedLogCount", appliedLogCount)) - return errApplyFinished + return nil } var createRedoReader = createRedoReaderImpl @@ -221,40 +334,24 @@ func (ra *RedoApplier) ReadMeta(ctx context.Context) (checkpointTs uint64, resol } // Apply applies redo log to given target -func (ra *RedoApplier) Apply(ctx context.Context) error { - rd, err := createRedoReader(ctx, ra.cfg) - if err != nil { +func (ra *RedoApplier) Apply(egCtx context.Context) (err error) { + eg, egCtx := errgroup.WithContext(egCtx) + egCtx = contextutil.PutRoleInCtx(egCtx, util.RoleRedoLogApplier) + if ra.rd, err = createRedoReader(egCtx, ra.cfg); err != nil { return err } - ra.rd = rd - defer func() { - if err = ra.rd.Close(); err != nil { - log.Warn("Close redo reader failed", zap.Error(err)) - } - }() - // MySQL sink will use the following replication config - // - EnableOldValue: default true - // - ForceReplicate: default false - // - filter: default []string{"*.*"} - replicaConfig := config.GetDefaultReplicaConfig() - ctx = contextutil.PutRoleInCtx(ctx, util.RoleRedoLogApplier) - sinkFactory, err := factory.New(ctx, - ra.cfg.SinkURI, replicaConfig, ra.errCh) - if err != nil { - return err - } - ra.sinkFactory = sinkFactory - defer sinkFactory.Close() - wg, ctx := errgroup.WithContext(ctx) - wg.Go(func() error { - return ra.consumeLogs(ctx) + eg.Go(func() error { + return ra.rd.Run(egCtx) + }) + eg.Go(func() error { + return ra.consumeLogs(egCtx) }) - wg.Go(func() error { - return ra.catchError(ctx) + eg.Go(func() error { + return ra.catchError(egCtx) }) - err = wg.Wait() + err = eg.Wait() if errors.Cause(err) != errApplyFinished { return err } diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index 87e2fc59a30..80fc46aa2ce 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -22,27 +22,31 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/go-sql-driver/mysql" "github.com/phayes/freeport" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/redo/common" "github.com/pingcap/tiflow/cdc/redo/reader" + mysqlDDL "github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/mysql" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink/txn" + pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/stretchr/testify/require" ) +var _ reader.RedoLogReader = &MockReader{} + // MockReader is a mock redo log reader that implements LogReader interface type MockReader struct { checkpointTs uint64 resolvedTs uint64 - redoLogCh chan *model.RedoRowChangedEvent - ddlEventCh chan *model.RedoDDLEvent + redoLogCh chan *model.RowChangedEvent + ddlEventCh chan *model.DDLEvent } // NewMockReader creates a new MockReader func NewMockReader( checkpointTs uint64, resolvedTs uint64, - redoLogCh chan *model.RedoRowChangedEvent, - ddlEventCh chan *model.RedoDDLEvent, + redoLogCh chan *model.RowChangedEvent, + ddlEventCh chan *model.DDLEvent, ) *MockReader { return &MockReader{ checkpointTs: checkpointTs, @@ -53,45 +57,27 @@ func NewMockReader( } // ResetReader 
implements LogReader.ReadLog -func (br *MockReader) ResetReader(ctx context.Context, startTs, endTs uint64) error { +func (br *MockReader) Run(ctx context.Context) error { return nil } -// ReadNextLog implements LogReader.ReadNextLog -func (br *MockReader) ReadNextLog(ctx context.Context, maxNumberOfMessages uint64) ([]*model.RedoRowChangedEvent, error) { - cached := make([]*model.RedoRowChangedEvent, 0) - for { - select { - case <-ctx.Done(): - return cached, nil - case redoLog, ok := <-br.redoLogCh: - if !ok { - return cached, nil - } - cached = append(cached, redoLog) - if len(cached) >= int(maxNumberOfMessages) { - return cached, nil - } - } +// ReadNextRow implements LogReader.ReadNextRow +func (br *MockReader) ReadNextRow(ctx context.Context) (*model.RowChangedEvent, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case row := <-br.redoLogCh: + return row, nil } } // ReadNextDDL implements LogReader.ReadNextDDL -func (br *MockReader) ReadNextDDL(ctx context.Context, maxNumberOfDDLs uint64) ([]*model.RedoDDLEvent, error) { - cached := make([]*model.RedoDDLEvent, 0) - for { - select { - case <-ctx.Done(): - return cached, nil - case ddl, ok := <-br.ddlEventCh: - if !ok { - return cached, nil - } - cached = append(cached, ddl) - if len(cached) >= int(maxNumberOfDDLs) { - return cached, nil - } - } +func (br *MockReader) ReadNextDDL(ctx context.Context) (*model.DDLEvent, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case ddl := <-br.ddlEventCh: + return ddl, nil } } @@ -100,99 +86,43 @@ func (br *MockReader) ReadMeta(ctx context.Context) (checkpointTs, resolvedTs ui return br.checkpointTs, br.resolvedTs, nil } -// Close implements LogReader.Close. -func (br *MockReader) Close() error { - return nil -} - -func TestApplyDMLs(t *testing.T) { +func TestApply(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() checkpointTs := uint64(1000) resolvedTs := uint64(2000) - redoLogCh := make(chan *model.RedoRowChangedEvent, 1024) - ddlEventCh := make(chan *model.RedoDDLEvent, 1024) + redoLogCh := make(chan *model.RowChangedEvent, 1024) + ddlEventCh := make(chan *model.DDLEvent, 1024) createMockReader := func(ctx context.Context, cfg *RedoApplierConfig) (reader.RedoLogReader, error) { return NewMockReader(checkpointTs, resolvedTs, redoLogCh, ddlEventCh), nil } dbIndex := 0 + // DML sink and DDL sink share the same db + db := getMockDB(t) mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { defer func() { dbIndex++ }() - if dbIndex == 0 { - // mock for test db, which is used querying TiDB session variable - db, mock, err := sqlmock.New() - if err != nil { - return nil, err - } - mock.ExpectQuery("SELECT @@SESSION.sql_mode;"). - WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}). - AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE")) - columns := []string{"Variable_name", "Value"} - mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), - ) - mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), - ) - mock.ExpectQuery("show session variables like 'transaction_isolation';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("transaction_isolation", "REPEATED-READ"), - ) - mock.ExpectQuery("show session variables like 'tidb_placement_mode';"). 
- WillReturnRows( - sqlmock.NewRows(columns). - AddRow("tidb_placement_mode", "IGNORE"), - ) - mock.ExpectQuery("show session variables like 'tidb_enable_external_ts_read';"). - WillReturnRows( - sqlmock.NewRows(columns). - AddRow("tidb_enable_external_ts_read", "OFF"), - ) - mock.ExpectQuery("select character_set_name from information_schema.character_sets " + - "where character_set_name = 'gbk';").WillReturnRows( - sqlmock.NewRows([]string{"character_set_name"}).AddRow("gbk"), - ) - mock.ExpectClose() - return db, nil + if dbIndex%2 == 0 { + testDB, err := pmysql.MockTestDB(true) + require.Nil(t, err) + return testDB, nil } - - // normal db - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - require.Nil(t, err) - // Before we write data to downstream, we need to check whether the downstream is TiDB. - // So we mock a select tidb_version() query. - mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{ - Number: 1305, - Message: "FUNCTION test.tidb_version does not exist", - }) - mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). - WithArgs(1, "2"). - WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectCommit() - - mock.ExpectBegin() - mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`,`b`) IN ((?,?))"). - WithArgs(1, "2"). - WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). - WithArgs(2, "3"). - WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectCommit() - mock.ExpectClose() return db, nil } - getDBConnBak := txn.GetDBConnImpl + getDMLDBConnBak := txn.GetDBConnImpl txn.GetDBConnImpl = mockGetDBConn + getDDLDBConnBak := mysqlDDL.GetDBConnImpl + mysqlDDL.GetDBConnImpl = mockGetDBConn createRedoReaderBak := createRedoReader createRedoReader = createMockReader defer func() { createRedoReader = createRedoReaderBak - txn.GetDBConnImpl = getDBConnBak + txn.GetDBConnImpl = getDMLDBConnBak + mysqlDDL.GetDBConnImpl = getDDLDBConnBak }() dmls := []*model.RowChangedEvent{ @@ -214,7 +144,7 @@ func TestApplyDMLs(t *testing.T) { }, { StartTs: 1200, - CommitTs: 1300, + CommitTs: resolvedTs, Table: &model.TableName{Schema: "test", Table: "t1"}, PreColumns: []*model.Column{ { @@ -241,13 +171,39 @@ func TestApplyDMLs(t *testing.T) { }, } for _, dml := range dmls { - redoLogCh <- common.RowToRedo(dml) + redoLogCh <- dml + } + ddls := []*model.DDLEvent{ + { + CommitTs: checkpointTs, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "test", Table: "checkpoint", + }, + }, + Query: "create table checkpoint(id int)", + Type: timodel.ActionCreateTable, + }, + { + CommitTs: resolvedTs, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: "test", Table: "resolved", + }, + }, + Query: "create table resolved(id int)", + Type: timodel.ActionCreateTable, + }, + } + for _, ddl := range ddls { + ddlEventCh <- ddl } close(redoLogCh) close(ddlEventCh) cfg := &RedoApplierConfig{ - SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1&tidb_placement_mode=ignore&safe-mode=true", + SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1" + + "&tidb_placement_mode=ignore&safe-mode=true&cache-prep-stmts=false", } ap := NewRedoApplier(cfg) err := ap.Apply(ctx) @@ -268,3 +224,46 @@ func TestApplyMeetSinkError(t *testing.T) { err = ap.Apply(ctx) require.Regexp(t, "CDC:ErrMySQLConnectionError", err) } + +func getMockDB(t *testing.T) *sql.DB { + // normal db + db, mock, err := 
sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
+	require.Nil(t, err)
+
+	// Before we write data to downstream, we need to check whether the downstream is TiDB.
+	// So we mock a select tidb_version() query.
+	mock.ExpectQuery("select tidb_version()").WillReturnError(&mysql.MySQLError{
+		Number:  1305,
+		Message: "FUNCTION test.tidb_version does not exist",
+	})
+
+	mock.ExpectBegin()
+	mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
+	mock.ExpectExec("create table checkpoint(id int)").WillReturnResult(sqlmock.NewResult(1, 1))
+	mock.ExpectCommit()
+
+	mock.ExpectBegin()
+	mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)").
+		WithArgs(1, "2").
+		WillReturnResult(sqlmock.NewResult(1, 1))
+	mock.ExpectCommit()
+
+	// First, apply the row whose commitTs equals resolvedTs
+	mock.ExpectBegin()
+	mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`,`b`) IN ((?,?))").
+		WithArgs(1, "2").
+		WillReturnResult(sqlmock.NewResult(1, 1))
+	mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)").
+		WithArgs(2, "3").
+		WillReturnResult(sqlmock.NewResult(1, 1))
+	mock.ExpectCommit()
+
+	// Then, apply the DDL whose commitTs equals resolvedTs
+	mock.ExpectBegin()
+	mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
+	mock.ExpectExec("create table resolved(id int)").WillReturnResult(sqlmock.NewResult(1, 1))
+	mock.ExpectCommit()
+
+	mock.ExpectClose()
+	return db
+}
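
getMockDB now scripts, in one place and in strict order, every statement the shared DML and DDL sinks are expected to issue, so the test doubles as an assertion on replay order: rows whose commitTs equals resolvedTs are applied before the DDL at resolvedTs. For readers unfamiliar with go-sqlmock, here is a minimal, self-contained sketch of the idiom the test relies on (ordered expectations plus QueryMatcherEqual for exact SQL matching); the single REPLACE transaction is invented for illustration:

```go
package main

import (
	"fmt"

	"github.com/DATA-DOG/go-sqlmock"
)

func main() {
	// QueryMatcherEqual matches expectations against the exact SQL text,
	// which is why getMockDB can spell out each statement verbatim.
	db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
	if err != nil {
		panic(err)
	}

	// Expectations are consumed strictly in the order they are declared.
	mock.ExpectBegin()
	mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)").
		WithArgs(1, "2").
		WillReturnResult(sqlmock.NewResult(1, 1))
	mock.ExpectCommit()
	mock.ExpectClose()

	tx, err := db.Begin()
	if err != nil {
		panic(err)
	}
	if _, err := tx.Exec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)", 1, "2"); err != nil {
		panic(err)
	}
	_ = tx.Commit()
	_ = db.Close()

	// Nil means every scripted expectation was satisfied, in order.
	fmt.Println("unmet expectations:", mock.ExpectationsWereMet())
}
```

diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go
index 204765e0741..7bdc9b3c394 100644
--- a/pkg/sink/mysql/config.go
+++ b/pkg/sink/mysql/config.go
@@ -37,10 +37,10 @@ const (
 	txnModeOptimistic  = "optimistic"
 	txnModePessimistic = "pessimistic"
 
-	// defaultWorkerCount is the default number of workers.
-	defaultWorkerCount = 16
-	// defaultMaxTxnRow is the default max number of rows in a transaction.
-	defaultMaxTxnRow = 256
+	// DefaultWorkerCount is the default number of workers.
+	DefaultWorkerCount = 16
+	// DefaultMaxTxnRow is the default max number of rows in a transaction.
+	DefaultMaxTxnRow = 256
 	// defaultMaxMultiUpdateRowCount is the default max number of rows in a
 	// single multi update SQL.
 	defaultMaxMultiUpdateRowCount = 40

Exporting DefaultWorkerCount and DefaultMaxTxnRow lets packages outside pkg/sink/mysql, such as the redo applier, reference the defaults instead of hard-coding 16 and 256. The hypothetical caller below illustrates the point; appendDefaults and its URI plumbing are invented, and only the two constant names and values come from this diff:

```go
package main

import (
	"fmt"
	"net/url"
)

// Stand-ins for the now-exported constants from pkg/sink/mysql;
// the names and values are real, this package is illustrative.
const (
	DefaultWorkerCount = 16
	DefaultMaxTxnRow   = 256
)

// appendDefaults fills in sink parameters the user did not set,
// without duplicating the magic numbers at every call site.
func appendDefaults(sinkURI string) (string, error) {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return "", err
	}
	q := u.Query()
	if q.Get("worker-count") == "" {
		q.Set("worker-count", fmt.Sprint(DefaultWorkerCount))
	}
	if q.Get("max-txn-row") == "" {
		q.Set("max-txn-row", fmt.Sprint(DefaultMaxTxnRow))
	}
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	s, err := appendDefaults("mysql://127.0.0.1:4000/")
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // mysql://127.0.0.1:4000/?max-txn-row=256&worker-count=16
}
```

@@ -102,8 +102,8 @@ type Config struct {
 // NewConfig returns the default mysql backend config.
 func NewConfig() *Config {
 	return &Config{
-		WorkerCount:            defaultWorkerCount,
-		MaxTxnRow:              defaultMaxTxnRow,
+		WorkerCount:            DefaultWorkerCount,
+		MaxTxnRow:              DefaultMaxTxnRow,
 		MaxMultiUpdateRowCount: defaultMaxMultiUpdateRowCount,
 		MaxMultiUpdateRowSize:  defaultMaxMultiUpdateRowSize,
 		tidbTxnMode:            defaultTiDBTxnMode,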