Skip to content

Commit

Permalink
sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 6, 2022
1 parent 41ab14c commit be67a42
Show file tree
Hide file tree
Showing 38 changed files with 1,010 additions and 299 deletions.
1 change: 1 addition & 0 deletions cdc/api/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
// test no change error
changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"}
oldInfo.SinkURI = "blackhole://"
oldInfo.Config.Sink.TxnAtomicity = "table"
newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.NotNil(t, err)
require.Regexp(t, ".*changefeed config is the same with the old one.*", err)
Expand Down
66 changes: 49 additions & 17 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@

package model

import (
"math"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// PolymorphicEvent describes an event can be in multiple states.
type PolymorphicEvent struct {
StartTs uint64
// Commit or resolved TS
CRTs uint64
// Identify whether the resolved event is in batch mode.
Mode ResolvedMode
StartTs uint64
CRTs uint64
Resolved *ResolvedTs

RawKV *RawKVEntry
Row *RowChangedEvent
Expand Down Expand Up @@ -66,11 +71,6 @@ func (e *PolymorphicEvent) IsResolved() bool {
return e.RawKV.OpType == OpTypeResolved
}

// IsBatchResolved reports whether the event is a resolved event whose Mode is
// BatchResolvedMode. IsResolved is consulted first; Mode is only inspected for
// resolved events.
func (e *PolymorphicEvent) IsBatchResolved() bool {
	if !e.IsResolved() {
		return false
	}
	return e.Mode == BatchResolvedMode
}

// ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order.
// It returns true if and only if i should precede j.
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
Expand Down Expand Up @@ -108,16 +108,48 @@ const (

// ResolvedTs is the resolved timestamp of sink module.
// NOTE(review): Ts and Mode each appear twice below. This looks like the
// pre-change and post-change field lists of a diff rendered together without
// +/- markers — confirm against the real file before relying on this layout.
type ResolvedTs struct {
Ts uint64
Mode ResolvedMode
// Mode selects how Ts is interpreted; ResolvedMark switches on it.
Mode ResolvedMode
// Ts is the resolved timestamp itself.
Ts uint64
// BatchID orders two values whose Ts are equal (see EqualOrGreater).
BatchID uint64
}

// NewResolvedTs creates a ResolvedTs for timestamp t in NormalResolvedMode.
// NOTE(review): two return statements follow. They look like the pre-change and
// post-change lines of a diff rendered together; the second one additionally
// sets BatchID to math.MaxUint64 — confirm which one the real file keeps.
func NewResolvedTs(t uint64) ResolvedTs {
return ResolvedTs{Ts: t, Mode: NormalResolvedMode}
return ResolvedTs{Ts: t, Mode: NormalResolvedMode, BatchID: math.MaxUint64}
}

// IsBatchMode reports whether r.Mode is BatchResolvedMode.
func (r ResolvedTs) IsBatchMode() bool {
	switch r.Mode {
	case BatchResolvedMode:
		return true
	default:
		return false
	}
}

// ResolvedMark returns a timestamp `ts` based on r.Mode, which marks that all
// events whose commitTs is less than or equal to `ts` are sent to Sink.
//
//   - NormalResolvedMode: returns r.Ts.
//   - BatchResolvedMode: returns r.Ts - 1, or 0 when r.Ts is 0.
//   - any other mode: logs an error and returns 0.
func (r ResolvedTs) ResolvedMark() uint64 {
	switch r.Mode {
	case NormalResolvedMode:
		// with NormalResolvedMode, cdc guarantees all events whose commitTs is
		// less than or equal to `resolved.Ts` are sent to Sink.
		return r.Ts
	case BatchResolvedMode:
		// with BatchResolvedMode, cdc guarantees all events whose commitTs is
		// less than `resolved.Ts` are sent to Sink.
		if r.Ts == 0 {
			// No commitTs is below 0. Without this guard the unsigned
			// subtraction wraps to math.MaxUint64 and the mark would claim
			// that every event has already been sent.
			return 0
		}
		return r.Ts - 1
	default:
		log.Error("unknown resolved mode", zap.Any("resolved", r))
		return 0
	}
}

// EqualOrGreater reports whether r is equal to or greater than r1. Values are
// ordered by Ts first; when the Ts values match, BatchID decides.
func (r ResolvedTs) EqualOrGreater(r1 ResolvedTs) bool {
	if r.Ts != r1.Ts {
		return r.Ts > r1.Ts
	}
	return r.BatchID >= r1.BatchID
}

// NewResolvedTsWithMode creates a ResolvedTs with a given batch type.
func NewResolvedTsWithMode(t uint64, m ResolvedMode) ResolvedTs {
return ResolvedTs{Ts: t, Mode: m}
// Less reports whether r is strictly less than r1. It is the negation of
// EqualOrGreater.
func (r ResolvedTs) Less(r1 ResolvedTs) bool {
	notLess := r.EqualOrGreater(r1)
	return !notLess
}
39 changes: 39 additions & 0 deletions cdc/model/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package model

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -44,3 +45,41 @@ func TestPolymorphicEvent(t *testing.T) {
require.Equal(t, resolved.CRTs, polyEvent.CRTs)
require.Equal(t, uint64(0), polyEvent.StartTs)
}

// TestResolvedTs checks ResolvedMark on an unknown mode and the ordering that
// EqualOrGreater / Less define across normal and batch resolved ts values.
func TestResolvedTs(t *testing.T) {
	t.Parallel()

	// An unrecognised mode yields a resolved mark of 0.
	unknownMode := ResolvedTs{Mode: -1, Ts: 1}
	require.Equal(t, uint64(0), unknownMode.ResolvedMark())

	ts := rand.Uint64()%10 + 1
	batchID := rand.Uint64()%10 + 1
	normal := NewResolvedTs(ts)
	batch1 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID}
	// Same Ts: the normal value compares as the larger of the two.
	require.True(t, normal.EqualOrGreater(batch1))
	require.False(t, batch1.EqualOrGreater(normal))
	require.False(t, normal.Less(batch1))
	require.True(t, batch1.Less(normal))

	// Same Ts, consecutive batch IDs: the higher BatchID is the larger value.
	batch2 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID + 1}
	require.True(t, normal.EqualOrGreater(batch2))
	require.True(t, batch2.EqualOrGreater(batch1))
	require.True(t, batch2.Less(normal))
	require.True(t, batch1.Less(batch2))

	// A strictly larger Ts compares as larger in either mode.
	largerTs := ts + rand.Uint64()%10 + 1
	largerNormal := NewResolvedTs(largerTs)
	require.True(t, largerNormal.EqualOrGreater(normal))
	largerBatch := ResolvedTs{
		Mode:    BatchResolvedMode,
		Ts:      largerTs,
		BatchID: batchID,
	}
	require.True(t, largerBatch.EqualOrGreater(normal),
		"largerBatchResolvedTs:%+v\nnormalResolvedTs:%+v", largerBatch, normal)

	// Ts == 0 is smaller than any ts drawn above (which is at least 1).
	zeroNormal := NewResolvedTs(0)
	require.True(t, normal.EqualOrGreater(zeroNormal))
	zeroBatch := ResolvedTs{Mode: BatchResolvedMode, Ts: 0, BatchID: batchID}
	require.True(t, batch1.EqualOrGreater(zeroBatch))
}
98 changes: 74 additions & 24 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
"context"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -73,35 +74,52 @@ type sinkNode struct {

// atomic operations for model.ResolvedTs
resolvedTs atomic.Value
checkpointTs model.Ts
checkpointTs atomic.Value
targetTs model.Ts
barrierTs model.Ts

flowController tableFlowController

replicaConfig *config.ReplicaConfig
isTableActorMode bool
splitTxn bool
}

// newSinkNode builds a sinkNode for tableID that writes to sink. The node
// starts in TableStatusInitializing, with barrierTs set to startTs and both
// resolvedTs and checkpointTs stored as model.NewResolvedTs(startTs).
// NOTE(review): two signatures and two field lists appear below. They look like
// the pre-change and post-change lines of a diff rendered together; the
// multi-line signature (with splitTxn) appears to be the newer one — confirm
// against the real file.
func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(
tableID model.TableID,
sink sink.Sink,
startTs model.Ts,
targetTs model.Ts,
flowController tableFlowController,
splitTxn bool,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
checkpointTs: startTs,
barrierTs: startTs,
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,

flowController: flowController,
splitTxn: splitTxn,
}
// Both atomic values are seeded with a normal-mode ResolvedTs at startTs, so
// the getters' type assertions to model.ResolvedTs succeed from the start.
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
return sn
}

// NOTE(review): each accessor appears twice below. The listing looks like the
// pre-change (first four) and post-change (last four) lines of a diff rendered
// together — confirm against the real file.

// ResolvedTs returns the value stored in n.resolvedTs as a model.ResolvedTs.
func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
// CheckpointTs atomically loads n.checkpointTs as a uint64.
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
// BarrierTs atomically loads n.barrierTs.
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
// Status returns the result of n.status.Load().
func (n *sinkNode) Status() TableStatus { return n.status.Load() }
// ResolvedTs returns the ResolvedMark of the stored resolved ts.
func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() }
// CheckpointTs returns the ResolvedMark of the stored checkpoint ts.
func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() }
// BarrierTs atomically loads n.barrierTs.
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
// Status returns the result of n.status.Load().
func (n *sinkNode) Status() TableStatus { return n.status.Load() }

