Skip to content

Commit

Permalink
sink (ticdc): fix a deadlock due to checkpointTs fall back in sinkNode (
Browse files Browse the repository at this point in the history
#4084) (#4097)

close #4055
  • Loading branch information
ti-chi-bot authored Jan 12, 2022
1 parent d091d4f commit aee4975
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 6 deletions.
11 changes: 10 additions & 1 deletion cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,21 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err != nil {
return errors.Trace(err)
}

// we must call flowController.Release immediately after we call
// FlushRowChangedEvents to prevent deadlock cause by checkpointTs
// fall back
n.flowController.Release(checkpointTs)

// the checkpointTs may fall back in some situation such as:
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= n.checkpointTs {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)

n.flowController.Release(checkpointTs)
return nil
}

Expand Down
58 changes: 58 additions & 0 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,3 +559,61 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2)
c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0)
}

type flushFlowController struct {
mockFlowController
releaseCounter int
}

func (c *flushFlowController) Release(resolvedTs uint64) {
c.releaseCounter++
}

type flushSink struct {
mockSink
}

// use to simulate the situation that resolvedTs return from sink manager
// fall back
var fallBackResolvedTs = uint64(10)

func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
if resolvedTs == fallBackResolvedTs {
return 0, nil
}
return resolvedTs, nil
}

// TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always
// call flowController.Release to release the memory quota of the table to avoid
// deadlock if there is no error occur
func (s *outputSuite) TestFlushSinkReleaseFlowController(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
cfg := config.GetDefaultReplicaConfig()
cfg.EnableOldValue = false
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test-flushSink",
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: cfg,
},
})
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController)
c.Assert(sNode.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
sNode.barrierTs = 10

cctx := pipeline.MockNodeContext4Test(nil, pipeline.TickMessage(), nil)
err := sNode.flushSink(cctx, uint64(8))
c.Assert(err, check.IsNil)
c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
c.Assert(flowController.releaseCounter, check.Equals, 1)
// resolvedTs will fall back in this call
err = sNode.flushSink(cctx, uint64(10))
c.Assert(err, check.IsNil)
c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
c.Assert(flowController.releaseCounter, check.Equals, 2)
}
5 changes: 1 addition & 4 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 {
return atomic.LoadUint64(&m.changeFeedCheckpointTs)
}

// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick
func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs)
if m.backendSink != nil {
Expand Down Expand Up @@ -235,10 +236,6 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab
return ckpt, err
}

func (t *tableSink) getEmittedTs() uint64 {
return atomic.LoadUint64(&t.emittedTs)
}

func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
// the table sink doesn't receive the checkpoint event
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outp
return Message{}, nil
}

// MockNodeContext4Test creates a node context with a message and a output channel for tests.
// MockNodeContext4Test creates a node context with a message and an output channel for tests.
func MockNodeContext4Test(ctx context.Context, msg Message, outputCh chan Message) NodeContext {
return newNodeContext(ctx, msg, outputCh)
}

0 comments on commit aee4975

Please sign in to comment.