Skip to content

Commit

Permalink
sink(ticdc): add transaction-atomicity parameter to SinkURI (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jul 6, 2022
1 parent 8da7e8b commit 1f095a2
Show file tree
Hide file tree
Showing 26 changed files with 424 additions and 99 deletions.
1 change: 1 addition & 0 deletions cdc/api/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
// test no change error
changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"}
oldInfo.SinkURI = "blackhole://"
oldInfo.Config.Sink.TxnAtomicity = "table"
newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.NotNil(t, err)
require.Regexp(t, ".*changefeed config is the same with the old one.*", err)
Expand Down
38 changes: 37 additions & 1 deletion cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
"context"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -81,9 +82,17 @@ type sinkNode struct {

replicaConfig *config.ReplicaConfig
isTableActorMode bool
splitTxn bool
}

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(
tableID model.TableID,
sink sink.Sink,
startTs model.Ts,
targetTs model.Ts,
flowController tableFlowController,
splitTxn bool,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
Expand All @@ -92,6 +101,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
barrierTs: startTs,

flowController: flowController,
splitTxn: splitTxn,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
Expand Down Expand Up @@ -301,6 +311,10 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if err := n.verifySplitTxn(event); err != nil {
return false, errors.Trace(err)
}

if event.IsResolved() {
if n.status.Load() == TableStatusInitializing {
n.status.Store(TableStatusRunning)
Expand Down Expand Up @@ -360,3 +374,25 @@ func (n *sinkNode) releaseResource(ctx context.Context) error {
n.flowController.Abort()
return n.sink.Close(ctx)
}

// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent
// with `SplitTxn==true`.
func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error {
if n.splitTxn {
return nil
}

// Fail-fast check, this situation should never happen normally when split transactions
// are not supported.
if e.Resolved != nil && e.Resolved.IsBatchMode() {
msg := fmt.Sprintf("batch mode resolved ts is not supported "+
"when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}

if e.Row != nil && e.Row.SplitTxn {
msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}
return nil
}
18 changes: 9 additions & 9 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestStatus(t *testing.T) {
})

// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false)
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -175,7 +175,7 @@ func TestStatus(t *testing.T) {
require.Equal(t, uint64(10), node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false)
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -206,7 +206,7 @@ func TestStatus(t *testing.T) {
require.Equal(t, uint64(2), node.CheckpointTs())

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false)
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -249,7 +249,7 @@ func TestStopStatus(t *testing.T) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}, false)
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -287,7 +287,7 @@ func TestManyTs(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

Expand Down Expand Up @@ -449,7 +449,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))

// empty row, no Columns and PreColumns.
Expand All @@ -471,7 +471,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))

// nil row.
Expand Down Expand Up @@ -529,7 +529,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))

// nil row.
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController)
sNode := newSinkNode(1, sink, 0, 10, flowController, false)
require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
sNode.barrierTs = 10

Expand Down
7 changes: 5 additions & 2 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ func NewTablePipeline(ctx cdcContext.Context,
zap.String("tableName", tableName),
zap.Int64("tableID", tableID),
zap.Uint64("quota", perTableMemoryQuota))
flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota, redoLogEnabled)
splitTxn := replConfig.Sink.TxnAtomicity.ShouldSplitTxn()

flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota,
redoLogEnabled, splitTxn)
config := ctx.ChangefeedVars().Info.Config
cyclicEnabled := config.Cyclic != nil && config.Cyclic.IsEnabled()
runnerSize := defaultRunnersSize
Expand All @@ -215,7 +218,7 @@ func NewTablePipeline(ctx cdcContext.Context,
p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs,
flowController, mounter, replConfig)
sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController)
sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController, splitTxn)

