Skip to content

Commit

Permalink
redo(ticdc): support for applying ddl event in applier (#8362)
Browse files Browse the repository at this point in the history
close #8361
  • Loading branch information
CharlesCheung96 committed Apr 4, 2023
1 parent 0180aed commit 504a637
Show file tree
Hide file tree
Showing 14 changed files with 646 additions and 1,051 deletions.
5 changes: 3 additions & 2 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,9 @@ type RedoRowChangedEvent struct {

// RedoDDLEvent represents DDL event used in redo log persistent
type RedoDDLEvent struct {
DDL *DDLEvent `msg:"ddl"`
Type byte `msg:"type"`
DDL *DDLEvent `msg:"ddl"`
Type byte `msg:"type"`
TableName TableName `msg:"table-name"`
}

// ToRedoLog converts row changed event to redo log
Expand Down
39 changes: 34 additions & 5 deletions cdc/model/sink_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions cdc/redo/reader/blackhole_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ func newBlackHoleReader() *BlackHoleReader {
return &BlackHoleReader{}
}

// ResetReader implements LogReader.ReadLog
func (br *BlackHoleReader) ResetReader(ctx context.Context, startTs, endTs uint64) error {
// Run implements LogReader.Run
func (br *BlackHoleReader) Run(ctx context.Context) error {
return nil
}

// ReadNextLog implements LogReader.ReadNextLog
func (br *BlackHoleReader) ReadNextLog(ctx context.Context, maxNumberOfEvents uint64) ([]*model.RedoRowChangedEvent, error) {
// ReadNextRow implements LogReader.ReadNextRow
func (br *BlackHoleReader) ReadNextRow(ctx context.Context) (*model.RowChangedEvent, error) {
return nil, nil
}

// ReadNextDDL implements LogReader.ReadNextDDL
func (br *BlackHoleReader) ReadNextDDL(ctx context.Context, maxNumberOfEvents uint64) ([]*model.RedoDDLEvent, error) {
func (br *BlackHoleReader) ReadNextDDL(ctx context.Context) (*model.DDLEvent, error) {
return nil, nil
}

Expand Down
27 changes: 16 additions & 11 deletions cdc/redo/reader/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -53,7 +54,7 @@ const (
type fileReader interface {
io.Closer
// Read return the log from log file
Read(log *model.RedoLog) error
Read() (*model.RedoLog, error)
}

type readerConfig struct {
Expand Down Expand Up @@ -84,6 +85,7 @@ func newReader(ctx context.Context, cfg *readerConfig) ([]fileReader, error) {
if cfg.workerNums == 0 {
cfg.workerNums = defaultWorkerNum
}
start := time.Now()

if cfg.useExternalStorage {
extStorage, err := redo.InitExternalStorage(ctx, cfg.uri)
Expand Down Expand Up @@ -113,6 +115,9 @@ func newReader(ctx context.Context, cfg *readerConfig) ([]fileReader, error) {
})
}

log.Info("succeed to download and sort redo logs",
zap.String("type", cfg.fileType),
zap.Duration("duration", time.Since(start)))
return readers, nil
}

Expand Down Expand Up @@ -249,8 +254,7 @@ func readFile(file *os.File) (logHeap, error) {

h := logHeap{}
for {
rl := &model.RedoLog{}
err := r.Read(rl)
rl, err := r.Read()
if err != nil {
if err != io.EOF {
return nil, err
Expand Down Expand Up @@ -383,16 +387,16 @@ func shouldOpen(startTs uint64, name, fixedType string) (bool, error) {

// Read implement Read interface.
// TODO: more general reader pair with writer in writer pkg
func (r *reader) Read(redoLog *model.RedoLog) error {
func (r *reader) Read() (*model.RedoLog, error) {
r.mu.Lock()
defer r.mu.Unlock()

lenField, err := readInt64(r.br)
if err != nil {
if err == io.EOF {
return err
return nil, err
}
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return nil, cerror.WrapError(cerror.ErrRedoFileOp, err)
}

recBytes, padBytes := decodeFrameSize(lenField)
Expand All @@ -403,23 +407,24 @@ func (r *reader) Read(redoLog *model.RedoLog) error {
log.Warn("read redo log have unexpected io error",
zap.String("fileName", r.fileName),
zap.Error(err))
return io.EOF
return nil, io.EOF
}
return cerror.WrapError(cerror.ErrRedoFileOp, err)
return nil, cerror.WrapError(cerror.ErrRedoFileOp, err)
}

redoLog := new(model.RedoLog)
_, err = redoLog.UnmarshalMsg(data[:recBytes])
if err != nil {
if r.isTornEntry(data) {
// just return io.EOF, since if torn write it is the last redoLog entry
return io.EOF
return nil, io.EOF
}
return cerror.WrapError(cerror.ErrUnmarshalFailed, err)
return nil, cerror.WrapError(cerror.ErrUnmarshalFailed, err)
}

// point last valid offset to the end of redoLog
r.lastValidOff += frameSizeBytes + recBytes + padBytes
return nil
return redoLog, nil
}

func readInt64(r io.Reader) (int64, error) {
Expand Down
6 changes: 2 additions & 4 deletions cdc/redo/reader/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ func TestReaderRead(t *testing.T) {
require.Nil(t, err)
require.Equal(t, 1, len(r))
defer r[0].Close() //nolint:errcheck
log = &model.RedoLog{}
err = r[0].Read(log)
log, err = r[0].Read()
require.Nil(t, err)
require.EqualValues(t, 1123, log.RedoRow.Row.CommitTs)
time.Sleep(1001 * time.Millisecond)
Expand Down Expand Up @@ -236,8 +235,7 @@ func TestReaderOpenSelectedFiles(t *testing.T) {
closer: r,
}
for {
rl := &model.RedoLog{}
err := r.Read(rl)
rl, err := r.Read()
if err == io.EOF {
break
}
Expand Down
Loading

0 comments on commit 504a637

Please sign in to comment.