Skip to content

Commit

Permalink
use splitTxn in sinkNode
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jul 1, 2022
1 parent 4e1b956 commit eaaa7ab
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 48 deletions.
32 changes: 20 additions & 12 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
"context"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -47,6 +48,7 @@ type sinkNode struct {
redoManager redo.LogManager

replicaConfig *config.ReplicaConfig
splitTxn bool
}

func newSinkNode(
Expand All @@ -56,6 +58,7 @@ func newSinkNode(
redoManager redo.LogManager,
state *TableState,
changefeed model.ChangeFeedID,
splitTxn bool,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
Expand All @@ -66,6 +69,7 @@ func newSinkNode(
changefeed: changefeed,
flowController: flowController,
redoManager: redoManager,
splitTxn: splitTxn,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
Expand Down Expand Up @@ -307,7 +311,9 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
n.checkSplitTxn(event)
if err := n.verifySplitTxn(event); err != nil {
return false, errors.Trace(err)
}

if event.IsResolved() {
if n.state.Load() == TableStatePrepared {
Expand Down Expand Up @@ -363,22 +369,24 @@ func (n *sinkNode) releaseResource(ctx context.Context) error {
return n.sink.Close(ctx)
}

func (n *sinkNode) checkSplitTxn(e *model.PolymorphicEvent) {
ta := n.replicaConfig.Sink.TxnAtomicity
ta.Validate()
if ta.ShouldSplitTxn() {
return
// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent
// with `SplitTxn==true`.
func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error {
if n.splitTxn {
return nil
}

// Check that BatchResolved events and RowChangedEvent events with `SplitTxn==true`
// have not been received by sinkNode.
// Fail-fast check, this situation should never happen normally when split transactions
// are not supported.
if e.Resolved != nil && e.Resolved.IsBatchMode() {
log.Panic("batch mode resolved ts is not supported when sink.splitTxn is false",
zap.Any("event", e), zap.Any("replicaConfig", n.replicaConfig))
msg := fmt.Sprintf("batch mode resolved ts is not supported "+
"when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}

if e.Row != nil && e.Row.SplitTxn {
log.Panic("should not split txn when sink.splitTxn is false",
zap.Any("event", e), zap.Any("replicaConfig", n.replicaConfig))
msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}
return nil
}
91 changes: 66 additions & 25 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ func (s *mockCloseControlSink) Close(ctx context.Context) error {
func TestState(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test-status"),
Info: &model.ChangeFeedInfo{
Expand All @@ -141,7 +139,7 @@ func TestState(t *testing.T) {
state := TableStatePrepared
// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -192,7 +190,7 @@ func TestState(t *testing.T) {
// test the stop at ts command
state = TableStatePrepared
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -232,7 +230,7 @@ func TestState(t *testing.T) {
// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
state = TableStatePrepared
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -274,8 +272,6 @@ func TestState(t *testing.T) {
func TestStopStatus(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test-state"),
Info: &model.ChangeFeedInfo{
Expand All @@ -289,7 +285,7 @@ func TestStopStatus(t *testing.T) {
node := newSinkNode(1,
&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100,
&mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -324,8 +320,6 @@ func TestStopStatus(t *testing.T) {
func TestManyTs(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
Expand All @@ -336,7 +330,7 @@ func TestManyTs(t *testing.T) {
state := TableStatePrepared
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -502,8 +496,6 @@ func TestManyTs(t *testing.T) {
func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
Expand All @@ -514,7 +506,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

Expand All @@ -532,8 +524,6 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
Expand All @@ -544,7 +534,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

Expand Down Expand Up @@ -598,8 +588,6 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
config.EnableOldValue = false
// tableTxnAtomicity
config.Sink.TxnAtomicity = "none"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
Expand All @@ -610,7 +598,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

Expand Down Expand Up @@ -750,22 +738,19 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
cfg := config.GetDefaultReplicaConfig()
cfg.EnableOldValue = false
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config,
Config: cfg,
},
})
state := TableStatePreparing
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
sNode.barrierTs = 10
Expand All @@ -780,3 +765,59 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
require.Equal(t, uint64(8), sNode.CheckpointTs())
require.Equal(t, 2, flowController.releaseCounter)
}

func TestSplitTxn(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config,
},
})
state := TableStatePrepared
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID, false)
sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1,
RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
_, err := sNode.HandleMessage(ctx, msg)
require.Nil(t, err)

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1,
RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
Row: &model.RowChangedEvent{CommitTs: 2, SplitTxn: true},
})
_, err = sNode.HandleMessage(ctx, msg)
require.Regexp(t, ".*should not split txn when sink.splitTxn is.*", err)

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1,
RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
Row: &model.RowChangedEvent{CommitTs: 2},
})
_, err = sNode.HandleMessage(ctx, msg)
require.Nil(t, err)

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7,
Resolved: &model.ResolvedTs{
Mode: model.BatchResolvedMode,
Ts: 7,
BatchID: 1,
},
RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
_, err = sNode.HandleMessage(ctx, msg)
require.Regexp(t, ".*batch mode resolved ts is not supported.*", err)
}
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {

actorSinkNode := newSinkNode(t.tableID, t.tableSink,
t.replicaInfo.StartTs,
t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID)
t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID, splitTxn)
actorSinkNode.initWithReplicaConfig(t.replicaConfig)
t.sinkNode = actorSinkNode

Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestAsyncStopFailed(t *testing.T) {
state: TableStatePreparing,
}
tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, tbl.redoManager,
&tbl.state, model.DefaultChangeFeedID("changefeed-test"))
&tbl.state, model.DefaultChangeFeedID("changefeed-test"), false)
require.True(t, tbl.AsyncStop(1))

mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
Expand Down
14 changes: 5 additions & 9 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,6 @@ func (l AtomicityLevel) ShouldSplitTxn() bool {
return l == noneTxnAtomicity
}

// Validate checks the AtomicityLevel is supported by TiCDC.
func (l AtomicityLevel) Validate() {
if l != noneTxnAtomicity && l != tableTxnAtomicity {
log.Panic(fmt.Sprintf("unsupported transaction atomicity: %s", l))
}
}

// ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value.
var ForceEnableOldValueProtocols = []string{
ProtocolCanal.String(),
Expand Down Expand Up @@ -171,10 +164,13 @@ func (s *SinkConfig) applyParameter(sinkURI *url.URL) error {
return err
}
} else if s.Protocol != "" {
return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol cannot be configured "+
"when using %s scheme", sinkURI.Scheme))
return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol cannot "+
"be configured when using %s scheme", sinkURI.Scheme))
}

log.Info("succeed to parse parameter from sink uri",
zap.String("protocol", s.Protocol),
zap.String("txnAtomicity", string(s.TxnAtomicity)))
return nil
}

Expand Down

0 comments on commit eaaa7ab

Please sign in to comment.