p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName,
changefeed, upstream))
Expand Down
39 changes: 21 additions & 18 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ type tableActor struct {
// backend mounter
mounter entry.Mounter
// backend tableSink
tableSink sink.Sink
redoLogEnabled bool
tableSink sink.Sink
redoManager redo.LogManager

pullerNode *pullerNode
sortNode *sorterNode
Expand Down Expand Up @@ -104,7 +104,7 @@ func NewTableActor(cdcCtx cdcContext.Context,
tableName string,
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
redoLogEnabled bool,
redoManager redo.LogManager,
targetTs model.Ts,
) (TablePipeline, error) {
config := cdcCtx.ChangefeedVars().Info.Config
Expand All @@ -126,19 +126,19 @@ func NewTableActor(cdcCtx cdcContext.Context,
wg: wg,
cancel: cancel,

tableID: tableID,
markTableID: replicaInfo.MarkTableID,
tableName: tableName,
cyclicEnabled: cyclicEnabled,
memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota,
upStream: upStream,
mounter: mounter,
replicaInfo: replicaInfo,
replicaConfig: config,
tableSink: sink,
redoLogEnabled: redoLogEnabled,
targetTs: targetTs,
started: false,
tableID: tableID,
markTableID: replicaInfo.MarkTableID,
tableName: tableName,
cyclicEnabled: cyclicEnabled,
memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota,
upStream: upStream,
mounter: mounter,
replicaInfo: replicaInfo,
replicaConfig: config,
tableSink: sink,
redoManager: redoManager,
targetTs: targetTs,
started: false,

changefeedID: changefeedVars.ID,
changefeedVars: changefeedVars,
Expand Down Expand Up @@ -282,7 +282,10 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
zap.String("tableName", t.tableName),
zap.Uint64("quota", t.memoryQuota))

flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoLogEnabled)
splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn()

flowController := flowcontrol.NewTableFlowController(t.memoryQuota,
t.redoManager.Enabled(), splitTxn)
sorterNode := newSorterNode(t.tableName, t.tableID,
t.replicaInfo.StartTs, flowController,
t.mounter, t.replicaConfig,
Expand Down Expand Up @@ -321,7 +324,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {

actorSinkNode := newSinkNode(t.tableID, t.tableSink,
t.replicaInfo.StartTs,
t.targetTs, flowController)
t.targetTs, flowController, splitTxn)
actorSinkNode.initWithReplicaConfig(true, t.replicaConfig)
t.sinkNode = actorSinkNode

Expand Down
10 changes: 7 additions & 3 deletions cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestAsyncStopFailed(t *testing.T) {
router: tableActorRouter,
cancel: func() {},
reportErr: func(err error) {},
sinkNode: newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}),
sinkNode: newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, false),
}
require.True(t, tbl.AsyncStop(1))

Expand Down Expand Up @@ -358,7 +358,7 @@ func TestNewTableActor(t *testing.T) {
&model.TableReplicaInfo{
StartTs: 0,
MarkTableID: 1,
}, &mockSink{}, false, 10)
}, &mockSink{}, redo.NewDisabledManager(), 10)
require.NotNil(t, tbl)
require.Nil(t, err)
require.NotPanics(t, func() {
Expand All @@ -374,7 +374,7 @@ func TestNewTableActor(t *testing.T) {
&model.TableReplicaInfo{
StartTs: 0,
MarkTableID: 1,
}, &mockSink{}, false, 10)
}, &mockSink{}, redo.NewDisabledManager(), 10)
require.Nil(t, tbl)
require.NotNil(t, err)

Expand Down Expand Up @@ -414,6 +414,8 @@ func TestTableActorStart(t *testing.T) {
StartTs: 0,
MarkTableID: 1,
},
redoManager: redo.NewDisabledManager(),
replicaConfig: config.GetDefaultReplicaConfig(),
}
require.Nil(t, tbl.start(ctx))
require.Equal(t, 1, len(tbl.nodes))
Expand All @@ -427,10 +429,12 @@ func TestTableActorStart(t *testing.T) {
Config: config.GetDefaultReplicaConfig(),
},
},
redoManager: redo.NewDisabledManager(),
replicaInfo: &model.TableReplicaInfo{
StartTs: 0,
MarkTableID: 1,
},
replicaConfig: config.GetDefaultReplicaConfig(),
}
tbl.cyclicEnabled = true
require.Nil(t, tbl.start(ctx))
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func (p *processor) createTablePipelineImpl(
tableName,
replicaInfo,
s,
p.redoManager.Enabled(),
p.redoManager,
p.changefeed.Info.GetTargetTs())
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading

0 comments on commit 1f095a2

Please sign in to comment.