Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): remove force consume when redo is disabled #5712

Merged
merged 8 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 49 additions & 17 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@

package model

import (
"math"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// PolymorphicEvent describes an event can be in multiple states.
type PolymorphicEvent struct {
StartTs uint64
// Commit or resolved TS
CRTs uint64
// Identify whether the resolved event is in batch mode.
Mode ResolvedMode
StartTs uint64
CRTs uint64
Resolved *ResolvedTs

RawKV *RawKVEntry
Row *RowChangedEvent
Expand Down Expand Up @@ -66,11 +71,6 @@ func (e *PolymorphicEvent) IsResolved() bool {
return e.RawKV.OpType == OpTypeResolved
}

// IsBatchResolved returns true if the event is batch resolved event.
func (e *PolymorphicEvent) IsBatchResolved() bool {
return e.IsResolved() && e.Mode == BatchResolvedMode
}

// ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order.
// It returns true if and only if i should precede j.
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
Expand Down Expand Up @@ -108,16 +108,48 @@ const (

// ResolvedTs is the resolved timestamp of sink module.
type ResolvedTs struct {
Ts uint64
Mode ResolvedMode
Mode ResolvedMode
Ts uint64
BatchID uint64
}

// NewResolvedTs creates a new ResolvedTs.
// NewResolvedTs creates a normal ResolvedTs.
func NewResolvedTs(t uint64) ResolvedTs {
return ResolvedTs{Ts: t, Mode: NormalResolvedMode}
return ResolvedTs{Ts: t, Mode: NormalResolvedMode, BatchID: math.MaxUint64}
}

// IsBatchMode returns true if the resolved ts is BatchResolvedMode.
func (r ResolvedTs) IsBatchMode() bool {
return r.Mode == BatchResolvedMode
}

// ResolvedMark returns a timestamp `ts` based on the r.mode, which marks that all events
// whose commitTs is less than or equal to `ts` are sent to Sink.
func (r ResolvedTs) ResolvedMark() uint64 {
switch r.Mode {
case NormalResolvedMode:
// with NormalResolvedMode, cdc guarantees all events whose commitTs is
// less than or equal to `resolved.Ts` are sent to Sink.
return r.Ts
case BatchResolvedMode:
// with BatchResolvedMode, cdc guarantees all events whose commitTs is
// less than `resolved.Ts` are sent to Sink.
return r.Ts - 1
default:
log.Error("unknown resolved mode", zap.Any("resolved", r))
return 0
}
}

// EqualOrGreater judge whether the resolved ts is equal or greater than the given ts.
func (r ResolvedTs) EqualOrGreater(r1 ResolvedTs) bool {
if r.Ts == r1.Ts {
return r.BatchID >= r1.BatchID
}
return r.Ts > r1.Ts
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
}

// NewResolvedTsWithMode creates a ResolvedTs with a given batch type.
func NewResolvedTsWithMode(t uint64, m ResolvedMode) ResolvedTs {
return ResolvedTs{Ts: t, Mode: m}
// Less judge whether the resolved ts is less than the given ts.
func (r ResolvedTs) Less(r1 ResolvedTs) bool {
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
return !r.EqualOrGreater(r1)
}
37 changes: 37 additions & 0 deletions cdc/model/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package model

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -44,3 +45,39 @@ func TestPolymorphicEvent(t *testing.T) {
require.Equal(t, resolved.CRTs, polyEvent.CRTs)
require.Equal(t, uint64(0), polyEvent.StartTs)
}

func TestResolvedTs(t *testing.T) {
t.Parallel()

invalidResolvedTs := ResolvedTs{Mode: -1, Ts: 1}
require.Equal(t, uint64(0), invalidResolvedTs.ResolvedMark())

ts := rand.Uint64()
batchID := rand.Uint64()
normalResolvedTs := NewResolvedTs(ts)
batchResolvedTs1 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID}
require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs1))
require.False(t, batchResolvedTs1.EqualOrGreater(normalResolvedTs))
require.False(t, normalResolvedTs.Less(batchResolvedTs1))
require.True(t, batchResolvedTs1.Less(normalResolvedTs))

batchResolvedTs2 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID + 1}
require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs2))
require.True(t, batchResolvedTs2.EqualOrGreater(batchResolvedTs1))
require.True(t, batchResolvedTs2.Less(normalResolvedTs))
require.True(t, batchResolvedTs1.Less(batchResolvedTs2))

largerResolvedTs := NewResolvedTs(ts + rand.Uint64()%10)
require.True(t, largerResolvedTs.EqualOrGreater(normalResolvedTs))
largerBatchResolvedTs := ResolvedTs{
Mode: BatchResolvedMode,
Ts: ts + rand.Uint64()%10,
BatchID: batchID,
}
require.True(t, largerBatchResolvedTs.EqualOrGreater(normalResolvedTs))

smallerResolvedTs := NewResolvedTs(0)
require.True(t, normalResolvedTs.EqualOrGreater(smallerResolvedTs))
smallerBatchResolvedTs := ResolvedTs{Mode: BatchResolvedMode, Ts: 0, BatchID: batchID}
require.True(t, batchResolvedTs1.EqualOrGreater(smallerBatchResolvedTs))
}
60 changes: 37 additions & 23 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type sinkNode struct {

// atomic oprations for model.ResolvedTs
resolvedTs atomic.Value
checkpointTs model.Ts
checkpointTs atomic.Value
targetTs model.Ts
barrierTs model.Ts

Expand All @@ -83,23 +83,31 @@ type sinkNode struct {

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
checkpointTs: startTs,
barrierTs: startTs,
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,

flowController: flowController,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
return sn
}

func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }
func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() }
func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }

func (n *sinkNode) getResolvedTs() model.ResolvedTs {
return n.resolvedTs.Load().(model.ResolvedTs)
}

func (n *sinkNode) getCheckpointTs() model.ResolvedTs {
return n.checkpointTs.Load().(model.ResolvedTs)
}

func (n *sinkNode) initWithReplicaConfig(replicaConfig *config.ReplicaConfig) {
n.replicaConfig = replicaConfig
Expand Down Expand Up @@ -128,39 +136,39 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
n.status.Store(TableStatusStopped)
return
}
if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs {
if n.CheckpointTs() >= n.targetTs {
err = n.stop(ctx)
}
}()
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs)
currentCheckpointTs := n.getCheckpointTs()
if resolved.Ts > currentBarrierTs {
resolved.Ts = currentBarrierTs
resolved = model.NewResolvedTs(currentBarrierTs)
}
if resolved.Ts > n.targetTs {
resolved.Ts = n.targetTs
resolved = model.NewResolvedTs(n.targetTs)
}
if resolved.Ts <= currentCheckpointTs {
if currentCheckpointTs.EqualOrGreater(resolved) {
return nil
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
}

// we must call flowController.Release immediately after we call
// FlushRowChangedEvents to prevent deadlock cause by checkpointTs
// fall back
n.flowController.Release(checkpointTs)
n.flowController.Release(checkpoint)

// the checkpointTs may fall back in some situation such as:
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= currentCheckpointTs {
if currentCheckpointTs.EqualOrGreater(checkpoint) {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)
n.checkpointTs.Store(checkpoint)

return nil
}
Expand Down Expand Up @@ -286,7 +294,13 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})

resolved := model.NewResolvedTsWithMode(event.CRTs, event.Mode)
var resolved model.ResolvedTs
if event.Resolved != nil {
resolved = *(event.Resolved)
} else {
resolved = model.NewResolvedTs(event.CRTs)
}

if err := n.flushSink(ctx, resolved); err != nil {
return false, errors.Trace(err)
}
Expand All @@ -297,7 +311,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
return false, errors.Trace(err)
}
case pmessage.MessageTypeTick:
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
if err := n.flushSink(ctx, n.getResolvedTs()); err != nil {
return false, errors.Trace(err)
}
case pmessage.MessageTypeCommand:
Expand All @@ -316,7 +330,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo

func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
atomic.StoreUint64(&n.barrierTs, ts)
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
if err := n.flushSink(ctx, n.getResolvedTs()); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
24 changes: 12 additions & 12 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ type mockFlowController struct{}
func (c *mockFlowController) Consume(
msg *model.PolymorphicEvent,
size uint64,
blockCallBack func(bool) error,
blockCallBack func(uint64) error,
) error {
return nil
}

func (c *mockFlowController) Release(resolvedTs uint64) {
func (c *mockFlowController) Release(resolved model.ResolvedTs) {
}

func (c *mockFlowController) Abort() {
Expand Down Expand Up @@ -78,12 +78,12 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error

func (s *mockSink) FlushRowChangedEvents(
ctx context.Context, _ model.TableID, resolved model.ResolvedTs,
) (uint64, error) {
) (model.ResolvedTs, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
}{resolvedTs: resolved.Ts})
return resolved.Ts, nil
return resolved, nil
}

func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error {
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestManyTs(t *testing.T) {
{resolvedTs: 1},
})
sink.Reset()
require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs())
require.Equal(t, model.NewResolvedTs(uint64(2)), node.getResolvedTs())
require.Equal(t, uint64(1), node.CheckpointTs())

msg = pmessage.BarrierMessage(5)
Expand All @@ -473,7 +473,7 @@ func TestManyTs(t *testing.T) {
{resolvedTs: 2},
})
sink.Reset()
require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs())
require.Equal(t, model.NewResolvedTs(uint64(2)), node.getResolvedTs())
require.Equal(t, uint64(2), node.CheckpointTs())
}

Expand Down Expand Up @@ -686,7 +686,7 @@ type flushFlowController struct {
releaseCounter int
}

func (c *flushFlowController) Release(resolvedTs uint64) {
func (c *flushFlowController) Release(resolved model.ResolvedTs) {
c.releaseCounter++
}

Expand All @@ -700,11 +700,11 @@ var fallBackResolvedTs = uint64(10)

func (s *flushSink) FlushRowChangedEvents(
ctx context.Context, _ model.TableID, resolved model.ResolvedTs,
) (uint64, error) {
) (model.ResolvedTs, error) {
if resolved.Ts == fallBackResolvedTs {
return 0, nil
return model.NewResolvedTs(0), nil
}
return resolved.Ts, nil
return resolved, nil
}

// TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always
Expand All @@ -731,11 +731,11 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {

err := sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(8)))
require.Nil(t, err)
require.Equal(t, uint64(8), sNode.checkpointTs)
require.Equal(t, uint64(8), sNode.CheckpointTs())
require.Equal(t, 1, flowController.releaseCounter)
// resolvedTs will fall back in this call
err = sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(10)))
require.Nil(t, err)
require.Equal(t, uint64(8), sNode.checkpointTs)
require.Equal(t, uint64(8), sNode.CheckpointTs())
require.Equal(t, 2, flowController.releaseCounter)
}
Loading