Skip to content

Commit

Permalink
sink(cdc): improve table sink advance timeout machanism (#9666)
Browse files Browse the repository at this point in the history
close #9695
  • Loading branch information
hicqu authored Sep 11, 2023
1 parent 78d59fc commit c410cff
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 58 deletions.
26 changes: 10 additions & 16 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
zap.Error(err))
m.clearSinkFactory()

// To release memory quota ASAP, close all table sinks manually.
start := time.Now()
log.Info("Sink manager is closing all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
Expand Down Expand Up @@ -371,22 +372,17 @@ func (m *SinkManager) clearSinkFactory() {
}
}

func (m *SinkManager) putSinkFactoryError(err error, version uint64) {
func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
skipped := true
if version == m.sinkFactory.version {
select {
case m.sinkFactory.errors <- err:
skipped = false
default:
}
return true
}
log.Info("Sink manager tries to put an sink error",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Bool("skipped", skipped),
zap.String("error", err.Error()))
return false
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) {
Expand Down Expand Up @@ -434,7 +430,7 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
if time.Since(sink.lastCleanTime) < cleanTableInterval {
return true
}
checkpointTs, _, _ := sink.getCheckpointTs()
checkpointTs := sink.getCheckpointTs()
resolvedMark := checkpointTs.ResolvedMark()
if resolvedMark == 0 {
return true
Expand Down Expand Up @@ -906,7 +902,7 @@ func (m *SinkManager) RemoveTable(span tablepb.Span) {
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span))
}
checkpointTs, _, _ := value.(*tableSinkWrapper).getCheckpointTs()
checkpointTs := value.(*tableSinkWrapper).getCheckpointTs()
log.Info("Remove table sink successfully",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
Expand Down Expand Up @@ -975,27 +971,25 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
}
tableSink := value.(*tableSinkWrapper)

checkpointTs, version, advanced := tableSink.getCheckpointTs()
checkpointTs := tableSink.getCheckpointTs()
m.sinkMemQuota.Release(span, checkpointTs)
m.redoMemQuota.Release(span, checkpointTs)

advanceTimeoutInSec := util.GetOrZero(m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec)
if advanceTimeoutInSec <= 0 {
log.Warn("AdvanceTimeoutInSec is not set, use default value", zap.Any("sinkConfig", m.changefeedInfo.Config.Sink))
advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second
if version > 0 && time.Since(advanced) > stuckCheck &&
oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck {

isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", version))
tableSink.updateTableSinkAdvanced()
m.putSinkFactoryError(errors.New("table sink stuck"), version)
}

var resolvedTs model.Ts
Expand Down
6 changes: 3 additions & 3 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
return checkpointTS.ResolvedMark() == 4
}, 5*time.Second, 10*time.Millisecond)
}
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
return checkpointTS.ResolvedMark() == 3
}, 5*time.Second, 10*time.Millisecond)
}
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
require.NotNil(t, tableSink)
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
require.Equal(t, uint64(1), checkpointTS.Ts)
}

Expand Down
24 changes: 12 additions & 12 deletions cdc/processor/sinkmanager/table_sink_advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTableSinkWithBatchID() {
expectedResolvedTs := model.NewResolvedTs(2)
expectedResolvedTs.Mode = model.BatchResolvedMode
expectedResolvedTs.BatchID = 1
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
require.Equal(suite.T(), expectedResolvedTs, checkpointTs)
}

Expand All @@ -151,7 +151,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTableSink() {
require.NoError(suite.T(), err)

expectedResolvedTs := model.NewResolvedTs(2)
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
require.Equal(suite.T(), expectedResolvedTs, checkpointTs)
}

