diff --git a/cdc/api/validator_test.go b/cdc/api/validator_test.go index 967e7b11cdb..8c52b6a7410 100644 --- a/cdc/api/validator_test.go +++ b/cdc/api/validator_test.go @@ -39,6 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { // test no change error changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"} oldInfo.SinkURI = "blackhole://" + oldInfo.Config.Sink.TxnAtomicity = "table" newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo) require.NotNil(t, err) require.Regexp(t, ".*changefeed config is the same with the old one.*", err) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index e6ec1903662..b89e682e6f7 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -15,6 +15,7 @@ package pipeline import ( "context" + "fmt" "sync/atomic" "time" @@ -81,9 +82,17 @@ type sinkNode struct { replicaConfig *config.ReplicaConfig isTableActorMode bool + splitTxn bool } -func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { +func newSinkNode( + tableID model.TableID, + sink sink.Sink, + startTs model.Ts, + targetTs model.Ts, + flowController tableFlowController, + splitTxn bool, +) *sinkNode { sn := &sinkNode{ tableID: tableID, sink: sink, @@ -92,6 +101,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target barrierTs: startTs, flowController: flowController, + splitTxn: splitTxn, } sn.resolvedTs.Store(model.NewResolvedTs(startTs)) sn.checkpointTs.Store(model.NewResolvedTs(startTs)) @@ -301,6 +311,10 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo switch msg.Tp { case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent + if err := n.verifySplitTxn(event); err != nil { + return false, errors.Trace(err) + } + if event.IsResolved() { if n.status.Load() == TableStatusInitializing { n.status.Store(TableStatusRunning) @@ -360,3 +374,25 @@ func (n *sinkNode) releaseResource(ctx context.Context) error { n.flowController.Abort() return n.sink.Close(ctx) } + +// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent +// with `SplitTxn==true`. +func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error { + if n.splitTxn { + return nil + } + + // Fail-fast check, this situation should never happen normally when split transactions + // are not supported. 
+ if e.Resolved != nil && e.Resolved.IsBatchMode() { + msg := fmt.Sprintf("batch mode resolved ts is not supported "+ + "when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) + } + + if e.Row != nil && e.Row.SplitTxn { + msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) + } + return nil +} diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 0497e2b49c7..9304f53e892 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -135,7 +135,7 @@ func TestStatus(t *testing.T) { }) // test stop at targetTs - node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) + node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -175,7 +175,7 @@ func TestStatus(t *testing.T) { require.Equal(t, uint64(10), node.CheckpointTs()) // test the stop at ts command - node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -206,7 +206,7 @@ func TestStatus(t *testing.T) { require.Equal(t, uint64(2), node.CheckpointTs()) // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts - node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -249,7 +249,7 @@ func TestStopStatus(t *testing.T) { }) closeCh := make(chan interface{}, 1) - node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}) + node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -287,7 +287,7 @@ func TestManyTs(t *testing.T) { }, }) sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -449,7 +449,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) { }, }) sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) // empty row, no Columns and PreColumns. @@ -471,7 +471,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { }, }) sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) // nil row. 
@@ -529,7 +529,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { }, }) sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) // nil row. @@ -674,7 +674,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { flowController := &flushFlowController{} sink := &flushSink{} // sNode is a sinkNode - sNode := newSinkNode(1, sink, 0, 10, flowController) + sNode := newSinkNode(1, sink, 0, 10, flowController, false) require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) sNode.barrierTs = 10 diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 3de4d307256..5e104f5a053 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -204,7 +204,10 @@ func NewTablePipeline(ctx cdcContext.Context, zap.String("tableName", tableName), zap.Int64("tableID", tableID), zap.Uint64("quota", perTableMemoryQuota)) - flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota, redoLogEnabled) + splitTxn := replConfig.Sink.TxnAtomicity.ShouldSplitTxn() + + flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota, + redoLogEnabled, splitTxn) config := ctx.ChangefeedVars().Info.Config cyclicEnabled := config.Cyclic != nil && config.Cyclic.IsEnabled() runnerSize := defaultRunnersSize @@ -215,7 +218,7 @@ func NewTablePipeline(ctx cdcContext.Context, p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize) sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs, flowController, mounter, replConfig) - sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController) + sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController, splitTxn) p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName, changefeed, upstream)) diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index e633329d075..dcf58694556 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -57,8 +57,8 @@ type tableActor struct { // backend mounter mounter entry.Mounter // backend tableSink - tableSink sink.Sink - redoLogEnabled bool + tableSink sink.Sink + redoManager redo.LogManager pullerNode *pullerNode sortNode *sorterNode @@ -104,7 +104,7 @@ func NewTableActor(cdcCtx cdcContext.Context, tableName string, replicaInfo *model.TableReplicaInfo, sink sink.Sink, - redoLogEnabled bool, + redoManager redo.LogManager, targetTs model.Ts, ) (TablePipeline, error) { config := cdcCtx.ChangefeedVars().Info.Config @@ -126,19 +126,19 @@ func NewTableActor(cdcCtx cdcContext.Context, wg: wg, cancel: cancel, - tableID: tableID, - markTableID: replicaInfo.MarkTableID, - tableName: tableName, - cyclicEnabled: cyclicEnabled, - memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota, - upStream: upStream, - mounter: mounter, - replicaInfo: replicaInfo, - replicaConfig: config, - tableSink: sink, - redoLogEnabled: redoLogEnabled, - targetTs: targetTs, - started: false, + tableID: tableID, + markTableID: replicaInfo.MarkTableID, + tableName: tableName, + cyclicEnabled: cyclicEnabled, + memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota, + upStream: upStream, + mounter: mounter, + replicaInfo: replicaInfo, + replicaConfig: config, + tableSink: 
sink, + redoManager: redoManager, + targetTs: targetTs, + started: false, changefeedID: changefeedVars.ID, changefeedVars: changefeedVars, @@ -282,7 +282,10 @@ func (t *tableActor) start(sdtTableContext context.Context) error { zap.String("tableName", t.tableName), zap.Uint64("quota", t.memoryQuota)) - flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoLogEnabled) + splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn() + + flowController := flowcontrol.NewTableFlowController(t.memoryQuota, + t.redoManager.Enabled(), splitTxn) sorterNode := newSorterNode(t.tableName, t.tableID, t.replicaInfo.StartTs, flowController, t.mounter, t.replicaConfig, @@ -321,7 +324,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error { actorSinkNode := newSinkNode(t.tableID, t.tableSink, t.replicaInfo.StartTs, - t.targetTs, flowController) + t.targetTs, flowController, splitTxn) actorSinkNode.initWithReplicaConfig(true, t.replicaConfig) t.sinkNode = actorSinkNode diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index d5bc42366f0..3981798bf92 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -49,7 +49,7 @@ func TestAsyncStopFailed(t *testing.T) { router: tableActorRouter, cancel: func() {}, reportErr: func(err error) {}, - sinkNode: newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}), + sinkNode: newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, false), } require.True(t, tbl.AsyncStop(1)) @@ -358,7 +358,7 @@ func TestNewTableActor(t *testing.T) { &model.TableReplicaInfo{ StartTs: 0, MarkTableID: 1, - }, &mockSink{}, false, 10) + }, &mockSink{}, redo.NewDisabledManager(), 10) require.NotNil(t, tbl) require.Nil(t, err) require.NotPanics(t, func() { @@ -374,7 +374,7 @@ func TestNewTableActor(t *testing.T) { &model.TableReplicaInfo{ StartTs: 0, MarkTableID: 1, - }, &mockSink{}, false, 10) + }, &mockSink{}, redo.NewDisabledManager(), 10) require.Nil(t, tbl) require.NotNil(t, err) @@ -414,6 +414,8 @@ func TestTableActorStart(t *testing.T) { StartTs: 0, MarkTableID: 1, }, + redoManager: redo.NewDisabledManager(), + replicaConfig: config.GetDefaultReplicaConfig(), } require.Nil(t, tbl.start(ctx)) require.Equal(t, 1, len(tbl.nodes)) @@ -427,10 +429,12 @@ func TestTableActorStart(t *testing.T) { Config: config.GetDefaultReplicaConfig(), }, }, + redoManager: redo.NewDisabledManager(), replicaInfo: &model.TableReplicaInfo{ StartTs: 0, MarkTableID: 1, }, + replicaConfig: config.GetDefaultReplicaConfig(), } tbl.cyclicEnabled = true require.Nil(t, tbl.start(ctx)) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index a13dcd88bb6..108a9c76f9b 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -813,7 +813,7 @@ func (p *processor) createTablePipelineImpl( tableName, replicaInfo, s, - p.redoManager.Enabled(), + p.redoManager, p.changefeed.Info.GetTargetTs()) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index 597c2b17cc0..0f1e0ce5050 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -32,14 +32,17 @@ const ( // TableFlowController provides a convenient interface to control the memory consumption of a per table event stream type TableFlowController struct { - memoryQuota *tableMemoryQuota - redoLogEnabled bool - lastCommitTs uint64 + memoryQuota *tableMemoryQuota + lastCommitTs uint64 
queueMu struct { sync.Mutex queue deque.Deque } + + redoLogEnabled bool + splitTxn bool + // batchGroupCount is the number of txnSizeEntries with same commitTs, which could be: // 1. Different txns with same commitTs but different startTs // 2. TxnSizeEntry split from the same txns which exceeds max rows or max size @@ -62,24 +65,29 @@ type txnSizeEntry struct { } // NewTableFlowController creates a new TableFlowController -func NewTableFlowController(quota uint64, redoLogEnabled bool) *TableFlowController { +func NewTableFlowController(quota uint64, redoLogEnabled bool, splitTxn bool) *TableFlowController { + log.Info("create table flow controller", + zap.Uint64("quota", quota), + zap.Bool("redoLogEnabled", redoLogEnabled), + zap.Bool("splitTxn", splitTxn)) maxSizePerTxn := uint64(defaultSizePerTxn) - if maxSizePerTxn > quota && !redoLogEnabled { + if maxSizePerTxn > quota { maxSizePerTxn = quota } return &TableFlowController{ - memoryQuota: newTableMemoryQuota(quota), - redoLogEnabled: redoLogEnabled, + memoryQuota: newTableMemoryQuota(quota), queueMu: struct { sync.Mutex queue deque.Deque }{ queue: deque.NewDeque(), }, - batchSize: defaultBatchSize, - maxRowsPerTxn: defaultRowsPerTxn, - maxSizePerTxn: maxSizePerTxn, + redoLogEnabled: redoLogEnabled, + splitTxn: splitTxn, + batchSize: defaultBatchSize, + maxRowsPerTxn: defaultRowsPerTxn, + maxSizePerTxn: maxSizePerTxn, } } @@ -92,8 +100,14 @@ func (c *TableFlowController) Consume( ) error { commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) - blockingCallBack := func() error { - err := callBack(c.batchID) + blockingCallBack := func() (err error) { + if commitTs > lastCommitTs || c.splitTxn { + // Call `callback` in two condition: + // 1. commitTs > lastCommitTs, handle new txn and send a normal resolved ts + // 2. commitTs == lastCommitTs && splitTxn = true, split the same txn and + // send a batch resolved ts + err = callBack(c.batchID) + } if commitTs == lastCommitTs { c.batchID++ @@ -108,11 +122,10 @@ func (c *TableFlowController) Consume( zap.Uint64("lastCommitTs", c.lastCommitTs)) } - if commitTs == lastCommitTs && c.redoLogEnabled { - // Here commitTs == lastCommitTs, which means we are not crossing transaction - // boundaries, and redo log currently does not support split transactions, hence - // we use `forceConsume` to avoid deadlock. - // TODO: fix this after we figure out how to make redo log support split txn. + if commitTs == lastCommitTs && (c.redoLogEnabled || !c.splitTxn) { + // Here `commitTs == lastCommitTs` means we are not crossing transaction + // boundaries, `c.redoLogEnabled || !c.splitTxn` means batch resolved mode + // are not supported, hence we should use `forceConsume` to avoid deadlock. 
if err := c.memoryQuota.forceConsume(size); err != nil { return errors.Trace(err) } @@ -198,7 +211,9 @@ func (c *TableFlowController) addEntry(msg *model.PolymorphicEvent, size uint64) rowCount: 1, batchID: c.batchID, }) - msg.Row.SplitTxn = true + if c.splitTxn { + msg.Row.SplitTxn = true + } } // resetBatch reset batchID and batchGroupCount if handling a new txn, Otherwise, diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index 7c2cefac86a..aedd9d92445 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -181,7 +181,7 @@ func TestFlowControlWithForceConsume(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(2048, true) + flowController := NewTableFlowController(2048, true, true) errg.Go(func() error { lastCommitTs := uint64(1) @@ -297,7 +297,7 @@ func TestFlowControlWithBatchAndForceConsume(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(512, true) + flowController := NewTableFlowController(512, true, true) maxBatch := uint64(3) // simulate a big txn @@ -366,7 +366,7 @@ func TestFlowControlWithBatchAndForceConsume(t *testing.T) { } } require.Less(t, uint64(0), flowController.GetConsumption()) - require.Equal(t, maxBatch, maxBatchID) + require.LessOrEqual(t, maxBatch, maxBatchID) select { case <-ctx.Done(): return ctx.Err() @@ -418,7 +418,7 @@ func TestFlowControlWithoutForceConsume(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(512, false) + flowController := NewTableFlowController(512, false, true) maxBatch := uint64(3) // simulate a big txn @@ -535,7 +535,7 @@ func TestFlowControlAbort(t *testing.T) { t.Parallel() callBacker := &mockCallBacker{} - controller := NewTableFlowController(1024, false) + controller := NewTableFlowController(1024, false, false) var wg sync.WaitGroup wg.Add(1) go func() { @@ -566,7 +566,7 @@ func TestFlowControlCallBack(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(512, false) + flowController := NewTableFlowController(512, false, false) errg.Go(func() error { lastCommitTs := uint64(1) @@ -670,7 +670,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { t.Parallel() var wg sync.WaitGroup - controller := NewTableFlowController(512, false) + controller := NewTableFlowController(512, false, false) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -712,7 +712,7 @@ func TestFlowControlCallBackError(t *testing.T) { t.Parallel() var wg sync.WaitGroup - controller := NewTableFlowController(512, false) + controller := NewTableFlowController(512, false, false) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -741,7 +741,7 @@ func TestFlowControlCallBackError(t *testing.T) { func TestFlowControlConsumeLargerThanQuota(t *testing.T) { t.Parallel() - controller := NewTableFlowController(1024, false) + controller := NewTableFlowController(1024, false, false) err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(uint64) error { t.Error("unreachable") return nil @@ -754,7 +754,7 @@ func BenchmarkTableFlowController(B *testing.B) { defer cancel() errg, ctx := 
errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 102400) - flowController := NewTableFlowController(20*1024*1024, false) // 20M + flowController := NewTableFlowController(20*1024*1024, false, false) // 20M errg.Go(func() error { lastCommitTs := uint64(1) diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index f63bd1c9189..ade36762bb1 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -404,10 +404,6 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } - if err := replicaConfig.ApplyProtocol(sinkURI).Validate(); err != nil { - return nil, errors.Trace(err) - } - saramaConfig, err := kafka.NewSaramaConfig(ctx, baseConfig) if err != nil { return nil, errors.Trace(err) @@ -497,10 +493,6 @@ func NewPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, if s != "" { replicaConfig.Sink.Protocol = s } - err := replicaConfig.Validate() - if err != nil { - return nil, err - } var protocol config.Protocol if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil { diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index d1d757665c0..6e2cc8c85d9 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -91,6 +91,7 @@ func TestKafkaSink(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() + require.Nil(t, replicaConfig.ValidateAndAdjust(sinkURI)) sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) require.Nil(t, err) @@ -187,6 +188,7 @@ func TestKafkaSinkFilter(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() + require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI)) sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) require.Nil(t, err) @@ -278,6 +280,7 @@ func TestFlushRowChangedEvents(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() + require.Nil(t, replicaConfig.ValidateAndAdjust(sinkURI)) sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) require.Nil(t, err) diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 4838e351cd8..3bc353fc2b0 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -158,6 +158,9 @@ func New( if err != nil { return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) } + if err := config.ValidateAndAdjust(sinkURI); err != nil { + return nil, err + } if newSink, ok := sinkIniterMap[strings.ToLower(sinkURI.Scheme)]; ok { return newSink(ctx, changefeedID, sinkURI, filter, config, opts, errCh) } diff --git a/dm/tests/_utils/wait_process_exit b/dm/tests/_utils/wait_process_exit index 967c1c78466..dd92d0905df 100755 --- a/dm/tests/_utils/wait_process_exit +++ b/dm/tests/_utils/wait_process_exit @@ -16,9 +16,5 @@ while [ $WAIT_COUNT -lt 120 ]; do ((WAIT_COUNT++)) done -<<<<<<< HEAD echo "process $process didn't exit after 120 seconds, current processlist: $(pgrep $process)" -======= -echo "process $process didn't exit after 120 seconds, current processlist: $(ps aux | grep $process | grep -v 'grep')" ->>>>>>> 65e67fc4b (test(dm): fix unstable tests (#5865)) exit 1 diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index cefaa40b7d3..cf80b2bc01a 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -287,11 +287,15 @@ func (o *createChangefeedOptions) validate(ctx context.Context, cmd *cobra.Comma return errors.New("Creating changefeed without a sink-uri") } - err := o.cfg.Validate() + 
paredSinkURI, err := url.Parse(o.commonChangefeedOptions.sinkURI) if err != nil { return err } + if err = o.cfg.ValidateAndAdjust(paredSinkURI); err != nil { + return err + } + if err := o.validateStartTs(ctx); err != nil { return err } diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 61ae730d1cb..e4df5c725d8 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -189,7 +189,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { require.Equal(t, &config.MounterConfig{ WorkerNum: 16, }, cfg.Mounter) - err = cfg.Validate() + err = cfg.ValidateAndAdjust(nil) require.Nil(t, err) require.Equal(t, &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index e3e6e838e7c..9dc3aed4713 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -168,7 +168,8 @@ const ( ] } ], - "schema-registry": "" + "schema-registry": "", + "transaction-atomicity": "" }, "cyclic-replication": { "enable": false, diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 2952f216b8a..f15434156c8 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -123,10 +123,10 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) { } } -// Validate verifies that each parameter is valid. -func (c *ReplicaConfig) Validate() error { +// ValidateAndAdjust verifies and adjusts the replica configuration. +func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { if c.Sink != nil { - err := c.Sink.validate(c.EnableOldValue) + err := c.Sink.validateAndAdjust(sinkURI, c.EnableOldValue) if err != nil { return err } @@ -134,15 +134,6 @@ func (c *ReplicaConfig) Validate() error { return nil } -// ApplyProtocol sinkURI to fill the `ReplicaConfig` -func (c *ReplicaConfig) ApplyProtocol(sinkURI *url.URL) *ReplicaConfig { - params := sinkURI.Query() - if s := params.Get(ProtocolKey); s != "" { - c.Sink.Protocol = s - } - return c -} - // GetDefaultReplicaConfig returns the default replica config. func GetDefaultReplicaConfig() *ReplicaConfig { return defaultReplicaConfig.Clone() diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 84f08bb1714..4bd98d7adfe 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -81,27 +81,28 @@ func TestReplicaConfigOutDated(t *testing.T) { {Matcher: []string{"a.c"}, DispatcherRule: "r2"}, {Matcher: []string{"a.d"}, DispatcherRule: "r2"}, } + conf.Sink.TxnAtomicity = unknowTxnAtomicity require.Equal(t, conf, conf2) } func TestReplicaConfigValidate(t *testing.T) { t.Parallel() conf := GetDefaultReplicaConfig() - require.Nil(t, conf.Validate()) + require.Nil(t, conf.ValidateAndAdjust(nil)) // Incorrect sink configuration. conf = GetDefaultReplicaConfig() conf.Sink.Protocol = "canal" conf.EnableOldValue = false require.Regexp(t, ".*canal protocol requires old value to be enabled.*", - conf.Validate()) + conf.ValidateAndAdjust(nil)) conf = GetDefaultReplicaConfig() conf.Sink.DispatchRules = []*DispatchRule{ {Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"}, } require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", - conf.Validate()) + conf.ValidateAndAdjust(nil)) // Correct sink configuration. 
conf = GetDefaultReplicaConfig() @@ -110,7 +111,7 @@ func TestReplicaConfigValidate(t *testing.T) { {Matcher: []string{"a.c"}, PartitionRule: "p1"}, {Matcher: []string{"a.d"}}, } - err := conf.Validate() + err := conf.ValidateAndAdjust(nil) require.Nil(t, err) rules := conf.Sink.DispatchRules require.Equal(t, "d1", rules[0].PartitionRule) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index b05c9be39e7..30964527e30 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -15,6 +15,7 @@ package config import ( "fmt" + "net/url" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -22,9 +23,36 @@ import ( "go.uber.org/zap" ) -// DefaultMaxMessageBytes sets the default value for max-message-bytes +// DefaultMaxMessageBytes sets the default value for max-message-bytes. const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M +// AtomicityLevel represents the atomicity level of a changefeed. +type AtomicityLevel string + +const ( + // unknowTxnAtomicity is the default atomicity level, which is invalid and will + // be set to a valid value when initializing sink in processor. + unknowTxnAtomicity AtomicityLevel = "" + + // noneTxnAtomicity means atomicity of transactions is not guaranteed + noneTxnAtomicity AtomicityLevel = "none" + + // tableTxnAtomicity means atomicity of single table transactions is guaranteed. + tableTxnAtomicity AtomicityLevel = "table" + + // globalTxnAtomicity means atomicity of cross table transactions is guaranteed, which + // is currently not supported by TiCDC. + // globalTxnAtomicity AtomicityLevel = "global" + + defaultMqTxnAtomicity AtomicityLevel = noneTxnAtomicity + defaultMysqlTxnAtomicity AtomicityLevel = tableTxnAtomicity +) + +// ShouldSplitTxn returns whether the sink should split txn. +func (l AtomicityLevel) ShouldSplitTxn() bool { + return l == noneTxnAtomicity +} + // ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. var ForceEnableOldValueProtocols = []string{ ProtocolCanal.String(), @@ -38,9 +66,10 @@ type SinkConfig struct { Protocol string `toml:"protocol" json:"protocol"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors"` SchemaRegistry string `toml:"schema-registry" json:"schema-registry"` + TxnAtomicity AtomicityLevel `toml:"transaction-atomicity" json:"transaction-atomicity"` } -// DispatchRule represents partition rule for a table +// DispatchRule represents partition rule for a table. type DispatchRule struct { Matcher []string `toml:"matcher" json:"matcher"` // Deprecated, please use PartitionRule. @@ -57,7 +86,11 @@ type ColumnSelector struct { Columns []string `toml:"columns" json:"columns"` } -func (s *SinkConfig) validate(enableOldValue bool) error { +func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error { + if err := s.applyParameter(sinkURI); err != nil { + return err + } + if !enableOldValue { for _, protocolStr := range ForceEnableOldValueProtocols { if protocolStr == s.Protocol { @@ -86,3 +119,62 @@ func (s *SinkConfig) validate(enableOldValue bool) error { return nil } + +// applyParameter fill the `ReplicaConfig` and `TxnAtomicity` by sinkURI. +func (s *SinkConfig) applyParameter(sinkURI *url.URL) error { + if sinkURI == nil { + return nil + } + params := sinkURI.Query() + + txnAtomicity := params.Get("transaction-atomicity") + switch AtomicityLevel(txnAtomicity) { + case unknowTxnAtomicity: + // Set default value according to scheme. 
+ if isMqScheme(sinkURI.Scheme) { + s.TxnAtomicity = defaultMqTxnAtomicity + } else { + s.TxnAtomicity = defaultMysqlTxnAtomicity + } + case noneTxnAtomicity: + s.TxnAtomicity = noneTxnAtomicity + case tableTxnAtomicity: + // MqSink only support `noneTxnAtomicity`. + if isMqScheme(sinkURI.Scheme) { + log.Warn("The configuration of transaction-atomicity is incompatible with scheme", + zap.Any("txnAtomicity", s.TxnAtomicity), + zap.String("scheme", sinkURI.Scheme), + zap.String("protocol", s.Protocol)) + s.TxnAtomicity = defaultMqTxnAtomicity + } else { + s.TxnAtomicity = tableTxnAtomicity + } + default: + errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme", + txnAtomicity, sinkURI.Scheme) + return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg) + } + + s.Protocol = params.Get(ProtocolKey) + // validate that protocol is compatible with the scheme + if isMqScheme(sinkURI.Scheme) { + var protocol Protocol + err := protocol.FromString(s.Protocol) + if err != nil { + return err + } + } else if s.Protocol != "" { + return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol cannot "+ + "be configured when using %s scheme", sinkURI.Scheme)) + } + + log.Info("succeed to parse parameter from sink uri", + zap.String("protocol", s.Protocol), + zap.String("txnAtomicity", string(s.TxnAtomicity))) + return nil +} + +func isMqScheme(scheme string) bool { + return scheme == "kafka" || scheme == "kafka+ssl" || + scheme == "pulsar" || scheme == "pulsar+ssl" +} diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index c34fcd832bf..94378a25095 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -14,12 +14,13 @@ package config import ( + "net/url" "testing" "github.com/stretchr/testify/require" ) -func TestValidate(t *testing.T) { +func TestValidateOldValue(t *testing.T) { t.Parallel() testCases := []struct { protocol string @@ -73,9 +74,80 @@ func TestValidate(t *testing.T) { Protocol: tc.protocol, } if tc.expectedErr == "" { - require.Nil(t, cfg.validate(tc.enableOldValue)) + require.Nil(t, cfg.validateAndAdjust(nil, tc.enableOldValue)) } else { - require.Regexp(t, tc.expectedErr, cfg.validate(tc.enableOldValue)) + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(nil, tc.enableOldValue)) + } + } +} + +func TestValidateApplyParameter(t *testing.T) { + t.Parallel() + testCases := []struct { + sinkURI string + expectedErr string + expectedLevel AtomicityLevel + }{ + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306", + expectedErr: "", + expectedLevel: tableTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=table", + expectedErr: "", + expectedLevel: tableTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=global", + expectedErr: "global level atomicity is not supported by.*", + }, + { + sinkURI: "tidb://normal:123456@127.0.0.1:3306?protocol=canal", + expectedErr: ".*protocol cannot be configured when using tidb scheme.*", + }, + { + sinkURI: "blackhole://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=none" + + "&protocol=open-protocol", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "pulsar://127.0.0.1:9092?transaction-atomicity=table" + + 
"&protocol=open-protocol", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?protocol=default", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=table", + expectedErr: ".*unknown .* protocol for Message Queue sink.*", + }, + } + + for _, tc := range testCases { + cfg := SinkConfig{} + parsedSinkURI, err := url.Parse(tc.sinkURI) + require.Nil(t, err) + if tc.expectedErr == "" { + require.Nil(t, cfg.validateAndAdjust(parsedSinkURI, true)) + require.Equal(t, tc.expectedLevel, cfg.TxnAtomicity) + } else { + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI, true)) } } } diff --git a/tests/integration_tests/big_txn/conf/diff_config.toml b/tests/integration_tests/big_txn/conf/diff_config.toml new file mode 100644 index 00000000000..367c21817c8 --- /dev/null +++ b/tests/integration_tests/big_txn/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/big_txn/sync_diff/output" + + source-instances = ["tidb"] + + target-instance = "mysql" + + target-check-tables = ["big_txn.*"] + +[data-sources] +[data-sources.tidb] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.mysql] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/big_txn/conf/workload b/tests/integration_tests/big_txn/conf/workload new file mode 100644 index 00000000000..d15d4a81bfd --- /dev/null +++ b/tests/integration_tests/big_txn/conf/workload @@ -0,0 +1,14 @@ +threadcount=1 +recordcount=5000 +operationcount=0 +workload=core +fieldcount=100 + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/integration_tests/big_txn/run.sh b/tests/integration_tests/big_txn/run.sh new file mode 100755 index 00000000000..ca969dcb0d7 --- /dev/null +++ b/tests/integration_tests/big_txn/run.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + TOPIC_NAME="ticdc-big-txn-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + + run_sql "CREATE DATABASE big_txn;" + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=big_txn + + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + fi + + check_table_exists "big_txn.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + run_sql "CREATE TABLE big_txn.USERTABLE1 LIKE 
big_txn.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO big_txn.USERTABLE1 SELECT * FROM big_txn.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + sleep 60 + check_table_exists "big_txn.USERTABLE1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_sql "CREATE TABLE big_txn.finish_mark_1 (a int primary key);" + sleep 120 + check_table_exists "big_txn.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"