// getResolvedTs loads the value held in n.resolvedTs and returns it as a
// model.ResolvedTs. The type assertion panics if the stored value has any
// other dynamic type.
func (n *sinkNode) getResolvedTs() model.ResolvedTs {
	stored := n.resolvedTs.Load()
	return stored.(model.ResolvedTs)
}

// getCheckpointTs loads the value held in n.checkpointTs and returns it as a
// model.ResolvedTs. The type assertion panics if the stored value has any
// other dynamic type.
func (n *sinkNode) getCheckpointTs() model.ResolvedTs {
	stored := n.checkpointTs.Load()
	return stored.(model.ResolvedTs)
}

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
n.replicaConfig = ctx.ChangefeedVars().Info.Config
Expand Down Expand Up @@ -137,39 +155,39 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
n.status.Store(TableStatusStopped)
return
}
if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs {
if n.CheckpointTs() >= n.targetTs {
err = n.stop(ctx)
}
}()
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs)
currentCheckpointTs := n.getCheckpointTs()
if resolved.Ts > currentBarrierTs {
resolved.Ts = currentBarrierTs
resolved = model.NewResolvedTs(currentBarrierTs)
}
if resolved.Ts > n.targetTs {
resolved.Ts = n.targetTs
resolved = model.NewResolvedTs(n.targetTs)
}
if resolved.Ts <= currentCheckpointTs {
if currentCheckpointTs.EqualOrGreater(resolved) {
return nil
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
}

// we must call flowController.Release immediately after we call
// FlushRowChangedEvents to prevent deadlock cause by checkpointTs
// fall back
n.flowController.Release(checkpointTs)
n.flowController.Release(checkpoint)

// the checkpointTs may fall back in some situation such as:
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= currentCheckpointTs {
if currentCheckpointTs.EqualOrGreater(checkpoint) {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)
n.checkpointTs.Store(checkpoint)

return nil
}
Expand Down Expand Up @@ -293,6 +311,10 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if err := n.verifySplitTxn(event); err != nil {
return false, errors.Trace(err)
}

if event.IsResolved() {
if n.status.Load() == TableStatusInitializing {
n.status.Store(TableStatusRunning)
Expand All @@ -301,7 +323,13 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})

resolved := model.NewResolvedTsWithMode(event.CRTs, event.Mode)
var resolved model.ResolvedTs
if event.Resolved != nil {
resolved = *(event.Resolved)
} else {
resolved = model.NewResolvedTs(event.CRTs)
}

if err := n.flushSink(ctx, resolved); err != nil {
return false, errors.Trace(err)
}
Expand All @@ -312,7 +340,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
return false, errors.Trace(err)
}
case pmessage.MessageTypeTick:
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
if err := n.flushSink(ctx, n.getResolvedTs()); err != nil {
return false, errors.Trace(err)
}
case pmessage.MessageTypeCommand:
Expand All @@ -331,7 +359,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo

func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
atomic.StoreUint64(&n.barrierTs, ts)
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
if err := n.flushSink(ctx, n.getResolvedTs()); err != nil {
return errors.Trace(err)
}
return nil
Expand All @@ -346,3 +374,25 @@ func (n *sinkNode) releaseResource(ctx context.Context) error {
n.flowController.Abort()
return n.sink.Close(ctx)
}

// verifySplitTxn checks that event e is compatible with the node's splitTxn
// setting. When splitTxn is true every event is accepted. Otherwise it returns
// ErrSinkInvalidConfig for a batch-mode resolved event or for a row event
// flagged with SplitTxn, and nil for anything else.
func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error {
	if n.splitTxn {
		return nil
	}

	// Fail-fast checks: neither situation should occur while split
	// transactions are not supported.
	var msg string
	switch {
	case e.Resolved != nil && e.Resolved.IsBatchMode():
		msg = fmt.Sprintf("batch mode resolved ts is not supported "+
			"when sink.splitTxn is %+v", n.splitTxn)
	case e.Row != nil && e.Row.SplitTxn:
		msg = fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn)
	default:
		return nil
	}
	return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}
Loading

0 comments on commit be67a42

Please sign in to comment.