Skip to content

Commit

Permalink
sink(ticdc): remove force consume when redo is disabled (pingcap#5712)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jul 6, 2022
1 parent 41ab14c commit 800d58c
Show file tree
Hide file tree
Showing 23 changed files with 647 additions and 261 deletions.
66 changes: 49 additions & 17 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@

package model

import (
"math"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// PolymorphicEvent describes an event can be in multiple states.
type PolymorphicEvent struct {
StartTs uint64
// Commit or resolved TS
CRTs uint64
// Identify whether the resolved event is in batch mode.
Mode ResolvedMode
StartTs uint64
CRTs uint64
Resolved *ResolvedTs

RawKV *RawKVEntry
Row *RowChangedEvent
Expand Down Expand Up @@ -66,11 +71,6 @@ func (e *PolymorphicEvent) IsResolved() bool {
return e.RawKV.OpType == OpTypeResolved
}

// IsBatchResolved returns true if the event is batch resolved event.
func (e *PolymorphicEvent) IsBatchResolved() bool {
return e.IsResolved() && e.Mode == BatchResolvedMode
}

// ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order.
// It returns true if and only if i should precede j.
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
Expand Down Expand Up @@ -108,16 +108,48 @@ const (

// ResolvedTs is the resolved timestamp of sink module.
type ResolvedTs struct {
Ts uint64
Mode ResolvedMode
Mode ResolvedMode
Ts uint64
BatchID uint64
}

// NewResolvedTs creates a new ResolvedTs.
// NewResolvedTs creates a normal ResolvedTs.
func NewResolvedTs(t uint64) ResolvedTs {
return ResolvedTs{Ts: t, Mode: NormalResolvedMode}
return ResolvedTs{Ts: t, Mode: NormalResolvedMode, BatchID: math.MaxUint64}
}

// IsBatchMode returns true if the resolved ts is BatchResolvedMode.
func (r ResolvedTs) IsBatchMode() bool {
return r.Mode == BatchResolvedMode
}

// ResolvedMark returns a timestamp `ts` based on the r.mode, which marks that all events
// whose commitTs is less than or equal to `ts` are sent to Sink.
func (r ResolvedTs) ResolvedMark() uint64 {
switch r.Mode {
case NormalResolvedMode:
// with NormalResolvedMode, cdc guarantees all events whose commitTs is
// less than or equal to `resolved.Ts` are sent to Sink.
return r.Ts
case BatchResolvedMode:
// with BatchResolvedMode, cdc guarantees all events whose commitTs is
// less than `resolved.Ts` are sent to Sink.
return r.Ts - 1
default:
log.Error("unknown resolved mode", zap.Any("resolved", r))
return 0
}
}

// EqualOrGreater judge whether the resolved ts is equal or greater than the given ts.
func (r ResolvedTs) EqualOrGreater(r1 ResolvedTs) bool {
if r.Ts == r1.Ts {
return r.BatchID >= r1.BatchID
}
return r.Ts > r1.Ts
}

// NewResolvedTsWithMode creates a ResolvedTs with a given batch type.
func NewResolvedTsWithMode(t uint64, m ResolvedMode) ResolvedTs {
return ResolvedTs{Ts: t, Mode: m}
// Less judge whether the resolved ts is less than the given ts.
func (r ResolvedTs) Less(r1 ResolvedTs) bool {
return !r.EqualOrGreater(r1)
}
39 changes: 39 additions & 0 deletions cdc/model/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package model

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -44,3 +45,41 @@ func TestPolymorphicEvent(t *testing.T) {
require.Equal(t, resolved.CRTs, polyEvent.CRTs)
require.Equal(t, uint64(0), polyEvent.StartTs)
}

func TestResolvedTs(t *testing.T) {
t.Parallel()

invalidResolvedTs := ResolvedTs{Mode: -1, Ts: 1}
require.Equal(t, uint64(0), invalidResolvedTs.ResolvedMark())

ts := rand.Uint64()%10 + 1
batchID := rand.Uint64()%10 + 1
normalResolvedTs := NewResolvedTs(ts)
batchResolvedTs1 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID}
require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs1))
require.False(t, batchResolvedTs1.EqualOrGreater(normalResolvedTs))
require.False(t, normalResolvedTs.Less(batchResolvedTs1))
require.True(t, batchResolvedTs1.Less(normalResolvedTs))

batchResolvedTs2 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID + 1}
require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs2))
require.True(t, batchResolvedTs2.EqualOrGreater(batchResolvedTs1))
require.True(t, batchResolvedTs2.Less(normalResolvedTs))
require.True(t, batchResolvedTs1.Less(batchResolvedTs2))

largerTs := ts + rand.Uint64()%10 + 1
largerResolvedTs := NewResolvedTs(largerTs)
require.True(t, largerResolvedTs.EqualOrGreater(normalResolvedTs))
largerBatchResolvedTs := ResolvedTs{
Mode: BatchResolvedMode,
Ts: largerTs,
BatchID: batchID,
}
require.True(t, largerBatchResolvedTs.EqualOrGreater(normalResolvedTs),
"largerBatchResolvedTs:%+v\nnormalResolvedTs:%+v", largerBatchResolvedTs, normalResolvedTs)

smallerResolvedTs := NewResolvedTs(0)
require.True(t, normalResolvedTs.EqualOrGreater(smallerResolvedTs))
smallerBatchResolvedTs := ResolvedTs{Mode: BatchResolvedMode, Ts: 0, BatchID: batchID}
require.True(t, batchResolvedTs1.EqualOrGreater(smallerBatchResolvedTs))
}
60 changes: 37 additions & 23 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type sinkNode struct {

// atomic oprations for model.ResolvedTs
resolvedTs atomic.Value
checkpointTs model.Ts
checkpointTs atomic.Value
targetTs model.Ts
barrierTs model.Ts

Expand All @@ -85,23 +85,31 @@ type sinkNode struct {

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
checkpointTs: startTs,
barrierTs: startTs,
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,

flowController: flowController,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
return sn
}

func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }
func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() }
func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }

