sink(cdc): fix "dead dmlSink" error in sink workers (#9686)
close #9685
hicqu authored Sep 8, 2023
1 parent 4452688 commit 85dcc86
Showing 4 changed files with 189 additions and 34 deletions.
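The heart of the fix, visible in table_sink_worker.go below, is twofold: the advancer and its `lastPos` are now set up before any early return can happen, and the task callback is routed through a guard so it fires exactly once, on both success and error paths. Here is a minimal, self-contained sketch of that once-only callback guard; the `position` type and the `handleTask` signature are simplified stand-ins for illustration, not the actual tiflow code.

```go
package main

import "fmt"

// position is a simplified stand-in for tiflow's engine.Position.
type position struct{ startTs, commitTs uint64 }

func handleTask(callback func(position)) (finalErr error) {
	callbackIsPerformed := false
	performCallback := func(pos position) {
		if !callbackIsPerformed {
			callback(pos)
			callbackIsPerformed = true
		}
	}

	defer func() {
		if finalErr == nil {
			// Normal completion path: report the last advanced position.
			performCallback(position{startTs: 3, commitTs: 4})
		}
	}()

	// Early-return path (e.g. every event was drained from the cache). The
	// guard ensures the deferred call above cannot fire the callback again.
	performCallback(position{startTs: 0, commitTs: 1})
	return nil
}

func main() {
	handleTask(func(p position) { fmt.Println("callback at", p) })
}
```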
15 changes: 6 additions & 9 deletions cdc/processor/sinkmanager/redo_log_worker.go
@@ -65,22 +65,23 @@ func (w *redoWorker) handleTasks(ctx context.Context, taskChan <-chan *redoTask)
}

func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr error) {
advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager)
// The task is finished and some required memory isn't used.
defer advancer.cleanup()

lowerBound, upperBound := validateAndAdjustBound(
w.changefeedID,
&task.span,
task.lowerBound,
task.getUpperBound(task.tableSink.getReceivedSorterResolvedTs()),
)
advancer.lastPos = lowerBound.Prev()

var cache *eventAppender
if w.eventCache != nil {
cache = w.eventCache.maybeCreateAppender(task.span, lowerBound)
}

advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager)
// The task is finished and some required memory isn't used.
defer advancer.cleanup()

iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.memQuota)
allEventCount := 0
cachedSize := uint64(0)
@@ -124,11 +125,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
cache.pushBatch(nil, 0, upperBound)
}

return advancer.finish(
ctx,
cachedSize,
upperBound,
)
return advancer.finish(ctx, cachedSize, upperBound)
}

allEventCount += 1
44 changes: 44 additions & 0 deletions cdc/processor/sinkmanager/redo_log_worker_test.go
@@ -284,3 +284,47 @@ func (suite *redoLogWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceIfNoWorkloa
cancel()
wg.Wait()
}

// When the worker starts to handle a task, advancer.lastPos should be set to a correct position.
// Otherwise, if advancer.lastPos isn't updated during scanning, the callback will get an
// invalid `advancer.lastPos`.
func (suite *redoLogWorkerSuite) TestHandleTaskWithoutMemory() {
ctx, cancel := context.WithCancel(context.Background())
events := []*model.PolymorphicEvent{
genPolymorphicEvent(1, 3, suite.testSpan),
genPolymorphicResolvedEvent(4),
}
w, e, _ := suite.createWorker(ctx, 0)
defer w.memQuota.Close()
suite.addEventsToSortEngine(events, e)

taskChan := make(chan *redoTask)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := w.handleTasks(ctx, taskChan)
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
defer sink.Close()

chShouldBeClosed := make(chan struct{}, 1)
callback := func(lastWritePos engine.Position) {
require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos)
close(chShouldBeClosed)
}
taskChan <- &redoTask{
span: suite.testSpan,
lowerBound: genLowerBound(),
getUpperBound: genUpperBoundGetter(4),
tableSink: wrapper,
callback: callback,
isCanceled: func() bool { return true },
}

<-chShouldBeClosed
cancel()
wg.Wait()
}
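The new test above exercises exactly the case the comment describes: with a zero memory quota the worker scans nothing, so the callback must still receive `lowerBound.Prev()`. Below is a minimal sketch of that invariant, using hypothetical simplified types rather than the real engine.Position and advancer.

```go
package main

import "fmt"

// position is a simplified stand-in for tiflow's engine.Position.
type position struct{ startTs, commitTs uint64 }

// prev mimics the idea of Position.Prev(): the position just before this one (simplified).
func (p position) prev() position { return position{p.startTs - 1, p.commitTs} }

type advancer struct{ lastPos position }

// handleTask sketches the scan loop: if nothing is scanned (no memory quota,
// task canceled), lastPos must already hold a valid value for the callback.
func handleTask(lowerBound position, events []position, canceled bool, callback func(position)) {
	a := advancer{}
	// The fix: seed lastPos before scanning so early exits report lowerBound.Prev()
	// instead of a zero-value position.
	a.lastPos = lowerBound.prev()

	for _, ev := range events {
		if canceled {
			break
		}
		a.lastPos = ev
	}
	callback(a.lastPos)
}

func main() {
	lower := position{startTs: 1, commitTs: 2}
	// Canceled before scanning anything: the callback still sees lowerBound.Prev().
	handleTask(lower, []position{{1, 3}}, true, func(p position) { fmt.Println("callback:", p) })
}
```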
69 changes: 44 additions & 25 deletions cdc/processor/sinkmanager/table_sink_worker.go
@@ -124,23 +124,18 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
&task.span,
task.lowerBound,
task.getUpperBound(task.tableSink.getUpperBoundTs()))
if w.eventCache != nil {
drained, err := w.fetchFromCache(task, &lowerBound, &upperBound)
if err != nil {
return errors.Trace(err)
}
// We have drained all events from the cache, we can return directly.
// No need to get events from the source manager again.
if drained {
task.callback(lowerBound.Prev())
return nil
}
}
advancer.lastPos = lowerBound.Prev()

allEventSize := uint64(0)
allEventCount := 0
// lowerBound and upperBound are both closed intervals.
iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota)

callbackIsPerformed := false
performCallback := func(pos engine.Position) {
if !callbackIsPerformed {
task.callback(pos)
callbackIsPerformed = true
}
}

defer func() {
// Collect metrics.
@@ -153,13 +148,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
task.tableSink.updateRangeEventCounts(eventCount)
}

if err := iter.Close(); err != nil {
log.Error("Sink worker fails to close iterator",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Stringer("span", &task.span),
zap.Error(err))
}
log.Debug("Sink task finished",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
@@ -174,7 +162,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e

// Otherwise we can't ensure all events before `lastPos` are emitted.
if finalErr == nil {
task.callback(advancer.lastPos)
performCallback(advancer.lastPos)
} else {
switch errors.Cause(finalErr).(type) {
// If it's a warning, close the table sink and wait all pending
@@ -187,22 +175,53 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
w.sinkMemQuota.ClearTable(task.tableSink.span)

// Restart the table sink based on the checkpoint position.
if finalErr = task.tableSink.restart(ctx); finalErr == nil {
if err := task.tableSink.restart(ctx); err == nil {
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
ckpt := checkpointTs.ResolvedMark()
lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
task.callback(lastWrittenPos)
performCallback(lastWrittenPos)
log.Info("table sink has been restarted",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Stringer("span", &task.span),
zap.Any("lastWrittenPos", lastWrittenPos))
zap.Any("lastWrittenPos", lastWrittenPos),
zap.String("sinkError", finalErr.Error()))
finalErr = err
}
default:
}
}
}()

if w.eventCache != nil {
drained, err := w.fetchFromCache(task, &lowerBound, &upperBound)
failpoint.Inject("TableSinkWorkerFetchFromCache", func() {
err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected"))
})
if err != nil {
return errors.Trace(err)
}
if drained {
// If drained is true, it means we have drained all events from the cache,
// so we can return directly instead of getting events from the source manager again.
performCallback(lowerBound.Prev())
return nil
}
advancer.lastPos = lowerBound.Prev()
}

// lowerBound and upperBound are both closed intervals.
iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota)
defer func() {
if err := iter.Close(); err != nil {
log.Error("Sink worker fails to close iterator",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Stringer("span", &task.span),
zap.Error(err))
}
}()

// 1. We have enough memory to collect events.
// 2. The task is not canceled.
for advancer.hasEnoughMem() && !task.isCanceled() {
95 changes: 95 additions & 0 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/memquota"
@@ -666,3 +667,97 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskUseDifferentBatchIDEveryTime()
require.Equal(suite.T(), uint64(5), batchID.Load(), "The batchID should be 5, "+
"because the first task has 3 events, the second task has 1 event")
}

func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() {
ctx, cancel := context.WithCancel(context.Background())
events := []*model.PolymorphicEvent{
genPolymorphicEvent(1, 3, suite.testSpan),
genPolymorphicEvent(1, 3, suite.testSpan),
genPolymorphicEvent(1, 3, suite.testSpan),
genPolymorphicResolvedEvent(4),
}
// Only for three events.
eventSize := uint64(testEventSize * 3)
w, e := suite.createWorker(ctx, eventSize, true)
w.eventCache = newRedoEventCache(suite.testChangefeedID, 1024*1024)
defer w.sinkMemQuota.Close()
suite.addEventsToSortEngine(events, e)

_ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache", "return")
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache")
}()

taskChan := make(chan *sinkTask)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := w.handleTasks(ctx, taskChan)
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
defer sink.Close()

chShouldBeClosed := make(chan struct{}, 1)
callback := func(lastWritePos engine.Position) {
close(chShouldBeClosed)
}
taskChan <- &sinkTask{
span: suite.testSpan,
lowerBound: genLowerBound(),
getUpperBound: genUpperBoundGetter(4),
tableSink: wrapper,
callback: callback,
isCanceled: func() bool { return false },
}

<-chShouldBeClosed
cancel()
wg.Wait()
}

// When the worker starts to handle a task, advancer.lastPos should be set to a correct position.
// Otherwise, if advancer.lastPos isn't updated during scanning, the callback will get an
// invalid `advancer.lastPos`.
func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() {
ctx, cancel := context.WithCancel(context.Background())
events := []*model.PolymorphicEvent{
genPolymorphicEvent(1, 3, suite.testSpan),
genPolymorphicResolvedEvent(4),
}
w, e := suite.createWorker(ctx, 0, true)
defer w.sinkMemQuota.Close()
suite.addEventsToSortEngine(events, e)

taskChan := make(chan *sinkTask)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := w.handleTasks(ctx, taskChan)
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
defer sink.Close()

chShouldBeClosed := make(chan struct{}, 1)
callback := func(lastWritePos engine.Position) {
require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos)
close(chShouldBeClosed)
}
taskChan <- &sinkTask{
span: suite.testSpan,
lowerBound: genLowerBound(),
getUpperBound: genUpperBoundGetter(4),
tableSink: wrapper,
callback: callback,
isCanceled: func() bool { return true },
}

<-chShouldBeClosed
cancel()
wg.Wait()
}
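The new TestFetchFromCacheWithFailure test drives the error path through pingcap/failpoint. A minimal sketch of that enable/inject/disable pattern follows; note that the `failpoint.Inject` marker is a no-op until the markers are activated (e.g. via failpoint-ctl, as tiflow's build tooling does), and the failpoint path and names below are hypothetical, not the ones used in this commit.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

// fetchFromCache stands in for the production code path. The failpoint.Inject
// marker lets a test replace the returned error; with the markers not
// activated it is a no-op and the function returns nil.
func fetchFromCache() error {
	var err error
	failpoint.Inject("DemoFetchFromCache", func() {
		err = errors.New("injected cache failure")
	})
	return err
}

func main() {
	// In a test this would be failpoint.Enable with a deferred failpoint.Disable,
	// using the full package path of the failpoint, as the new tests do.
	fpPath := "example.com/demo/DemoFetchFromCache" // hypothetical path
	_ = failpoint.Enable(fpPath, "return")
	defer func() { _ = failpoint.Disable(fpPath) }()

	fmt.Println(fetchFromCache())
}
```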