Expand Down Expand Up @@ -290,7 +290,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTheSameCommitTsEventsWithCommitF
require.Len(suite.T(), sink.GetEvents(), 3)
sink.AckAllEvents()
require.Eventually(suite.T(), func() bool {
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == model.NewResolvedTs(2)
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down Expand Up @@ -337,7 +337,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTheSameCommitTsEventsWithoutComm
expectedResolvedTs := model.NewResolvedTs(3)
expectedResolvedTs.Mode = model.BatchResolvedMode
expectedResolvedTs.BatchID = 1
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == expectedResolvedTs
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down Expand Up @@ -388,7 +388,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceDifferentCommitTsEventsWithSplit
expectedResolvedTs := model.NewResolvedTs(3)
expectedResolvedTs.Mode = model.BatchResolvedMode
expectedResolvedTs.BatchID = 1
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == expectedResolvedTs
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down Expand Up @@ -443,7 +443,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceDifferentCommitTsEventsWithoutSp
sink.AckAllEvents()
require.Eventually(suite.T(), func() bool {
expectedResolvedTs := model.NewResolvedTs(2)
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == expectedResolvedTs
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down Expand Up @@ -499,7 +499,7 @@ func (suite *tableSinkAdvancerSuite) TestLastTimeAdvanceDifferentCommitTsEventsW
sink.AckAllEvents()
require.Eventually(suite.T(), func() bool {
expectedResolvedTs := model.NewResolvedTs(2)
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == expectedResolvedTs
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down Expand Up @@ -557,7 +557,7 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceWhenExceedAvailableMem() {
sink.AckAllEvents()
require.Eventually(suite.T(), func() bool {
expectedResolvedTs := model.NewResolvedTs(3)
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == expectedResolvedTs
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down Expand Up @@ -607,7 +607,7 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceWhenReachTheMaxUpdateIntSizeA
sink.AckAllEvents()
require.Eventually(suite.T(), func() bool {
expectedResolvedTs := model.NewResolvedTs(3)
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == expectedResolvedTs
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down Expand Up @@ -660,7 +660,7 @@ func (suite *tableSinkAdvancerSuite) TestFinish() {
sink.AckAllEvents()
require.Eventually(suite.T(), func() bool {
expectedResolvedTs := model.NewResolvedTs(4)
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == expectedResolvedTs
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down Expand Up @@ -710,7 +710,7 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceAndForceAcquireWithoutSplitTx
sink.AckAllEvents()
require.Eventually(suite.T(), func() bool {
expectedResolvedTs := model.NewResolvedTs(3)
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == expectedResolvedTs
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down Expand Up @@ -775,7 +775,7 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceAndBlockAcquireWithSplitTxn()
<-down
require.Eventually(suite.T(), func() bool {
expectedResolvedTs := model.NewResolvedTs(3)
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
return checkpointTs == expectedResolvedTs
}, 5*time.Second, 10*time.Millisecond)
require.Equal(suite.T(), uint64(0), advancer.committedTxnSize)
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e

// Restart the table sink based on the checkpoint position.
if err := task.tableSink.restart(ctx); err == nil {
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
ckpt := checkpointTs.ResolvedMark()
lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
performCallback(lastWrittenPos)
Expand Down
6 changes: 3 additions & 3 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableWhen
receivedEvents := sink.GetEvents()
receivedEvents[0].Callback()
require.Len(suite.T(), sink.GetEvents(), 1, "No more events should be sent to sink")
checkpointTs, _, _ := wrapper.getCheckpointTs()
checkpointTs := wrapper.getCheckpointTs()
require.Equal(suite.T(), uint64(4), checkpointTs.ResolvedMark())
}

Expand Down Expand Up @@ -581,7 +581,7 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNo
isCanceled: func() bool { return false },
}
require.Eventually(suite.T(), func() bool {
checkpointTs, _, _ := wrapper.getCheckpointTs()
checkpointTs := wrapper.getCheckpointTs()
return checkpointTs.ResolvedMark() == 4
}, 5*time.Second, 10*time.Millisecond, "Directly advance resolved mark to 4")
cancel()
Expand Down Expand Up @@ -639,7 +639,7 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskUseDifferentBatchIDEveryTime()
require.Equal(suite.T(), uint64(3), batchID.Load())
sink.AckAllEvents()
require.Eventually(suite.T(), func() bool {
checkpointTs, _, _ := wrapper.getCheckpointTs()
checkpointTs := wrapper.getCheckpointTs()
return checkpointTs.ResolvedMark() == 2
}, 5*time.Second, 10*time.Millisecond)

Expand Down
65 changes: 43 additions & 22 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ type tableSinkWrapper struct {
// tableSink is the underlying sink.
tableSink struct {
sync.RWMutex
s tablesink.TableSink
version uint64 // it's generated by `tableSinkCreator`.
s tablesink.TableSink
version uint64 // it's generated by `tableSinkCreater`.

innerMu sync.Mutex
advanced time.Time
resolvedTs model.ResolvedTs
checkpointTs model.ResolvedTs
advanced atomic.Int64
}

// state used to control the lifecycle of the table.
Expand Down Expand Up @@ -120,7 +123,8 @@ func newTableSinkWrapper(

res.tableSink.version = 0
res.tableSink.checkpointTs = model.NewResolvedTs(startTs)
res.updateTableSinkAdvanced()
res.tableSink.resolvedTs = model.NewResolvedTs(startTs)
res.tableSink.advanced = time.Now()

res.receivedSorterResolvedTs.Store(startTs)
res.barrierTs.Store(startTs)
Expand Down Expand Up @@ -197,33 +201,28 @@ func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
// If it's nil it means it's closed.
return tablesink.NewSinkInternalError(errors.New("table sink cleared"))
}
t.tableSink.innerMu.Lock()
defer t.tableSink.innerMu.Unlock()
t.tableSink.resolvedTs = ts
return t.tableSink.s.UpdateResolvedTs(ts)
}

// getCheckpointTs returns
// 1. checkpoint timestamp of the table;
// 2. the table sink version, which comes from `tableSinkCreator`;
// 3. recent time of the table is advanced.
func (t *tableSinkWrapper) getCheckpointTs() (model.ResolvedTs, uint64, time.Time) {
func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs {
t.tableSink.RLock()
defer t.tableSink.RUnlock()
t.tableSink.innerMu.Lock()
defer t.tableSink.innerMu.Unlock()

if t.tableSink.s != nil {
checkpointTs := t.tableSink.s.GetCheckpointTs()
if t.tableSink.checkpointTs.Less(checkpointTs) {
t.tableSink.checkpointTs = checkpointTs
t.updateTableSinkAdvanced()
t.tableSink.advanced = time.Now()
} else if !checkpointTs.Less(t.tableSink.resolvedTs) {
t.tableSink.advanced = time.Now()
}
}
advanced := time.Unix(t.tableSink.advanced.Load(), 0)
return t.tableSink.checkpointTs, t.tableSink.version, advanced
}

func (t *tableSinkWrapper) updateTableSinkAdvanced() {
curr := t.tableSink.advanced.Load()
now := time.Now().Unix()
if now > curr {
t.tableSink.advanced.CompareAndSwap(curr, now)
}
return t.tableSink.checkpointTs
}

func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts {
Expand Down Expand Up @@ -296,7 +295,7 @@ func (t *tableSinkWrapper) initTableSink() bool {
if t.tableSink.s == nil {
t.tableSink.s, t.tableSink.version = t.tableSinkCreator()
if t.tableSink.s != nil {
t.updateTableSinkAdvanced()
t.tableSink.advanced = time.Now()
return true
}
return false
Expand Down Expand Up @@ -342,12 +341,15 @@ func (t *tableSinkWrapper) doTableSinkClear() {
return
}
checkpointTs := t.tableSink.s.GetCheckpointTs()
t.tableSink.innerMu.Lock()
if t.tableSink.checkpointTs.Less(checkpointTs) {
t.tableSink.checkpointTs = checkpointTs
}
t.tableSink.resolvedTs = checkpointTs
t.tableSink.advanced = time.Now()
t.tableSink.innerMu.Unlock()
t.tableSink.s = nil
t.tableSink.version = 0
t.tableSink.advanced.Store(time.Now().Unix())
}

// When the attached sink fail, there can be some events that have already been
Expand Down Expand Up @@ -417,6 +419,25 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound engine.Position, min
return shouldClean
}

func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) {
t.getCheckpointTs()

t.tableSink.RLock()
defer t.tableSink.RUnlock()
t.tableSink.innerMu.Lock()
defer t.tableSink.innerMu.Unlock()

// What these conditions mean:
// 1. the table sink has been associated with a valid sink;
// 2. its checkpoint hasn't been advanced for a while;
version := t.tableSink.version
advanced := t.tableSink.advanced
if version > 0 && time.Since(advanced) > stuckCheck {
return true, version
}
return false, uint64(0)
}

func handleRowChangedEvents(
changefeed model.ChangeFeedID, span tablepb.Span,
events ...*model.PolymorphicEvent,
Expand Down
Loading

0 comments on commit c410cff

Please sign in to comment.