From eaaa7aba62f7bda2262ac96f32b8d146fd488a28 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 1 Jul 2022 11:54:44 +0800 Subject: [PATCH] use splitTxn in sinkNode --- cdc/processor/pipeline/sink.go | 32 +++++--- cdc/processor/pipeline/sink_test.go | 91 ++++++++++++++++------ cdc/processor/pipeline/table_actor.go | 2 +- cdc/processor/pipeline/table_actor_test.go | 2 +- pkg/config/sink.go | 14 ++-- 5 files changed, 93 insertions(+), 48 deletions(-) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index b8eb72fd677..dbb97d68323 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -15,6 +15,7 @@ package pipeline import ( "context" + "fmt" "sync/atomic" "time" @@ -47,6 +48,7 @@ type sinkNode struct { redoManager redo.LogManager replicaConfig *config.ReplicaConfig + splitTxn bool } func newSinkNode( @@ -56,6 +58,7 @@ func newSinkNode( redoManager redo.LogManager, state *TableState, changefeed model.ChangeFeedID, + splitTxn bool, ) *sinkNode { sn := &sinkNode{ tableID: tableID, @@ -66,6 +69,7 @@ func newSinkNode( changefeed: changefeed, flowController: flowController, redoManager: redoManager, + splitTxn: splitTxn, } sn.resolvedTs.Store(model.NewResolvedTs(startTs)) sn.checkpointTs.Store(model.NewResolvedTs(startTs)) @@ -307,7 +311,9 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo switch msg.Tp { case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent - n.checkSplitTxn(event) + if err := n.verifySplitTxn(event); err != nil { + return false, errors.Trace(err) + } if event.IsResolved() { if n.state.Load() == TableStatePrepared { @@ -363,22 +369,24 @@ func (n *sinkNode) releaseResource(ctx context.Context) error { return n.sink.Close(ctx) } -func (n *sinkNode) checkSplitTxn(e *model.PolymorphicEvent) { - ta := n.replicaConfig.Sink.TxnAtomicity - ta.Validate() - if ta.ShouldSplitTxn() { - return +// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent +// with `SplitTxn==true`. +func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error { + if n.splitTxn { + return nil } - // Check that BatchResolved events and RowChangedEvent events with `SplitTxn==true` - // have not been received by sinkNode. + // Fail-fast check, this situation should never happen normally when split transactions + // are not supported. if e.Resolved != nil && e.Resolved.IsBatchMode() { - log.Panic("batch mode resolved ts is not supported when sink.splitTxn is false", - zap.Any("event", e), zap.Any("replicaConfig", n.replicaConfig)) + msg := fmt.Sprintf("batch mode resolved ts is not supported "+ + "when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) } if e.Row != nil && e.Row.SplitTxn { - log.Panic("should not split txn when sink.splitTxn is false", - zap.Any("event", e), zap.Any("replicaConfig", n.replicaConfig)) + msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) } + return nil } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 7f6705f9f1f..05fb1c2de98 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -128,8 +128,6 @@ func (s *mockCloseControlSink) Close(ctx context.Context) error { func TestState(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) config := config.GetDefaultReplicaConfig() - // tableTxnAtomicity - config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test-status"), Info: &model.ChangeFeedInfo{ @@ -141,7 +139,7 @@ func TestState(t *testing.T) { state := TableStatePrepared // test stop at targetTs node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil). ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -192,7 +190,7 @@ func TestState(t *testing.T) { // test the stop at ts command state = TableStatePrepared node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -232,7 +230,7 @@ func TestState(t *testing.T) { // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts state = TableStatePrepared node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -274,8 +272,6 @@ func TestState(t *testing.T) { func TestStopStatus(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) config := config.GetDefaultReplicaConfig() - // tableTxnAtomicity - config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test-state"), Info: &model.ChangeFeedInfo{ @@ -289,7 +285,7 @@ func TestStopStatus(t *testing.T) { node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -324,8 +320,6 @@ func TestStopStatus(t *testing.T) { func TestManyTs(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) config := config.GetDefaultReplicaConfig() - // tableTxnAtomicity - config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ @@ -336,7 +330,7 @@ func TestManyTs(t *testing.T) { state := TableStatePrepared sink := &mockSink{} node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -502,8 +496,6 @@ func TestManyTs(t *testing.T) { func TestIgnoreEmptyRowChangeEvent(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) config := config.GetDefaultReplicaConfig() - // tableTxnAtomicity - config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ @@ -514,7 +506,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) { state := TableStatePreparing sink := &mockSink{} node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) @@ -532,8 +524,6 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) { func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) config := config.GetDefaultReplicaConfig() - // tableTxnAtomicity - config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ @@ -544,7 +534,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { state := TableStatePreparing sink := &mockSink{} node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) @@ -598,8 +588,6 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) config := config.GetDefaultReplicaConfig() config.EnableOldValue = false - // tableTxnAtomicity - config.Sink.TxnAtomicity = "none" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ @@ -610,7 +598,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { state := TableStatePreparing sink := &mockSink{} node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) @@ -750,14 +738,11 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) cfg := config.GetDefaultReplicaConfig() cfg.EnableOldValue = false - config := config.GetDefaultReplicaConfig() - // tableTxnAtomicity - config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config, + Config: cfg, }, }) state := TableStatePreparing @@ -765,7 +750,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { sink := &flushSink{} // sNode is a sinkNode sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) sNode.barrierTs = 10 @@ -780,3 +765,59 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { require.Equal(t, uint64(8), sNode.CheckpointTs()) require.Equal(t, 2, flowController.releaseCounter) } + +func TestSplitTxn(t *testing.T) { + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: model.DefaultChangeFeedID("changefeed-id-test"), + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config, + }, + }) + state := TableStatePrepared + flowController := &flushFlowController{} + sink := &flushSink{} + // sNode is a sinkNode + sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) + msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, + Row: &model.RowChangedEvent{}, + }) + _, err := sNode.HandleMessage(ctx, msg) + require.Nil(t, err) + + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 2, SplitTxn: true}, + }) + _, err = sNode.HandleMessage(ctx, msg) + require.Regexp(t, ".*should not split txn when sink.splitTxn is.*", err) + + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 2}, + }) + _, err = sNode.HandleMessage(ctx, msg) + require.Nil(t, err) + + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 7, + Resolved: &model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: 7, + BatchID: 1, + }, + RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, + Row: &model.RowChangedEvent{}, + }) + _, err = sNode.HandleMessage(ctx, msg) + require.Regexp(t, ".*batch mode resolved ts is not supported.*", err) +} diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index cac35705527..57d16653a57 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -321,7 +321,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error { actorSinkNode := newSinkNode(t.tableID, t.tableSink, t.replicaInfo.StartTs, - t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID) + t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID, splitTxn) actorSinkNode.initWithReplicaConfig(t.replicaConfig) t.sinkNode = actorSinkNode diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index 3aaf1271951..c4be1425c1b 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -53,7 +53,7 @@ func TestAsyncStopFailed(t *testing.T) { state: TableStatePreparing, } tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, tbl.redoManager, - &tbl.state, model.DefaultChangeFeedID("changefeed-test")) + &tbl.state, model.DefaultChangeFeedID("changefeed-test"), false) require.True(t, tbl.AsyncStop(1)) mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 78c65eab7fe..30964527e30 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -53,13 +53,6 @@ func (l AtomicityLevel) ShouldSplitTxn() bool { return l == noneTxnAtomicity } -// Validate checks the AtomicityLevel is supported by TiCDC. -func (l AtomicityLevel) Validate() { - if l != noneTxnAtomicity && l != tableTxnAtomicity { - log.Panic(fmt.Sprintf("unsupported transaction atomicity: %s", l)) - } -} - // ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. var ForceEnableOldValueProtocols = []string{ ProtocolCanal.String(), @@ -171,10 +164,13 @@ func (s *SinkConfig) applyParameter(sinkURI *url.URL) error { return err } } else if s.Protocol != "" { - return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol cannot be configured "+ - "when using %s scheme", sinkURI.Scheme)) + return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol cannot "+ + "be configured when using %s scheme", sinkURI.Scheme)) } + log.Info("succeed to parse parameter from sink uri", + zap.String("protocol", s.Protocol), + zap.String("txnAtomicity", string(s.TxnAtomicity))) return nil }