Skip to content

Commit

Permalink
sink (ticdc): fix a deadlock due to checkpointTs fall back in sinkNode (#4084) (#4099)
Browse files Browse the repository at this point in the history

close #4055
  • Loading branch information
ti-chi-bot authored Jan 13, 2022
1 parent cabee2c commit 181bb6c
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 4 deletions.
11 changes: 10 additions & 1 deletion cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,21 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err != nil {
return errors.Trace(err)
}

// we must call flowController.Release immediately after we call
// FlushRowChangedEvents to prevent a deadlock caused by checkpointTs
// falling back
n.flowController.Release(checkpointTs)

// the checkpointTs may fall back in some situations, such as:
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= n.checkpointTs {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)

n.flowController.Release(checkpointTs)
return nil
}

Expand Down
58 changes: 58 additions & 0 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,61 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2)
c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0)
}

// flushFlowController is a test double that embeds mockFlowController and
// records how many times its Release method has been invoked.
type flushFlowController struct {
mockFlowController
// releaseCounter is incremented by one on every Release call.
releaseCounter int
}

// Release ignores the timestamp it is given and only bumps releaseCounter,
// so a test can assert how many times Release was called.
func (c *flushFlowController) Release(resolvedTs uint64) {
	c.releaseCounter += 1
}

// flushSink is a test double that embeds mockSink and defines its own
// FlushRowChangedEvents so a test can simulate a flush result that falls back.
type flushSink struct {
mockSink
}

// fallBackResolvedTs is the resolvedTs that makes
// flushSink.FlushRowChangedEvents report 0. It is used to simulate the
// situation where the resolvedTs returned from the sink manager falls back.
var fallBackResolvedTs uint64 = 10

// FlushRowChangedEvents echoes resolvedTs back as the flushed timestamp,
// except when resolvedTs equals fallBackResolvedTs, in which case it reports 0
// to mimic a fall back. The returned error is always nil.
func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
	flushedTs := resolvedTs
	if flushedTs == fallBackResolvedTs {
		// Pretend the flush result fell back to zero.
		flushedTs = 0
	}
	return flushedTs, nil
}

// TestFlushSinkReleaseFlowController tests that sinkNode.flushSink always
// calls flowController.Release when no error occurs, even when the
// checkpointTs returned by the sink falls back, so that the memory quota of
// the table is released and a deadlock is avoided.
func (s *outputSuite) TestFlushSinkReleaseFlowController(c *check.C) {
	defer testleak.AfterTest(c)()
	ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
	cfg := config.GetDefaultReplicaConfig()
	cfg.EnableOldValue = false
	ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
		ID: "changefeed-id-test-flushSink",
		Info: &model.ChangeFeedInfo{
			StartTs: oracle.GoTimeToTS(time.Now()),
			Config:  cfg,
		},
	})
	flowController := &flushFlowController{}
	sink := &flushSink{}
	// sNode is a sinkNode
	sNode := newSinkNode(1, sink, 0, 10, flowController)
	c.Assert(sNode.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
	sNode.barrierTs = 10

	// Back the node context with the changefeed-scoped ctx instead of nil, so
	// flushSink never receives a node context wrapping a nil context.
	cctx := pipeline.MockNodeContext4Test(ctx, pipeline.TickMessage(), nil)

	// A normal flush: the checkpoint advances and the quota is released once.
	err := sNode.flushSink(cctx, uint64(8))
	c.Assert(err, check.IsNil)
	c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
	c.Assert(flowController.releaseCounter, check.Equals, 1)

	// resolvedTs will fall back in this call: flushSink.FlushRowChangedEvents
	// reports 0 for fallBackResolvedTs, so the checkpoint must stay at 8 while
	// Release is still called a second time.
	err = sNode.flushSink(cctx, fallBackResolvedTs)
	c.Assert(err, check.IsNil)
	c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
	c.Assert(flowController.releaseCounter, check.Equals, 2)
}
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
}
}
// NOTE we allow the quota to be exceeded if blocking means interrupting a transaction.
// Otherwise the pipeline would deadlock.
// Otherwise, the pipeline would deadlock.
err := n.flowController.Consume(commitTs, size, func() error {
if lastCRTs > lastSentResolvedTs {
// If we are blocking, we send a Resolved Event here to elicit a sink-flush.
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 {
return atomic.LoadUint64(&m.changeFeedCheckpointTs)
}

// UpdateChangeFeedCheckpointTs updates the changeFeedCheckpointTs on every processor tick
func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs)
if m.backendSink != nil {
Expand Down
9 changes: 8 additions & 1 deletion cdc/sink/table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ type tableSink struct {
redoManager redo.LogManager
}

// Initialize is a no-op for tableSink: it does not use ctx or tableInfo and
// always returns nil.
func (t *tableSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// do nothing
return nil
}

// Compile-time assertion that *tableSink implements the Sink interface.
var _ Sink = (*tableSink)(nil)

func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
t.buffer = append(t.buffer, rows...)
t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
Expand Down Expand Up @@ -127,7 +134,7 @@ func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
return nil
}

// Close asks the manager to destroy the table sink identified by t.tableID and
// returns the manager's error. Once the method is called, no more events can
// be written to this table sink.
func (t *tableSink) Close(ctx context.Context) error {
return t.manager.destroyTableSink(ctx, t.tableID)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outp
return Message{}, nil
}

// MockNodeContext4Test creates a node context with a message and an output
// channel for tests. It forwards its arguments unchanged to NewNodeContext.
func MockNodeContext4Test(ctx context.Context, msg Message, outputCh chan Message) NodeContext {
return NewNodeContext(ctx, msg, outputCh)
}

0 comments on commit 181bb6c

Please sign in to comment.