diff --git a/cdc/api/validator_test.go b/cdc/api/validator_test.go index 967e7b11cdb..ae4d295c4c5 100644 --- a/cdc/api/validator_test.go +++ b/cdc/api/validator_test.go @@ -39,7 +39,12 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { // test no change error changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"} oldInfo.SinkURI = "blackhole://" +<<<<<<< HEAD:cdc/api/validator_test.go newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo) +======= + oldInfo.Config.Sink.TxnAtomicity = "table" + newInfo, err = VerifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)):cdc/api/validator/validator_test.go require.NotNil(t, err) require.Regexp(t, ".*changefeed config is the same with the old one.*", err) require.Nil(t, newInfo) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 37ecd559762..6a67f359e66 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -15,6 +15,7 @@ package pipeline import ( "context" + "fmt" "sync/atomic" "time" @@ -79,11 +80,27 @@ type sinkNode struct { flowController tableFlowController +<<<<<<< HEAD replicaConfig *config.ReplicaConfig isTableActorMode bool } func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { +======= + replicaConfig *config.ReplicaConfig + splitTxn bool +} + +func newSinkNode( + tableID model.TableID, sink sink.Sink, + startTs model.Ts, targetTs model.Ts, + flowController tableFlowController, + redoManager redo.LogManager, + state *TableState, + changefeed model.ChangeFeedID, + splitTxn bool, +) *sinkNode { +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) sn := &sinkNode{ tableID: tableID, sink: sink, @@ -93,6 +110,11 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target barrierTs: startTs, flowController: flowController, +<<<<<<< HEAD +======= + redoManager: redoManager, + splitTxn: splitTxn, +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) } sn.resolvedTs.Store(model.NewResolvedTs(startTs)) return sn @@ -293,6 +315,10 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo switch msg.Tp { case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent + if err := n.verifySplitTxn(event); err != nil { + return false, errors.Trace(err) + } + if event.IsResolved() { if n.status.Load() == TableStatusInitializing { n.status.Store(TableStatusRunning) @@ -346,3 +372,25 @@ func (n *sinkNode) releaseResource(ctx context.Context) error { n.flowController.Abort() return n.sink.Close(ctx) } + +// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent +// with `SplitTxn==true`. +func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error { + if n.splitTxn { + return nil + } + + // Fail-fast check, this situation should never happen normally when split transactions + // are not supported. + if e.Resolved != nil && e.Resolved.IsBatchMode() { + msg := fmt.Sprintf("batch mode resolved ts is not supported "+ + "when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) + } + + if e.Row != nil && e.Row.SplitTxn { + msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) + } + return nil +} diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 0c8f5ced58d..fc3b4ea997d 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -126,18 +126,27 @@ func (s *mockCloseControlSink) Close(ctx context.Context) error { func TestStatus(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test-status"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) // test stop at targetTs +<<<<<<< HEAD node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) +======= + node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil). + ChangefeedVars().Info.Config) + require.Equal(t, TableStatePrepared, node.State()) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) require.Nil(t, node.Receive( pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil))) @@ -175,6 +184,7 @@ func TestStatus(t *testing.T) { require.Equal(t, uint64(10), node.CheckpointTs()) // test the stop at ts command +<<<<<<< HEAD node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -182,6 +192,20 @@ func TestStatus(t *testing.T) { require.Nil(t, node.Receive( pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil))) require.Equal(t, TableStatusInitializing, node.Status()) +======= + state = TableStatePrepared + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) + require.Equal(t, TableStatePrepared, node.State()) + + msg = pmessage.BarrierMessage(20) + ok, err = node.HandleMessage(ctx, msg) + require.True(t, ok) + require.Nil(t, err) + require.Equal(t, TableStatePrepared, node.State()) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) require.Equal(t, model.Ts(20), node.BarrierTs()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -206,6 +230,7 @@ func TestStatus(t *testing.T) { require.Equal(t, uint64(2), node.CheckpointTs()) // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts +<<<<<<< HEAD node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -213,6 +238,20 @@ func TestStatus(t *testing.T) { require.Nil(t, node.Receive( pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil))) require.Equal(t, TableStatusInitializing, node.Status()) +======= + state = TableStatePrepared + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) + require.Equal(t, TableStatePrepared, node.State()) + + msg = pmessage.BarrierMessage(20) + ok, err = node.HandleMessage(ctx, msg) + require.Nil(t, err) + require.True(t, ok) + require.Equal(t, TableStatePrepared, node.State()) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -240,18 +279,29 @@ func TestStatus(t *testing.T) { // until the underlying sink is closed func TestStopStatus(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test-status"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) closeCh := make(chan interface{}, 1) +<<<<<<< HEAD node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) +======= + node := newSinkNode(1, + &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, + &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) + require.Equal(t, TableStatePrepared, node.State()) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -279,17 +329,26 @@ func TestStopStatus(t *testing.T) { func TestManyTs(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) sink := &mockSink{} +<<<<<<< HEAD node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) +======= + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) + require.Equal(t, TableStatePrepared, node.State()) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -441,16 +500,24 @@ func TestManyTs(t *testing.T) { func TestIgnoreEmptyRowChangeEvent(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) sink := &mockSink{} +<<<<<<< HEAD node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) +======= + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) // empty row, no Columns and PreColumns. msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -463,16 +530,24 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) { func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) sink := &mockSink{} +<<<<<<< HEAD node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) +======= + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) // nil row. msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -519,18 +594,25 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) - cfg := config.GetDefaultReplicaConfig() - cfg.EnableOldValue = false + config := config.GetDefaultReplicaConfig() + config.EnableOldValue = false ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: cfg, + Config: config, }, }) sink := &mockSink{} +<<<<<<< HEAD node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) +======= + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) // nil row. msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -674,8 +756,15 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { flowController := &flushFlowController{} sink := &flushSink{} // sNode is a sinkNode +<<<<<<< HEAD sNode := newSinkNode(1, sink, 0, 10, flowController) require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) +======= + sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) sNode.barrierTs = 10 err := sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(8))) @@ -688,3 +777,59 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { require.Equal(t, uint64(8), sNode.checkpointTs) require.Equal(t, 2, flowController.releaseCounter) } + +func TestSplitTxn(t *testing.T) { + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: model.DefaultChangeFeedID("changefeed-id-test"), + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config, + }, + }) + state := TableStatePrepared + flowController := &flushFlowController{} + sink := &flushSink{} + // sNode is a sinkNode + sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) + msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, + Row: &model.RowChangedEvent{}, + }) + _, err := sNode.HandleMessage(ctx, msg) + require.Nil(t, err) + + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 2, SplitTxn: true}, + }) + _, err = sNode.HandleMessage(ctx, msg) + require.Regexp(t, ".*should not split txn when sink.splitTxn is.*", err) + + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 2}, + }) + _, err = sNode.HandleMessage(ctx, msg) + require.Nil(t, err) + + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 7, + Resolved: &model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: 7, + BatchID: 1, + }, + RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, + Row: &model.RowChangedEvent{}, + }) + _, err = sNode.HandleMessage(ctx, msg) + require.Regexp(t, ".*batch mode resolved ts is not supported.*", err) +} diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 1944199f655..c3c60277aad 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -279,7 +279,14 @@ func (t *tableActor) start(sdtTableContext context.Context) error { zap.String("tableName", t.tableName), zap.Uint64("quota", t.memoryQuota)) +<<<<<<< HEAD flowController := flowcontrol.NewTableFlowController(t.memoryQuota) +======= + splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn() + + flowController := flowcontrol.NewTableFlowController(t.memoryQuota, + t.redoManager.Enabled(), splitTxn) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) sorterNode := newSorterNode(t.tableName, t.tableID, t.replicaInfo.StartTs, flowController, t.mounter, t.replicaConfig, @@ -318,8 +325,13 @@ func (t *tableActor) start(sdtTableContext context.Context) error { actorSinkNode := newSinkNode(t.tableID, t.tableSink, t.replicaInfo.StartTs, +<<<<<<< HEAD t.targetTs, flowController) actorSinkNode.initWithReplicaConfig(true, t.replicaConfig) +======= + t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID, splitTxn) + actorSinkNode.initWithReplicaConfig(t.replicaConfig) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) t.sinkNode = actorSinkNode // construct sink actor node, it gets message from sortNode or cyclicNode diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index 0f80a4e9b6f..fd4bd4d2c53 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -52,6 +52,11 @@ func TestAsyncStopFailed(t *testing.T) { reportErr: func(err error) {}, sinkNode: newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}), } +<<<<<<< HEAD +======= + tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, tbl.redoManager, + &tbl.state, model.DefaultChangeFeedID("changefeed-test"), false) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) require.True(t, tbl.AsyncStop(1)) mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0) @@ -415,6 +420,7 @@ func TestTableActorStart(t *testing.T) { StartTs: 0, MarkTableID: 1, }, + replicaConfig: config.GetDefaultReplicaConfig(), } require.Nil(t, tbl.start(ctx)) require.Equal(t, 1, len(tbl.nodes)) diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index 1e97ed6390e..8b07f49e5f2 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -32,12 +32,21 @@ const ( // TableFlowController provides a convenient interface to control the memory consumption of a per table event stream type TableFlowController struct { +<<<<<<< HEAD memoryQuota *tableMemoryQuota +======= + memoryQuota *tableMemoryQuota + lastCommitTs uint64 +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) queueMu struct { sync.Mutex queue deque.Deque } + + redoLogEnabled bool + splitTxn bool + // batchGroupCount is the number of txnSizeEntries with same commitTs, which could be: // 1. Different txns with same commitTs but different startTs // 2. TxnSizeEntry split from the same txns which exceeds max rows or max size @@ -55,7 +64,20 @@ type txnSizeEntry struct { } // NewTableFlowController creates a new TableFlowController +<<<<<<< HEAD func NewTableFlowController(quota uint64) *TableFlowController { +======= +func NewTableFlowController(quota uint64, redoLogEnabled bool, splitTxn bool) *TableFlowController { + log.Info("create table flow controller", + zap.Uint64("quota", quota), + zap.Bool("redoLogEnabled", redoLogEnabled), + zap.Bool("splitTxn", splitTxn)) + maxSizePerTxn := uint64(defaultSizePerTxn) + if maxSizePerTxn > quota { + maxSizePerTxn = quota + } + +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) return &TableFlowController{ memoryQuota: newTableMemoryQuota(quota), queueMu: struct { @@ -64,6 +86,14 @@ func NewTableFlowController(quota uint64) *TableFlowController { }{ queue: deque.NewDeque(), }, +<<<<<<< HEAD +======= + redoLogEnabled: redoLogEnabled, + splitTxn: splitTxn, + batchSize: defaultBatchSize, + maxRowsPerTxn: defaultRowsPerTxn, + maxSizePerTxn: maxSizePerTxn, +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) } } @@ -76,6 +106,24 @@ func (c *TableFlowController) Consume( ) error { commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) +<<<<<<< HEAD +======= + blockingCallBack := func() (err error) { + if commitTs > lastCommitTs || c.splitTxn { + // Call `callback` in two condition: + // 1. commitTs > lastCommitTs, handle new txn and send a normal resolved ts + // 2. commitTs == lastCommitTs && splitTxn = true, split the same txn and + // send a batch resolved ts + err = callBack(c.batchID) + } + + if commitTs == lastCommitTs { + c.batchID++ + c.resetBatch(lastCommitTs, commitTs) + } + return err + } +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) if commitTs < lastCommitTs { log.Panic("commitTs regressed, report a bug", @@ -83,9 +131,17 @@ func (c *TableFlowController) Consume( zap.Uint64("lastCommitTs", c.lastCommitTs)) } +<<<<<<< HEAD if commitTs > lastCommitTs { err := c.memoryQuota.consumeWithBlocking(size, callBack) if err != nil { +======= + if commitTs == lastCommitTs && (c.redoLogEnabled || !c.splitTxn) { + // Here `commitTs == lastCommitTs` means we are not crossing transaction + // boundaries, `c.redoLogEnabled || !c.splitTxn` means batch resolved mode + // are not supported, hence we should use `forceConsume` to avoid deadlock. + if err := c.memoryQuota.forceConsume(size); err != nil { +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) return errors.Trace(err) } } else { @@ -168,8 +224,15 @@ func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size size: size, rowCount: 1, }) +<<<<<<< HEAD c.batchGroupCount++ msg.Row.SplitTxn = true +======= + if c.splitTxn { + msg.Row.SplitTxn = true + } +} +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) if c.batchGroupCount >= batchSize { c.batchGroupCount = 0 diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index 6836299e4a4..cf8545ea02d 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -175,7 +175,11 @@ func TestFlowControlBasic(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) +<<<<<<< HEAD flowController := NewTableFlowController(2048) +======= + flowController := NewTableFlowController(2048, true, true) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) errg.Go(func() error { lastCommitTs := uint64(1) @@ -271,7 +275,254 @@ func TestFlowControlBasic(t *testing.T) { if event.size != 0 { atomic.AddUint64(&consumedBytes, -event.size) } else { +<<<<<<< HEAD flowController.Release(event.resolvedTs) +======= + flowController.Release(event.resolved) + } + } + + return nil + }) + + require.Nil(t, errg.Wait()) + require.Equal(t, uint64(0), atomic.LoadUint64(&consumedBytes)) + require.Equal(t, uint64(0), flowController.GetConsumption()) +} + +func TestFlowControlWithBatchAndForceConsume(t *testing.T) { + t.Parallel() + + var consumedBytes uint64 + ctx, cancel := context.WithTimeout(context.TODO(), time.Hour*10) + defer cancel() + errg, ctx := errgroup.WithContext(ctx) + mockedRowsCh := make(chan *txnSizeEntry, 1024) + flowController := NewTableFlowController(512, true, true) + maxBatch := uint64(3) + + // simulate a big txn + errg.Go(func() error { + lastCommitTs := uint64(1) + for i := uint64(0); i <= maxBatch*defaultRowsPerTxn*defaultBatchSize; i++ { + size := uint64(128 + rand.Int()%64) + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRowsCh <- &txnSizeEntry{ + commitTs: lastCommitTs, + size: size, + }: + } + } + + close(mockedRowsCh) + return nil + }) + + eventCh := make(chan *mockedEvent, 1024) + errg.Go(func() error { + defer close(eventCh) + lastCRTs := uint64(0) + maxBatchID := uint64(0) + for { + var mockedRow *txnSizeEntry + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRow = <-mockedRowsCh: + } + + if mockedRow == nil { + break + } + + atomic.AddUint64(&consumedBytes, mockedRow.size) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, func(batchID uint64) error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: batchID, + }, + }: + } + log.Debug("", zap.Any("batchID", batchID)) + maxBatchID = batchID + return nil + }) + require.Nil(t, err) + lastCRTs = mockedRow.commitTs + + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + size: mockedRow.size, + }: + } + } + require.Less(t, uint64(0), flowController.GetConsumption()) + require.LessOrEqual(t, maxBatch, maxBatchID) + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: maxBatchID + 1, + }, + }: + } + time.Sleep(time.Millisecond * 500) + require.Equal(t, uint64(0), flowController.GetConsumption()) + return nil + }) + + errg.Go(func() error { + for { + var event *mockedEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-eventCh: + } + + if event == nil { + break + } + + if event.size != 0 { + atomic.AddUint64(&consumedBytes, -event.size) + } else { + flowController.Release(event.resolved) + } + } + + return nil + }) + + require.Nil(t, errg.Wait()) + require.Equal(t, uint64(0), atomic.LoadUint64(&consumedBytes)) +} + +func TestFlowControlWithoutForceConsume(t *testing.T) { + t.Parallel() + + var consumedBytes uint64 + ctx, cancel := context.WithTimeout(context.TODO(), time.Hour*10) + defer cancel() + errg, ctx := errgroup.WithContext(ctx) + mockedRowsCh := make(chan *txnSizeEntry, 1024) + flowController := NewTableFlowController(512, false, true) + maxBatch := uint64(3) + + // simulate a big txn + errg.Go(func() error { + lastCommitTs := uint64(1) + for i := uint64(0); i < maxBatch*defaultRowsPerTxn*defaultBatchSize; i++ { + size := uint64(128 + rand.Int()%64) + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRowsCh <- &txnSizeEntry{ + commitTs: lastCommitTs, + size: size, + }: + } + } + + close(mockedRowsCh) + return nil + }) + + eventCh := make(chan *mockedEvent, 1024) + errg.Go(func() error { + defer close(eventCh) + lastCRTs := uint64(0) + maxBatchID := uint64(0) + for { + var mockedRow *txnSizeEntry + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRow = <-mockedRowsCh: + } + + if mockedRow == nil { + break + } + + atomic.AddUint64(&consumedBytes, mockedRow.size) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, func(batchID uint64) error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: batchID, + }, + }: + } + log.Debug("", zap.Any("batchID", batchID)) + maxBatchID = batchID + return nil + }) + require.Nil(t, err) + lastCRTs = mockedRow.commitTs + + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + size: mockedRow.size, + }: + } + } + require.Less(t, uint64(0), flowController.GetConsumption()) + require.LessOrEqual(t, maxBatch, maxBatchID) + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: maxBatchID + 1, + }, + }: + } + time.Sleep(time.Millisecond * 500) + require.Equal(t, uint64(0), flowController.GetConsumption()) + return nil + }) + + errg.Go(func() error { + for { + var event *mockedEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-eventCh: + } + + if event == nil { + break + } + + if event.size != 0 { + atomic.AddUint64(&consumedBytes, -event.size) + } else { + flowController.Release(event.resolved) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) } } @@ -286,7 +537,11 @@ func TestFlowControlAbort(t *testing.T) { t.Parallel() callBacker := &mockCallBacker{} +<<<<<<< HEAD controller := NewTableFlowController(1024) +======= + controller := NewTableFlowController(1024, false, false) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) var wg sync.WaitGroup wg.Add(1) go func() { @@ -317,7 +572,11 @@ func TestFlowControlCallBack(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) +<<<<<<< HEAD flowController := NewTableFlowController(512) +======= + flowController := NewTableFlowController(512, false, false) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) errg.Go(func() error { lastCommitTs := uint64(1) @@ -421,7 +680,11 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { t.Parallel() var wg sync.WaitGroup +<<<<<<< HEAD controller := NewTableFlowController(512) +======= + controller := NewTableFlowController(512, false, false) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -463,7 +726,11 @@ func TestFlowControlCallBackError(t *testing.T) { t.Parallel() var wg sync.WaitGroup +<<<<<<< HEAD controller := NewTableFlowController(512) +======= + controller := NewTableFlowController(512, false, false) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -492,8 +759,13 @@ func TestFlowControlCallBackError(t *testing.T) { func TestFlowControlConsumeLargerThanQuota(t *testing.T) { t.Parallel() +<<<<<<< HEAD controller := NewTableFlowController(1024) err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(bool) error { +======= + controller := NewTableFlowController(1024, false, false) + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(uint64) error { +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) t.Error("unreachable") return nil }) @@ -505,7 +777,11 @@ func BenchmarkTableFlowController(B *testing.B) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 102400) +<<<<<<< HEAD flowController := NewTableFlowController(20 * 1024 * 1024) // 20M +======= + flowController := NewTableFlowController(20*1024*1024, false, false) // 20M +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) errg.Go(func() error { lastCommitTs := uint64(1) diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index fec9b47865e..86651078cb8 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -400,10 +400,6 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } - if err := replicaConfig.ApplyProtocol(sinkURI).Validate(); err != nil { - return nil, errors.Trace(err) - } - saramaConfig, err := kafka.NewSaramaConfig(ctx, baseConfig) if err != nil { return nil, errors.Trace(err) @@ -493,10 +489,6 @@ func NewPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, if s != "" { replicaConfig.Sink.Protocol = s } - err := replicaConfig.Validate() - if err != nil { - return nil, err - } var protocol config.Protocol if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil { diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index 76211767f89..870fb201b11 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -91,7 +91,12 @@ func TestKafkaSink(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() +<<<<<<< HEAD sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) +======= + require.Nil(t, replicaConfig.ValidateAndAdjust(sinkURI)) + sink, err := NewKafkaSaramaSink(ctx, sinkURI, replicaConfig, errCh) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) require.Nil(t, err) encoder := sink.encoderBuilder.Build() @@ -278,7 +283,12 @@ func TestFlushRowChangedEvents(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() +<<<<<<< HEAD sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) +======= + require.Nil(t, replicaConfig.ValidateAndAdjust(sinkURI)) + sink, err := NewKafkaSaramaSink(ctx, sinkURI, replicaConfig, errCh) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) require.Nil(t, err) // mock kafka broker processes 1 row changed event diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 7477e3fb6dd..22f5bf2dcf0 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -158,6 +158,9 @@ func New( if err != nil { return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) } + if err := config.ValidateAndAdjust(sinkURI); err != nil { + return nil, err + } if newSink, ok := sinkIniterMap[strings.ToLower(sinkURI.Scheme)]; ok { return newSink(ctx, changefeedID, sinkURI, filter, config, opts, errCh) } diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index cefaa40b7d3..cf80b2bc01a 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -287,11 +287,15 @@ func (o *createChangefeedOptions) validate(ctx context.Context, cmd *cobra.Comma return errors.New("Creating changefeed without a sink-uri") } - err := o.cfg.Validate() + paredSinkURI, err := url.Parse(o.commonChangefeedOptions.sinkURI) if err != nil { return err } + if err = o.cfg.ValidateAndAdjust(paredSinkURI); err != nil { + return err + } + if err := o.validateStartTs(ctx); err != nil { return err } diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 61ae730d1cb..10e8418bd28 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -160,6 +160,7 @@ func TestStrictDecodeInvalidFile(t *testing.T) { configPath := filepath.Join(tmpDir, "ticdc.toml") configContent := fmt.Sprintf(` +<<<<<<< HEAD unknown = "128.0.0.1:1234" data-dir = "%+v" @@ -169,6 +170,17 @@ max-days = 1 max-backups = 1 `, dataDir) err = os.WriteFile(configPath, []byte(configContent), 0o644) +======= + unknown = "128.0.0.1:1234" + data-dir = "%+v" + + [log.unkown] + max-size = 200 + max-days = 1 + max-backups = 1 + `, dataDir) + err := os.WriteFile(configPath, []byte(configContent), 0o644) +>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)) require.Nil(t, err) conf := config.GetDefaultServerConfig() @@ -189,7 +201,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { require.Equal(t, &config.MounterConfig{ WorkerNum: 16, }, cfg.Mounter) - err = cfg.Validate() + err = cfg.ValidateAndAdjust(nil) require.Nil(t, err) require.Equal(t, &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index e3e6e838e7c..9dc3aed4713 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -168,7 +168,8 @@ const ( ] } ], - "schema-registry": "" + "schema-registry": "", + "transaction-atomicity": "" }, "cyclic-replication": { "enable": false, diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 2952f216b8a..f15434156c8 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -123,10 +123,10 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) { } } -// Validate verifies that each parameter is valid. -func (c *ReplicaConfig) Validate() error { +// ValidateAndAdjust verifies and adjusts the replica configuration. +func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { if c.Sink != nil { - err := c.Sink.validate(c.EnableOldValue) + err := c.Sink.validateAndAdjust(sinkURI, c.EnableOldValue) if err != nil { return err } @@ -134,15 +134,6 @@ func (c *ReplicaConfig) Validate() error { return nil } -// ApplyProtocol sinkURI to fill the `ReplicaConfig` -func (c *ReplicaConfig) ApplyProtocol(sinkURI *url.URL) *ReplicaConfig { - params := sinkURI.Query() - if s := params.Get(ProtocolKey); s != "" { - c.Sink.Protocol = s - } - return c -} - // GetDefaultReplicaConfig returns the default replica config. func GetDefaultReplicaConfig() *ReplicaConfig { return defaultReplicaConfig.Clone() diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 84f08bb1714..4bd98d7adfe 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -81,27 +81,28 @@ func TestReplicaConfigOutDated(t *testing.T) { {Matcher: []string{"a.c"}, DispatcherRule: "r2"}, {Matcher: []string{"a.d"}, DispatcherRule: "r2"}, } + conf.Sink.TxnAtomicity = unknowTxnAtomicity require.Equal(t, conf, conf2) } func TestReplicaConfigValidate(t *testing.T) { t.Parallel() conf := GetDefaultReplicaConfig() - require.Nil(t, conf.Validate()) + require.Nil(t, conf.ValidateAndAdjust(nil)) // Incorrect sink configuration. conf = GetDefaultReplicaConfig() conf.Sink.Protocol = "canal" conf.EnableOldValue = false require.Regexp(t, ".*canal protocol requires old value to be enabled.*", - conf.Validate()) + conf.ValidateAndAdjust(nil)) conf = GetDefaultReplicaConfig() conf.Sink.DispatchRules = []*DispatchRule{ {Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"}, } require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", - conf.Validate()) + conf.ValidateAndAdjust(nil)) // Correct sink configuration. conf = GetDefaultReplicaConfig() @@ -110,7 +111,7 @@ func TestReplicaConfigValidate(t *testing.T) { {Matcher: []string{"a.c"}, PartitionRule: "p1"}, {Matcher: []string{"a.d"}}, } - err := conf.Validate() + err := conf.ValidateAndAdjust(nil) require.Nil(t, err) rules := conf.Sink.DispatchRules require.Equal(t, "d1", rules[0].PartitionRule) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index b05c9be39e7..30964527e30 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -15,6 +15,7 @@ package config import ( "fmt" + "net/url" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -22,9 +23,36 @@ import ( "go.uber.org/zap" ) -// DefaultMaxMessageBytes sets the default value for max-message-bytes +// DefaultMaxMessageBytes sets the default value for max-message-bytes. const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M +// AtomicityLevel represents the atomicity level of a changefeed. +type AtomicityLevel string + +const ( + // unknowTxnAtomicity is the default atomicity level, which is invalid and will + // be set to a valid value when initializing sink in processor. + unknowTxnAtomicity AtomicityLevel = "" + + // noneTxnAtomicity means atomicity of transactions is not guaranteed + noneTxnAtomicity AtomicityLevel = "none" + + // tableTxnAtomicity means atomicity of single table transactions is guaranteed. + tableTxnAtomicity AtomicityLevel = "table" + + // globalTxnAtomicity means atomicity of cross table transactions is guaranteed, which + // is currently not supported by TiCDC. + // globalTxnAtomicity AtomicityLevel = "global" + + defaultMqTxnAtomicity AtomicityLevel = noneTxnAtomicity + defaultMysqlTxnAtomicity AtomicityLevel = tableTxnAtomicity +) + +// ShouldSplitTxn returns whether the sink should split txn. +func (l AtomicityLevel) ShouldSplitTxn() bool { + return l == noneTxnAtomicity +} + // ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. var ForceEnableOldValueProtocols = []string{ ProtocolCanal.String(), @@ -38,9 +66,10 @@ type SinkConfig struct { Protocol string `toml:"protocol" json:"protocol"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors"` SchemaRegistry string `toml:"schema-registry" json:"schema-registry"` + TxnAtomicity AtomicityLevel `toml:"transaction-atomicity" json:"transaction-atomicity"` } -// DispatchRule represents partition rule for a table +// DispatchRule represents partition rule for a table. type DispatchRule struct { Matcher []string `toml:"matcher" json:"matcher"` // Deprecated, please use PartitionRule. @@ -57,7 +86,11 @@ type ColumnSelector struct { Columns []string `toml:"columns" json:"columns"` } -func (s *SinkConfig) validate(enableOldValue bool) error { +func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error { + if err := s.applyParameter(sinkURI); err != nil { + return err + } + if !enableOldValue { for _, protocolStr := range ForceEnableOldValueProtocols { if protocolStr == s.Protocol { @@ -86,3 +119,62 @@ func (s *SinkConfig) validate(enableOldValue bool) error { return nil } + +// applyParameter fill the `ReplicaConfig` and `TxnAtomicity` by sinkURI. +func (s *SinkConfig) applyParameter(sinkURI *url.URL) error { + if sinkURI == nil { + return nil + } + params := sinkURI.Query() + + txnAtomicity := params.Get("transaction-atomicity") + switch AtomicityLevel(txnAtomicity) { + case unknowTxnAtomicity: + // Set default value according to scheme. + if isMqScheme(sinkURI.Scheme) { + s.TxnAtomicity = defaultMqTxnAtomicity + } else { + s.TxnAtomicity = defaultMysqlTxnAtomicity + } + case noneTxnAtomicity: + s.TxnAtomicity = noneTxnAtomicity + case tableTxnAtomicity: + // MqSink only support `noneTxnAtomicity`. + if isMqScheme(sinkURI.Scheme) { + log.Warn("The configuration of transaction-atomicity is incompatible with scheme", + zap.Any("txnAtomicity", s.TxnAtomicity), + zap.String("scheme", sinkURI.Scheme), + zap.String("protocol", s.Protocol)) + s.TxnAtomicity = defaultMqTxnAtomicity + } else { + s.TxnAtomicity = tableTxnAtomicity + } + default: + errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme", + txnAtomicity, sinkURI.Scheme) + return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg) + } + + s.Protocol = params.Get(ProtocolKey) + // validate that protocol is compatible with the scheme + if isMqScheme(sinkURI.Scheme) { + var protocol Protocol + err := protocol.FromString(s.Protocol) + if err != nil { + return err + } + } else if s.Protocol != "" { + return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol cannot "+ + "be configured when using %s scheme", sinkURI.Scheme)) + } + + log.Info("succeed to parse parameter from sink uri", + zap.String("protocol", s.Protocol), + zap.String("txnAtomicity", string(s.TxnAtomicity))) + return nil +} + +func isMqScheme(scheme string) bool { + return scheme == "kafka" || scheme == "kafka+ssl" || + scheme == "pulsar" || scheme == "pulsar+ssl" +} diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index c34fcd832bf..94378a25095 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -14,12 +14,13 @@ package config import ( + "net/url" "testing" "github.com/stretchr/testify/require" ) -func TestValidate(t *testing.T) { +func TestValidateOldValue(t *testing.T) { t.Parallel() testCases := []struct { protocol string @@ -73,9 +74,80 @@ func TestValidate(t *testing.T) { Protocol: tc.protocol, } if tc.expectedErr == "" { - require.Nil(t, cfg.validate(tc.enableOldValue)) + require.Nil(t, cfg.validateAndAdjust(nil, tc.enableOldValue)) } else { - require.Regexp(t, tc.expectedErr, cfg.validate(tc.enableOldValue)) + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(nil, tc.enableOldValue)) + } + } +} + +func TestValidateApplyParameter(t *testing.T) { + t.Parallel() + testCases := []struct { + sinkURI string + expectedErr string + expectedLevel AtomicityLevel + }{ + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306", + expectedErr: "", + expectedLevel: tableTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=table", + expectedErr: "", + expectedLevel: tableTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=global", + expectedErr: "global level atomicity is not supported by.*", + }, + { + sinkURI: "tidb://normal:123456@127.0.0.1:3306?protocol=canal", + expectedErr: ".*protocol cannot be configured when using tidb scheme.*", + }, + { + sinkURI: "blackhole://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=none" + + "&protocol=open-protocol", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "pulsar://127.0.0.1:9092?transaction-atomicity=table" + + "&protocol=open-protocol", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?protocol=default", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=table", + expectedErr: ".*unknown .* protocol for Message Queue sink.*", + }, + } + + for _, tc := range testCases { + cfg := SinkConfig{} + parsedSinkURI, err := url.Parse(tc.sinkURI) + require.Nil(t, err) + if tc.expectedErr == "" { + require.Nil(t, cfg.validateAndAdjust(parsedSinkURI, true)) + require.Equal(t, tc.expectedLevel, cfg.TxnAtomicity) + } else { + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI, true)) } } } diff --git a/tests/integration_tests/big_txn/run.sh b/tests/integration_tests/big_txn/run.sh new file mode 100755 index 00000000000..ca969dcb0d7 --- /dev/null +++ b/tests/integration_tests/big_txn/run.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + TOPIC_NAME="ticdc-big-txn-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + + run_sql "CREATE DATABASE big_txn;" + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=big_txn + + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + fi + + check_table_exists "big_txn.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + run_sql "CREATE TABLE big_txn.USERTABLE1 LIKE big_txn.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO big_txn.USERTABLE1 SELECT * FROM big_txn.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + sleep 60 + check_table_exists "big_txn.USERTABLE1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_sql "CREATE TABLE big_txn.finish_mark_1 (a int primary key);" + sleep 120 + check_table_exists "big_txn.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"