func (n *sinkNode) getResolvedTs() model.ResolvedTs {
return n.resolvedTs.Load().(model.ResolvedTs)
}

func (n *sinkNode) getCheckpointTs() model.ResolvedTs {
return n.checkpointTs.Load().(model.ResolvedTs)
}

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
n.replicaConfig = ctx.ChangefeedVars().Info.Config
Expand Down Expand Up @@ -137,39 +145,39 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
n.status.Store(TableStatusStopped)
return
}
if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs {
if n.CheckpointTs() >= n.targetTs {
err = n.stop(ctx)
}
}()
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs)
currentCheckpointTs := n.getCheckpointTs()
if resolved.Ts > currentBarrierTs {
resolved.Ts = currentBarrierTs
resolved = model.NewResolvedTs(currentBarrierTs)
}
if resolved.Ts > n.targetTs {
resolved.Ts = n.targetTs
resolved = model.NewResolvedTs(n.targetTs)
}
if resolved.Ts <= currentCheckpointTs {
if currentCheckpointTs.EqualOrGreater(resolved) {
return nil
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
}

// we must call flowController.Release immediately after we call
// FlushRowChangedEvents to prevent deadlock cause by checkpointTs
// fall back
n.flowController.Release(checkpointTs)
n.flowController.Release(checkpoint)

// the checkpointTs may fall back in some situation such as:
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= currentCheckpointTs {
if currentCheckpointTs.EqualOrGreater(checkpoint) {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)
n.checkpointTs.Store(checkpoint)

return nil
}
Expand Down Expand Up @@ -301,7 +309,13 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})

resolved := model.NewResolvedTsWithMode(event.CRTs, event.Mode)
var resolved model.ResolvedTs
if event.Resolved != nil {
resolved = *(event.Resolved)
} else {
resolved = model.NewResolvedTs(event.CRTs)
}

if err := n.flushSink(ctx, resolved); err != nil {
return false, errors.Trace(err)
}
Expand All @@ -312,7 +326,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
return false, errors.Trace(err)
}
case pmessage.MessageTypeTick:
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
if err := n.flushSink(ctx, n.getResolvedTs()); err != nil {
return false, errors.Trace(err)
}
case pmessage.MessageTypeCommand:
Expand All @@ -331,7 +345,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo

func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
atomic.StoreUint64(&n.barrierTs, ts)
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
if err := n.flushSink(ctx, n.getResolvedTs()); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
24 changes: 12 additions & 12 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ type mockFlowController struct{}
func (c *mockFlowController) Consume(
msg *model.PolymorphicEvent,
size uint64,
blockCallBack func(bool) error,
blockCallBack func(uint64) error,
) error {
return nil
}

func (c *mockFlowController) Release(resolvedTs uint64) {
func (c *mockFlowController) Release(resolved model.ResolvedTs) {
}

func (c *mockFlowController) Abort() {
Expand Down Expand Up @@ -78,12 +78,12 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error

func (s *mockSink) FlushRowChangedEvents(
ctx context.Context, _ model.TableID, resolved model.ResolvedTs,
) (uint64, error) {
) (model.ResolvedTs, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
}{resolvedTs: resolved.Ts})
return resolved.Ts, nil
return resolved, nil
}

func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error {
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestManyTs(t *testing.T) {
{resolvedTs: 1},
})
sink.Reset()
require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs())
require.Equal(t, model.NewResolvedTs(uint64(2)), node.getResolvedTs())
require.Equal(t, uint64(1), node.CheckpointTs())

require.Nil(t, node.Receive(
Expand All @@ -435,7 +435,7 @@ func TestManyTs(t *testing.T) {
{resolvedTs: 2},
})
sink.Reset()
require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs())
require.Equal(t, model.NewResolvedTs(uint64(2)), node.getResolvedTs())
require.Equal(t, uint64(2), node.CheckpointTs())
}

Expand Down Expand Up @@ -636,7 +636,7 @@ type flushFlowController struct {
releaseCounter int
}

func (c *flushFlowController) Release(resolvedTs uint64) {
func (c *flushFlowController) Release(resolved model.ResolvedTs) {
c.releaseCounter++
}

Expand All @@ -650,11 +650,11 @@ var fallBackResolvedTs = uint64(10)

func (s *flushSink) FlushRowChangedEvents(
ctx context.Context, _ model.TableID, resolved model.ResolvedTs,
) (uint64, error) {
) (model.ResolvedTs, error) {
if resolved.Ts == fallBackResolvedTs {
return 0, nil
return model.NewResolvedTs(0), nil
}
return resolved.Ts, nil
return resolved, nil
}

// TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always
Expand All @@ -680,11 +680,11 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {

err := sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(8)))
require.Nil(t, err)
require.Equal(t, uint64(8), sNode.checkpointTs)
require.Equal(t, uint64(8), sNode.CheckpointTs())
require.Equal(t, 1, flowController.releaseCounter)
// resolvedTs will fall back in this call
err = sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(10)))
require.Nil(t, err)
require.Equal(t, uint64(8), sNode.checkpointTs)
require.Equal(t, uint64(8), sNode.CheckpointTs())
require.Equal(t, 2, flowController.releaseCounter)
}
Loading

0 comments on commit 800d58c

Please sign in to comment.