From c092599a5ff76fe319e4067bb306b98a237c3bb0 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 12 Jun 2024 00:26:59 +0800 Subject: [PATCH] pkg/config, sink(ticdc): support output raw change event for mq and cloud storage sink (#11226) (#11290) close pingcap/tiflow#11211 --- cdc/api/v2/model.go | 50 +++-- cdc/model/sink.go | 44 ++-- cdc/model/sink_test.go | 26 ++- .../sinkmanager/table_sink_wrapper_test.go | 4 + .../blackhole/black_hole_dml_sink.go | 6 +- .../cloudstorage/cloud_storage_dml_sink.go | 26 +-- cdc/sinkv2/eventsink/event.go | 5 +- cdc/sinkv2/eventsink/event_sink.go | 4 +- cdc/sinkv2/eventsink/mq/kafka_dml_sink.go | 2 +- cdc/sinkv2/eventsink/mq/mq_dml_sink.go | 25 ++- cdc/sinkv2/eventsink/txn/txn_sink.go | 6 +- cdc/sinkv2/tablesink/table_sink_impl.go | 2 +- cdc/sinkv2/tablesink/table_sink_impl_test.go | 8 +- docs/swagger/docs.go | 192 ++++++++++++++++++ docs/swagger/swagger.json | 192 ++++++++++++++++++ docs/swagger/swagger.yaml | 133 ++++++++++++ errors.toml | 5 + pkg/config/sink.go | 64 +++++- pkg/errors/cdc_errors.go | 4 + .../conf/changefeed1.toml | 25 +++ .../conf/changefeed2.toml | 24 +++ .../conf/diff_config.toml | 29 +++ .../data/prepare.sql | 27 +++ .../data/run.sql | 40 ++++ .../result/changefeed1_pk.res | 22 ++ .../result/changefeed1_uk.res | 24 +++ .../result/changefeed2_pk.res | 26 +++ .../result/changefeed2_uk.res | 26 +++ .../csv_storage_update_pk_clustered/run.sh | 52 +++++ .../conf/changefeed1.toml | 25 +++ .../conf/changefeed2.toml | 24 +++ .../conf/diff_config.toml | 29 +++ .../data/prepare.sql | 28 +++ .../data/run.sql | 40 ++++ .../result/changefeed1_pk.res | 24 +++ .../result/changefeed1_uk.res | 24 +++ .../result/changefeed2_pk.res | 28 +++ .../result/changefeed2_uk.res | 26 +++ .../csv_storage_update_pk_nonclustered/run.sh | 61 ++++++ tests/integration_tests/run_group.sh | 2 +- 40 files changed, 1304 insertions(+), 100 deletions(-) create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_clustered/run.sh create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res 
create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res create mode 100644 tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 2e35b8deac3..e9104a3298d 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -354,18 +354,20 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( SASLOAuthGrantType: c.Sink.KafkaConfig.SASLOAuthGrantType, SASLOAuthAudience: c.Sink.KafkaConfig.SASLOAuthAudience, LargeMessageHandle: largeMessageHandle, + OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent, } } if c.Sink.CloudStorageConfig != nil { res.Sink.CloudStorageConfig = &config.CloudStorageConfig{ - WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, - FileSize: c.Sink.CloudStorageConfig.FileSize, - FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, - OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, - FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, - FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent, } } } @@ -502,18 +504,20 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { SASLOAuthGrantType: cloned.Sink.KafkaConfig.SASLOAuthGrantType, SASLOAuthAudience: cloned.Sink.KafkaConfig.SASLOAuthAudience, LargeMessageHandle: largeMessageHandle, + OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent, } } if cloned.Sink.CloudStorageConfig != nil { res.Sink.CloudStorageConfig = &CloudStorageConfig{ - WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, - FileSize: cloned.Sink.CloudStorageConfig.FileSize, - FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, - OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, - FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, - FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent, } } } @@ -679,18 +683,20 @@ type KafkaConfig struct { SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"` SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"` - LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` + LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` + 
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` } // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - FlushInterval *string `json:"flush_interval,omitempty"` - FileSize *int `json:"file_size,omitempty"` - FlushConcurrency *int `json:"flush_concurrency,omitempty"` - OutputColumnID *bool `json:"output_column_id,omitempty"` - FileExpirationDays *int `json:"file_expiration_days,omitempty"` - FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` + OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` } // CSVConfig denotes the csv config diff --git a/cdc/model/sink.go b/cdc/model/sink.go index f479e68a565..67ca84578a7 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -279,7 +279,7 @@ func (r *RedoLog) GetCommitTs() Ts { } // TrySplitAndSortUpdateEvent redo log do nothing -func (r *RedoLog) TrySplitAndSortUpdateEvent(sinkScheme string) error { +func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string, _ bool) error { return nil } @@ -377,7 +377,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent do nothing -func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(sinkScheme string) error { +func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string, _ bool) error { return nil } @@ -794,11 +794,19 @@ func (t *SingleTableTxn) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent split update events if unique key is updated -func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error { - if !t.shouldSplitUpdateEvent(sinkScheme) { +func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error { + if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent { + // For MySQL Sink, all update events will be split into insert and delete at the puller side + // according to whether the changefeed is in safemode. We don't split update event here(in sink) + // since there may be OOM issues. For more information, ref https://github.com/tikv/tikv/issues/17062. + // + // For the Kafka and Storage sink, the outputRawChangeEvent parameter is introduced to control + // split behavior. TiCDC only output original change event if outputRawChangeEvent is true. return nil } + // Try to split update events for the Kafka and Storage sink if outputRawChangeEvent is false. + // Note it is only for backward compatibility, and we should remove this logic in the future. newRows, err := trySplitAndSortUpdateEvent(t.Rows) if err != nil { return errors.Trace(err) @@ -807,21 +815,6 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error { return nil } -// Whether split a single update event into delete and insert events? -// -// For the MySQL Sink, we don't split any update event. -// This may cause error like "duplicate entry" when sink to the downstream. -// This kind of error will cause the changefeed to restart, -// and then the related update rows will be splitted to insert and delete at puller side. 
-// -// For the Kafka and Storage sink, always split a single unique key changed update event, since: -// 1. Avro and CSV does not output the previous column values for the update event, so it would -// cause consumer missing data if the unique key changed event is not split. -// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split. -func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool { - return !sink.IsMySQLCompatibleScheme(sinkScheme) -} - // trySplitAndSortUpdateEvent try to split update events if unique key is updated // returns true if some updated events is split func trySplitAndSortUpdateEvent( @@ -831,8 +824,7 @@ func trySplitAndSortUpdateEvent( split := false for _, e := range events { if e == nil { - log.Warn("skip emit nil event", - zap.Any("event", e)) + log.Warn("skip emit nil event", zap.Any("event", e)) continue } @@ -842,8 +834,7 @@ func trySplitAndSortUpdateEvent( // begin; insert into t (id) values (1); delete from t where id=1; commit; // Just ignore these row changed events. if colLen == 0 && preColLen == 0 { - log.Warn("skip emit empty row event", - zap.Any("event", e)) + log.Warn("skip emit empty row event", zap.Any("event", e)) continue } @@ -869,7 +860,7 @@ func trySplitAndSortUpdateEvent( // ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on // whether the handle key column or unique key has been modified. -// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. +// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { // nil event will never be split. if updateEvent == nil { @@ -912,6 +903,11 @@ func SplitUpdateEvent( // NOTICE: clean up pre cols for insert event. 
insertEvent.PreColumns = nil + log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs), + zap.Uint64("commitTs", updateEvent.CommitTs), + zap.Any("preCols", updateEvent.PreColumns), + zap.Any("cols", updateEvent.Columns)) + return &deleteEvent, &insertEvent, nil } diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index 4041c673958..690f3ae5497 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -534,6 +534,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) { events := []*RowChangedEvent{ { + TableInfo: &TableInfo{}, CommitTs: 1, Columns: columns, PreColumns: preColumns, @@ -573,6 +574,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) { events = []*RowChangedEvent{ { + TableInfo: &TableInfo{}, CommitTs: 1, Columns: columns, PreColumns: preColumns, @@ -613,6 +615,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) { events = []*RowChangedEvent{ { + TableInfo: &TableInfo{}, CommitTs: 1, Columns: columns, PreColumns: preColumns, @@ -624,6 +627,12 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) { } var ukUpdatedEvent = &RowChangedEvent{ + TableInfo: &TableInfo{ + TableName: TableName{ + Schema: "test", + Table: "t1", + }, + }, PreColumns: []*Column{ { Name: "col1", @@ -656,21 +665,32 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) { Rows: []*RowChangedEvent{ukUpdatedEvent}, } - err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme) + outputRawChangeEvent := true + notOutputRawChangeEvent := false + err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) + err = txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn.Rows, 2) txn = &SingleTableTxn{ Rows: []*RowChangedEvent{ukUpdatedEvent}, } - err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn.Rows, 1) txn2 := &SingleTableTxn{ Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent}, } - err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn2.Rows, 2) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn2.Rows, 2) } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 9bee0b0ffba..84f98b0a115 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -51,6 +51,10 @@ func (m *mockSink) WriteEvents(events ...*eventsink.CallbackableEvent[*model.Row return nil } +func (m *mockSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, false +} + func (m *mockSink) GetEvents() []*eventsink.CallbackableEvent[*model.RowChangedEvent] { m.mu.Lock() defer m.mu.Unlock() diff --git a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go index d324ceb5c86..265d327faf3 100644 --- a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go @@ -47,9 +47,9 @@ func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChange 
return } -// Scheme returns the sink scheme. -func (s *Sink) Scheme() string { - return sink.BlackHoleScheme +// SchemeOption returns the scheme and the option. +func (s *Sink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, true } // Close do nothing. diff --git a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go index 3024d8bd700..e33c152927e 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go @@ -65,8 +65,9 @@ type eventFragment struct { // dmlSink is the cloud storage sink. // It will send the events to cloud storage systems. type dmlSink struct { - changefeedID model.ChangeFeedID - scheme string + changefeedID model.ChangeFeedID + scheme string + outputRawChangeEvent bool // last sequence number lastSeqNum uint64 // encodingWorkers defines a group of workers for encoding events. @@ -133,13 +134,14 @@ func NewCloudStorageSink( wgCtx, wgCancel := context.WithCancel(ctx) s := &dmlSink{ - changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx), - scheme: strings.ToLower(sinkURI.Scheme), - encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), - workers: make([]*dmlWorker, cfg.WorkerCount), - statistics: metrics.NewStatistics(wgCtx, sink.TxnSink), - cancel: wgCancel, - dead: make(chan struct{}), + changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx), + scheme: strings.ToLower(sinkURI.Scheme), + outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(), + encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), + workers: make([]*dmlWorker, cfg.WorkerCount), + statistics: metrics.NewStatistics(wgCtx, sink.TxnSink), + cancel: wgCancel, + dead: make(chan struct{}), } s.alive.msgCh = chann.NewDrainableChann[eventFragment]() @@ -244,9 +246,9 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single return nil } -// Scheme returns the sink scheme. -func (s *dmlSink) Scheme() string { - return s.scheme +// SchemeOption returns the scheme and the option. +func (s *dmlSink) SchemeOption() (string, bool) { + return s.scheme, s.outputRawChangeEvent } // Close closes the cloud storage sink. diff --git a/cdc/sinkv2/eventsink/event.go b/cdc/sinkv2/eventsink/event.go index 619fcb87d76..a95a3984eb4 100644 --- a/cdc/sinkv2/eventsink/event.go +++ b/cdc/sinkv2/eventsink/event.go @@ -22,9 +22,8 @@ import ( type TableEvent interface { // GetCommitTs returns the commit timestamp of the event. GetCommitTs() uint64 - // TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated. - // Note that sinkScheme is used to control the split behavior. - TrySplitAndSortUpdateEvent(sinkScheme string) error + // TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated + TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error } // CallbackFunc is the callback function for callbackable event. diff --git a/cdc/sinkv2/eventsink/event_sink.go b/cdc/sinkv2/eventsink/event_sink.go index c9aa11862b9..1f1797dc790 100644 --- a/cdc/sinkv2/eventsink/event_sink.go +++ b/cdc/sinkv2/eventsink/event_sink.go @@ -18,8 +18,8 @@ type EventSink[E TableEvent] interface { // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. WriteEvents(events ...*CallbackableEvent[E]) error - // Scheme returns the sink scheme. 
- Scheme() string + // SchemeOption returns the sink scheme and whether the sink should output raw change event. + SchemeOption() (scheme string, outputRawChangeEvent bool) // Close closes the sink. Can be called with `WriteEvents` concurrently. Close() // The EventSink meets internal errors and has been dead already. diff --git a/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go b/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go index 788d39f1039..73bb09f8e89 100644 --- a/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go +++ b/cdc/sinkv2/eventsink/mq/kafka_dml_sink.go @@ -118,7 +118,7 @@ func NewKafkaDMLSink( } s, err := newSink(ctx, sinkURI, p, topicManager, eventRouter, encoderConfig, - replicaConfig.Sink.EncoderConcurrency, errCh) + replicaConfig.Sink.EncoderConcurrency, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go index 85f140d157e..7481ebd9fe9 100644 --- a/cdc/sinkv2/eventsink/mq/mq_dml_sink.go +++ b/cdc/sinkv2/eventsink/mq/mq_dml_sink.go @@ -47,8 +47,7 @@ var _ eventsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil) // It will send the events to the MQ system. type dmlSink struct { // id indicates this sink belongs to which processor(changefeed). - id model.ChangeFeedID - scheme string + id model.ChangeFeedID // protocol indicates the protocol used by this sink. protocol config.Protocol @@ -69,6 +68,9 @@ type dmlSink struct { wg sync.WaitGroup dead chan struct{} + + scheme string + outputRawChangeEvent bool } func newSink(ctx context.Context, @@ -78,6 +80,7 @@ func newSink(ctx context.Context, eventRouter *dispatcher.EventRouter, encoderConfig *common.Config, encoderConcurrency int, + outputRawChangeEvent bool, errCh chan error, ) (*dmlSink, error) { changefeedID := contextutil.ChangefeedIDFromCtx(ctx) @@ -94,12 +97,13 @@ func newSink(ctx context.Context, encoderBuilder, encoderConcurrency, producer, statistics) s := &dmlSink{ - id: changefeedID, - scheme: strings.ToLower(sinkURI.Scheme), - protocol: encoderConfig.Protocol, - ctx: ctx, - cancel: cancel, - dead: make(chan struct{}), + id: changefeedID, + protocol: encoderConfig.Protocol, + ctx: ctx, + cancel: cancel, + dead: make(chan struct{}), + scheme: strings.ToLower(sinkURI.Scheme), + outputRawChangeEvent: outputRawChangeEvent, } s.alive.eventRouter = eventRouter s.alive.topicManager = topicManager @@ -199,8 +203,9 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single return nil } -func (s *dmlSink) Scheme() string { - return s.scheme +// SchemeOption returns the scheme of this sink. +func (s *dmlSink) SchemeOption() (string, bool) { + return s.scheme, s.outputRawChangeEvent } // Close closes the sink. diff --git a/cdc/sinkv2/eventsink/txn/txn_sink.go b/cdc/sinkv2/eventsink/txn/txn_sink.go index 8a440acd801..ebf39081e54 100644 --- a/cdc/sinkv2/eventsink/txn/txn_sink.go +++ b/cdc/sinkv2/eventsink/txn/txn_sink.go @@ -159,9 +159,9 @@ func (s *sink) WriteEvents(txnEvents ...*eventsink.TxnCallbackableEvent) error { return nil } -// Scheme returns the sink scheme. -func (s *sink) Scheme() string { - return s.scheme +// SchemeOption returns the sink scheme. +func (s *sink) SchemeOption() (string, bool) { + return s.scheme, false } // Close closes the sink. It won't wait for all pending items backend handled. 
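Note: the hunks above replace the single-value Scheme() with SchemeOption() (scheme string, outputRawChangeEvent bool), and the table sink hunk that follows forwards both values straight into TrySplitAndSortUpdateEvent. The standalone Go sketch below only illustrates that decision flow under simplified assumptions; mockSink, shouldKeepRawUpdate, and the cut-down isMySQLCompatibleScheme stand-in are illustrative names, not types or functions from this patch or the tiflow codebase.

package main

import (
	"fmt"
	"strings"
)

// eventSink mirrors the reworked EventSink contract from this patch: a sink
// now reports both its scheme and whether it wants raw (unsplit) update events.
type eventSink interface {
	SchemeOption() (scheme string, outputRawChangeEvent bool)
}

// mockSink is an illustrative stand-in, not a type from the patch.
type mockSink struct {
	scheme string
	raw    bool
}

func (s mockSink) SchemeOption() (string, bool) { return s.scheme, s.raw }

// isMySQLCompatibleScheme is a simplified stand-in for sink.IsMySQLCompatibleScheme.
func isMySQLCompatibleScheme(scheme string) bool {
	return strings.HasPrefix(scheme, "mysql") || strings.HasPrefix(scheme, "tidb")
}

// shouldKeepRawUpdate mirrors the early-return condition added to
// SingleTableTxn.TrySplitAndSortUpdateEvent: MySQL-compatible sinks never split
// in the sink, and Kafka/storage sinks skip splitting only when
// output-raw-change-event is enabled.
func shouldKeepRawUpdate(s eventSink) bool {
	scheme, outputRaw := s.SchemeOption()
	return isMySQLCompatibleScheme(scheme) || outputRaw
}

func main() {
	for _, s := range []mockSink{
		{scheme: "mysql", raw: false},
		{scheme: "kafka", raw: false},
		{scheme: "kafka", raw: true},
		{scheme: "s3", raw: true},
	} {
		fmt.Printf("scheme=%-5s output-raw-change-event=%-5v keepRaw=%v\n",
			s.scheme, s.raw, shouldKeepRawUpdate(s))
	}
}

Under this sketch, MySQL-compatible sinks and any sink configured with output-raw-change-event=true keep raw update events, while Kafka and storage sinks with the default configuration fall through to the split-and-sort path, which matches the early return added to SingleTableTxn.TrySplitAndSortUpdateEvent and the behavior exercised by the csv_storage_update_pk_* integration tests later in this patch.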
diff --git a/cdc/sinkv2/tablesink/table_sink_impl.go b/cdc/sinkv2/tablesink/table_sink_impl.go index fb43dbabfc3..2ef6f0751a8 100644 --- a/cdc/sinkv2/tablesink/table_sink_impl.go +++ b/cdc/sinkv2/tablesink/table_sink_impl.go @@ -138,7 +138,7 @@ func (e *EventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) error resolvedCallbackableEvents := make([]*eventsink.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { - if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.Scheme()); err != nil { + if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.SchemeOption()); err != nil { return SinkInternalError{err} } // We have to record the event ID for the callback. diff --git a/cdc/sinkv2/tablesink/table_sink_impl_test.go b/cdc/sinkv2/tablesink/table_sink_impl_test.go index 4993baea0c7..6b918137930 100644 --- a/cdc/sinkv2/tablesink/table_sink_impl_test.go +++ b/cdc/sinkv2/tablesink/table_sink_impl_test.go @@ -41,10 +41,6 @@ func (m *mockEventSink) WriteEvents(rows ...*eventsink.TxnCallbackableEvent) err return nil } -func (m *mockEventSink) Scheme() string { - return sink.BlackHoleScheme -} - func (m *mockEventSink) Close() { close(m.dead) } @@ -53,6 +49,10 @@ func (m *mockEventSink) Dead() <-chan struct{} { return m.dead } +func (m *mockEventSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, false +} + // acknowledge the txn events by call the callback function. func (m *mockEventSink) acknowledge(commitTs uint64) []*eventsink.TxnCallbackableEvent { var droppedEvents []*eventsink.TxnCallbackableEvent diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 44d1f30f616..4277fb07f11 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -951,6 +951,57 @@ var doc = `{ } } }, + "/api/v2/changefeeds/{changefeed_id}/synced": { + "get": { + "description": "get the synced status of a changefeed", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "changefeed", + "v2" + ], + "summary": "Get synced status", + "parameters": [ + { + "type": "string", + "description": "changefeed_id", + "name": "changefeed_id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "default", + "name": "namespace", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/v2.SyncedStatus" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/model.HTTPError" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/model.HTTPError" + } + } + } + } + }, "/api/v2/health": { "get": { "description": "Check the health status of a TiCDC cluster", @@ -1209,6 +1260,36 @@ var doc = `{ } } }, + "config.CloudStorageConfig": { + "type": "object", + "properties": { + "file-cleanup-cron-spec": { + "type": "string" + }, + "file-expiration-days": { + "type": "integer" + }, + "file-size": { + "type": "integer" + }, + "flush-concurrency": { + "type": "integer" + }, + "flush-interval": { + "type": "string" + }, + "output-column-id": { + "type": "boolean" + }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, + "worker-count": { + "type": "integer" + } + } + }, "config.ColumnSelector": { "type": "object", "properties": { @@ -1254,6 +1335,10 @@ var doc = `{ "large-message-handle": { "$ref": "#/definitions/config.LargeMessageHandleConfig" }, + "output-raw-change-event": { + 
"description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "sasl-mechanism": { "type": "string" }, @@ -1295,6 +1380,9 @@ var doc = `{ "description": "AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", "type": "integer" }, + "cloud-storage-config": { + "$ref": "#/definitions/config.CloudStorageConfig" + }, "column-selectors": { "type": "array", "items": { @@ -1308,6 +1396,7 @@ var doc = `{ "type": "string" }, "dispatchers": { + "description": "DispatchRules is only available when the downstream is MQ.", "type": "array", "items": { "$ref": "#/definitions/config.DispatchRule" @@ -1735,6 +1824,35 @@ var doc = `{ } } }, + "v2.CloudStorageConfig": { + "type": "object", + "properties": { + "file_cleanup_cron_spec": { + "type": "string" + }, + "file_expiration_days": { + "type": "integer" + }, + "file_size": { + "type": "integer" + }, + "flush_concurrency": { + "type": "integer" + }, + "flush_interval": { + "type": "string" + }, + "output_column_id": { + "type": "boolean" + }, + "output_raw_change_event": { + "type": "boolean" + }, + "worker_count": { + "type": "integer" + } + } + }, "v2.ColumnSelector": { "type": "object", "properties": { @@ -1755,9 +1873,15 @@ var doc = `{ "v2.ConsistentConfig": { "type": "object", "properties": { + "compression": { + "type": "string" + }, "encoding_worker_num": { "type": "integer" }, + "flush_concurrency": { + "type": "integer" + }, "flush_interval": { "type": "integer" }, @@ -1770,6 +1894,9 @@ var doc = `{ "max_log_size": { "type": "integer" }, + "memory_usage": { + "$ref": "#/definitions/v2.ConsistentMemoryUsage" + }, "meta_flush_interval": { "type": "integer" }, @@ -1781,6 +1908,17 @@ var doc = `{ } } }, + "v2.ConsistentMemoryUsage": { + "type": "object", + "properties": { + "event_cache_percentage": { + "type": "integer" + }, + "memory_quota_percentage": { + "type": "integer" + } + } + }, "v2.DispatchRule": { "type": "object", "properties": { @@ -1895,6 +2033,9 @@ var doc = `{ "large_message_handle": { "$ref": "#/definitions/v2.LargeMessageHandleConfig" }, + "output_raw_change_event": { + "type": "boolean" + }, "sasl_mechanism": { "type": "string" }, @@ -1980,6 +2121,9 @@ var doc = `{ "case_sensitive": { "type": "boolean" }, + "changefeed_error_stuck_duration": { + "type": "string" + }, "check_gc_safe_point": { "type": "boolean" }, @@ -1992,6 +2136,9 @@ var doc = `{ "enable_sync_point": { "type": "boolean" }, + "enable_table_monitor": { + "type": "boolean" + }, "filter": { "$ref": "#/definitions/v2.FilterConfig" }, @@ -2010,11 +2157,17 @@ var doc = `{ "sink": { "$ref": "#/definitions/v2.SinkConfig" }, + "sql_mode": { + "type": "string" + }, "sync_point_interval": { "type": "string" }, "sync_point_retention": { "type": "string" + }, + "synced_status": { + "$ref": "#/definitions/v2.SyncedStatusConfig" } } }, @@ -2067,6 +2220,9 @@ var doc = `{ "advance_timeout": { "type": "integer" }, + "cloud_storage_config": { + "$ref": "#/definitions/v2.CloudStorageConfig" + }, "column_selectors": { "type": "array", "items": { @@ -2111,6 +2267,42 @@ var doc = `{ } } }, + "v2.SyncedStatus": { + "type": "object", + "properties": { + "info": { + "type": "string" + }, + "last_synced_ts": { + "type": "string" + }, + "now_ts": { + "type": "string" + }, + "puller_resolved_ts": { + "type": "string" + }, + "sink_checkpoint_ts": { + "type": "string" + }, + "synced": { + "type": "boolean" + } + } + }, + 
"v2.SyncedStatusConfig": { + "type": "object", + "properties": { + "checkpoint_interval": { + "description": "The maximum interval between latest checkpoint ts and now or\nbetween latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state", + "type": "integer" + }, + "synced_check_interval": { + "description": "The minimum interval between the latest synced ts and now required to reach synced state", + "type": "integer" + } + } + }, "v2.Table": { "type": "object", "properties": { diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index acb93660a2b..f3a47b29781 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -932,6 +932,57 @@ } } }, + "/api/v2/changefeeds/{changefeed_id}/synced": { + "get": { + "description": "get the synced status of a changefeed", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "changefeed", + "v2" + ], + "summary": "Get synced status", + "parameters": [ + { + "type": "string", + "description": "changefeed_id", + "name": "changefeed_id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "default", + "name": "namespace", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/v2.SyncedStatus" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/model.HTTPError" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/model.HTTPError" + } + } + } + } + }, "/api/v2/health": { "get": { "description": "Check the health status of a TiCDC cluster", @@ -1190,6 +1241,36 @@ } } }, + "config.CloudStorageConfig": { + "type": "object", + "properties": { + "file-cleanup-cron-spec": { + "type": "string" + }, + "file-expiration-days": { + "type": "integer" + }, + "file-size": { + "type": "integer" + }, + "flush-concurrency": { + "type": "integer" + }, + "flush-interval": { + "type": "string" + }, + "output-column-id": { + "type": "boolean" + }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, + "worker-count": { + "type": "integer" + } + } + }, "config.ColumnSelector": { "type": "object", "properties": { @@ -1235,6 +1316,10 @@ "large-message-handle": { "$ref": "#/definitions/config.LargeMessageHandleConfig" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "sasl-mechanism": { "type": "string" }, @@ -1276,6 +1361,9 @@ "description": "AdvanceTimeoutInSec is a duration in second. 
If a table sink progress hasn't been\nadvanced for this given duration, the sink will be canceled and re-established.", "type": "integer" }, + "cloud-storage-config": { + "$ref": "#/definitions/config.CloudStorageConfig" + }, "column-selectors": { "type": "array", "items": { @@ -1289,6 +1377,7 @@ "type": "string" }, "dispatchers": { + "description": "DispatchRules is only available when the downstream is MQ.", "type": "array", "items": { "$ref": "#/definitions/config.DispatchRule" @@ -1716,6 +1805,35 @@ } } }, + "v2.CloudStorageConfig": { + "type": "object", + "properties": { + "file_cleanup_cron_spec": { + "type": "string" + }, + "file_expiration_days": { + "type": "integer" + }, + "file_size": { + "type": "integer" + }, + "flush_concurrency": { + "type": "integer" + }, + "flush_interval": { + "type": "string" + }, + "output_column_id": { + "type": "boolean" + }, + "output_raw_change_event": { + "type": "boolean" + }, + "worker_count": { + "type": "integer" + } + } + }, "v2.ColumnSelector": { "type": "object", "properties": { @@ -1736,9 +1854,15 @@ "v2.ConsistentConfig": { "type": "object", "properties": { + "compression": { + "type": "string" + }, "encoding_worker_num": { "type": "integer" }, + "flush_concurrency": { + "type": "integer" + }, "flush_interval": { "type": "integer" }, @@ -1751,6 +1875,9 @@ "max_log_size": { "type": "integer" }, + "memory_usage": { + "$ref": "#/definitions/v2.ConsistentMemoryUsage" + }, "meta_flush_interval": { "type": "integer" }, @@ -1762,6 +1889,17 @@ } } }, + "v2.ConsistentMemoryUsage": { + "type": "object", + "properties": { + "event_cache_percentage": { + "type": "integer" + }, + "memory_quota_percentage": { + "type": "integer" + } + } + }, "v2.DispatchRule": { "type": "object", "properties": { @@ -1876,6 +2014,9 @@ "large_message_handle": { "$ref": "#/definitions/v2.LargeMessageHandleConfig" }, + "output_raw_change_event": { + "type": "boolean" + }, "sasl_mechanism": { "type": "string" }, @@ -1961,6 +2102,9 @@ "case_sensitive": { "type": "boolean" }, + "changefeed_error_stuck_duration": { + "type": "string" + }, "check_gc_safe_point": { "type": "boolean" }, @@ -1973,6 +2117,9 @@ "enable_sync_point": { "type": "boolean" }, + "enable_table_monitor": { + "type": "boolean" + }, "filter": { "$ref": "#/definitions/v2.FilterConfig" }, @@ -1991,11 +2138,17 @@ "sink": { "$ref": "#/definitions/v2.SinkConfig" }, + "sql_mode": { + "type": "string" + }, "sync_point_interval": { "type": "string" }, "sync_point_retention": { "type": "string" + }, + "synced_status": { + "$ref": "#/definitions/v2.SyncedStatusConfig" } } }, @@ -2048,6 +2201,9 @@ "advance_timeout": { "type": "integer" }, + "cloud_storage_config": { + "$ref": "#/definitions/v2.CloudStorageConfig" + }, "column_selectors": { "type": "array", "items": { @@ -2092,6 +2248,42 @@ } } }, + "v2.SyncedStatus": { + "type": "object", + "properties": { + "info": { + "type": "string" + }, + "last_synced_ts": { + "type": "string" + }, + "now_ts": { + "type": "string" + }, + "puller_resolved_ts": { + "type": "string" + }, + "sink_checkpoint_ts": { + "type": "string" + }, + "synced": { + "type": "boolean" + } + } + }, + "v2.SyncedStatusConfig": { + "type": "object", + "properties": { + "checkpoint_interval": { + "description": "The maximum interval between latest checkpoint ts and now or\nbetween latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state", + "type": "integer" + }, + "synced_check_interval": { + "description": "The minimum interval between the latest synced ts and now required 
to reach synced state", + "type": "integer" + } + } + }, "v2.Table": { "type": "object", "properties": { diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 3820d2165d5..3716bac40dc 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -17,6 +17,27 @@ definitions: description: quoting character type: string type: object + config.CloudStorageConfig: + properties: + file-cleanup-cron-spec: + type: string + file-expiration-days: + type: integer + file-size: + type: integer + flush-concurrency: + type: integer + flush-interval: + type: string + output-column-id: + type: boolean + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. + type: boolean + worker-count: + type: integer + type: object config.ColumnSelector: properties: columns: @@ -49,6 +70,10 @@ definitions: properties: large-message-handle: $ref: '#/definitions/config.LargeMessageHandleConfig' + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. + type: boolean sasl-mechanism: type: string sasl-oauth-audience: @@ -78,6 +103,8 @@ definitions: AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been advanced for this given duration, the sink will be canceled and re-established. type: integer + cloud-storage-config: + $ref: '#/definitions/config.CloudStorageConfig' column-selectors: items: $ref: '#/definitions/config.ColumnSelector' @@ -87,6 +114,7 @@ definitions: date-separator: type: string dispatchers: + description: DispatchRules is only available when the downstream is MQ. items: $ref: '#/definitions/config.DispatchRule' type: array @@ -373,6 +401,25 @@ definitions: upstream_id: type: integer type: object + v2.CloudStorageConfig: + properties: + file_cleanup_cron_spec: + type: string + file_expiration_days: + type: integer + file_size: + type: integer + flush_concurrency: + type: integer + flush_interval: + type: string + output_column_id: + type: boolean + output_raw_change_event: + type: boolean + worker_count: + type: integer + type: object v2.ColumnSelector: properties: columns: @@ -386,8 +433,12 @@ definitions: type: object v2.ConsistentConfig: properties: + compression: + type: string encoding_worker_num: type: integer + flush_concurrency: + type: integer flush_interval: type: integer flush_worker_num: @@ -396,6 +447,8 @@ definitions: type: string max_log_size: type: integer + memory_usage: + $ref: '#/definitions/v2.ConsistentMemoryUsage' meta_flush_interval: type: integer storage: @@ -403,6 +456,13 @@ definitions: use_file_backend: type: boolean type: object + v2.ConsistentMemoryUsage: + properties: + event_cache_percentage: + type: integer + memory_quota_percentage: + type: integer + type: object v2.DispatchRule: properties: matcher: @@ -480,6 +540,8 @@ definitions: properties: large_message_handle: $ref: '#/definitions/v2.LargeMessageHandleConfig' + output_raw_change_event: + type: boolean sasl_mechanism: type: string sasl_oauth_audience: @@ -535,6 +597,8 @@ definitions: type: boolean case_sensitive: type: boolean + changefeed_error_stuck_duration: + type: string check_gc_safe_point: type: boolean consistent: @@ -543,6 +607,8 @@ definitions: type: boolean enable_sync_point: type: boolean + enable_table_monitor: + type: boolean filter: $ref: '#/definitions/v2.FilterConfig' force_replicate: @@ -555,10 +621,14 @@ definitions: $ref: '#/definitions/v2.MounterConfig' sink: $ref: '#/definitions/v2.SinkConfig' + sql_mode: + type: string 
sync_point_interval: type: string sync_point_retention: type: string + synced_status: + $ref: '#/definitions/v2.SyncedStatusConfig' type: object v2.RunningError: properties: @@ -592,6 +662,8 @@ definitions: properties: advance_timeout: type: integer + cloud_storage_config: + $ref: '#/definitions/v2.CloudStorageConfig' column_selectors: items: $ref: '#/definitions/v2.ColumnSelector' @@ -621,6 +693,33 @@ definitions: transaction_atomicity: type: string type: object + v2.SyncedStatus: + properties: + info: + type: string + last_synced_ts: + type: string + now_ts: + type: string + puller_resolved_ts: + type: string + sink_checkpoint_ts: + type: string + synced: + type: boolean + type: object + v2.SyncedStatusConfig: + properties: + checkpoint_interval: + description: |- + The maximum interval between latest checkpoint ts and now or + between latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state + type: integer + synced_check_interval: + description: The minimum interval between the latest synced ts and now required + to reach synced state + type: integer + type: object v2.Table: properties: database_name: @@ -1244,6 +1343,40 @@ paths: tags: - changefeed - v2 + /api/v2/changefeeds/{changefeed_id}/synced: + get: + consumes: + - application/json + description: get the synced status of a changefeed + parameters: + - description: changefeed_id + in: path + name: changefeed_id + required: true + type: string + - description: default + in: query + name: namespace + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/v2.SyncedStatus' + "400": + description: Bad Request + schema: + $ref: '#/definitions/model.HTTPError' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/model.HTTPError' + summary: Get synced status + tags: + - changefeed + - v2 /api/v2/health: get: description: Check the health status of a TiCDC cluster diff --git a/errors.toml b/errors.toml index b386a96bf8a..996e76f22bc 100755 --- a/errors.toml +++ b/errors.toml @@ -1071,6 +1071,11 @@ error = ''' service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint ''' +["CDC:ErrSinkIncompatibleConfig"] +error = ''' +incompatible configuration %s +''' + ["CDC:ErrSinkInvalidConfig"] error = ''' sink config invalid diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 9f73988dd32..bfcc7dc0a0b 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -121,6 +121,7 @@ type SinkConfig struct { TxnAtomicity AtomicityLevel `toml:"transaction-atomicity" json:"transaction-atomicity"` Protocol string `toml:"protocol" json:"protocol"` + // DispatchRules is only available when the downstream is MQ. DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers"` CSVConfig *CSVConfig `toml:"csv" json:"csv"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors"` @@ -155,6 +156,16 @@ type KafkaConfig struct { SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` + // OutputRawChangeEvent controls whether to split the update pk/uk events. 
+ OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` +} + +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent +func (k *KafkaConfig) GetOutputRawChangeEvent() bool { + if k == nil || k.OutputRawChangeEvent == nil { + return false + } + return *k.OutputRawChangeEvent } // MaskSensitiveData masks sensitive data in SinkConfig @@ -403,21 +414,41 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error { return err } - // Validate that protocol is compatible with the scheme. For testing purposes, - // any protocol should be legal for blackhole. - if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { - _, err := ParseSinkProtocolFromString(s.Protocol) - if err != nil { - return err - } - } else if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != "" { + log.Info("succeed to parse parameter from sink uri", + zap.String("protocol", s.Protocol), + zap.String("txnAtomicity", string(s.TxnAtomicity))) + + // Check that protocol config is compatible with the scheme. + if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != "" { return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol %s "+ "is incompatible with %s scheme", s.Protocol, sinkURI.Scheme)) } + // For testing purposes, any protocol should be legal for blackhole. + if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { + return s.ValidateProtocol(sinkURI.Scheme) + } + return nil +} - log.Info("succeed to parse parameter from sink uri", - zap.String("protocol", s.Protocol), - zap.String("txnAtomicity", string(s.TxnAtomicity))) +// ValidateProtocol validates the protocol configuration. +func (s *SinkConfig) ValidateProtocol(scheme string) error { + protocol, err := ParseSinkProtocolFromString(s.Protocol) + if err != nil { + return err + } + + outputRawChangeEvent := false + switch scheme { + case sink.KafkaScheme, sink.KafkaSSLScheme: + outputRawChangeEvent = s.KafkaConfig.GetOutputRawChangeEvent() + default: + outputRawChangeEvent = s.CloudStorageConfig.GetOutputRawChangeEvent() + } + + if outputRawChangeEvent { + // TODO: return error if we do not need to keep backward compatibility. + log.Warn(fmt.Sprintf("TiCDC will not split the update pk/uk events if output-raw-change-event is true(scheme: %s, protocol: %s).", scheme, protocol)) + } return nil } @@ -570,4 +601,15 @@ type CloudStorageConfig struct { OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` + + // OutputRawChangeEvent controls whether to split the update pk/uk events. 
+ OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` +} + +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent +func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool { + if c == nil || c.OutputRawChangeEvent == nil { + return false + } + return *c.OutputRawChangeEvent } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 3a633f2da68..dbd25e03813 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -412,6 +412,10 @@ var ( "sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig"), ) + ErrSinkIncompatibleConfig = errors.Normalize( + "incompatible configuration %s", + errors.RFCCodeText("CDC:ErrSinkIncompatibleConfig"), + ) ErrCraftCodecInvalidData = errors.Normalize( "craft codec invalid data", errors.RFCCodeText("CDC:ErrCraftCodecInvalidData"), diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml new file mode 100644 index 00000000000..67f590af9bb --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml @@ -0,0 +1,25 @@ +# Case 1: default configuration where `csv.output-old-value=false` and `sink.cloud-storage-config.output-raw-change-event=false` +# Split and sort update pk/uk events in table sink. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml new file mode 100644 index 00000000000..b806252b395 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml @@ -0,0 +1,24 @@ +# Case 2: Split all update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. 
+include-commit-ts = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml new file mode 100644 index 00000000000..8edf2368fa4 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_clustered/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql new file mode 100644 index 00000000000..506a6e75765 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql @@ -0,0 +1,27 @@ +drop database if exists `test`; +create database `test`; +use `test`; + +CREATE TABLE `update_pk` ( + `id` int PRIMARY KEY CLUSTERED, + `pad` varchar(100) NOT NULL +); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (10, 'example10'), (20, 'example20'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (100, 'example100'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000'); + +SHOW INDEX FROM update_pk; + +CREATE TABLE `update_uk` ( + `id` int PRIMARY KEY CLUSTERED, + `uk` int NOT NULL, + `pad` varchar(100) NOT NULL, + UNIQUE KEY `uk` (`uk`) +); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (10, 10, 'example10'), (20, 20, 'example20'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); + +SHOW INDEX FROM update_uk; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql b/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql new file mode 100644 index 00000000000..86e7d7e7d77 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql @@ -0,0 +1,40 @@ +USE `test`; + +-- update_pk -- + +BEGIN; -- Note: multi-row exchange +UPDATE update_pk SET id = 3 WHERE id = 1; +UPDATE update_pk SET id = 1 WHERE id = 2; +UPDATE update_pk SET id = 2 WHERE id = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_pk SET id = 30 WHERE id = 10; +UPDATE update_pk SET id = 40 WHERE id = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_pk SET id = 200 WHERE id = 100; +COMMIT; + +-- Normal update +UPDATE update_pk SET pad='example1001' WHERE id = 1000; + +-- update_uk -- +BEGIN; -- Note: multi-row exchange +UPDATE update_uk SET uk = 3 WHERE uk = 1; +UPDATE update_uk SET uk = 1 WHERE uk = 2; +UPDATE update_uk SET uk = 2 WHERE uk = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_uk SET uk = 30 WHERE uk = 10; +UPDATE update_uk SET uk = 40 WHERE uk = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_uk SET uk = 200 WHERE uk = 100; +COMMIT; + +-- 
Normal update +UPDATE update_uk SET pad='example1001' WHERE uk = 1000; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res new file mode 100644 index 00000000000..ad6016c8059 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res @@ -0,0 +1,22 @@ +"I","update_pk","test",450253245302439944,1,"example1" +"I","update_pk","test",450253245302439944,2,"example2" +"I","update_pk","test",450253245302439946,10,"example10" +"I","update_pk","test",450253245302439946,20,"example20" +"I","update_pk","test",450253245302439947,100,"example100" +"I","update_pk","test",450253245302439948,1000,"example1000" + +# translate to normal update in upstream +"U","update_pk","test",450253245485940746,1,"example2" +"U","update_pk","test",450253245485940746,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,10,"example10" +"D","update_pk","test",450253245485940749,20,"example20" +"I","update_pk","test",450253245485940749,30,"example10" +"I","update_pk","test",450253245485940749,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,100,"example100" +"I","update_pk","test",450253245485940752,200,"example100" + +"U","update_pk","test",450253245485940753,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res new file mode 100644 index 00000000000..ebe3a635252 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res @@ -0,0 +1,24 @@ +"I","update_uk","test",450253245446619144,1,1,"example1" +"I","update_uk","test",450253245446619144,2,2,"example2" +"I","update_uk","test",450253245446619146,10,10,"example10" +"I","update_uk","test",450253245446619146,20,20,"example20" +"I","update_uk","test",450253245446619147,100,100,"example100" +"I","update_uk","test",450253245446619148,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450253245499047940,1,1,"example1" +"D","update_uk","test",450253245499047940,2,2,"example2" +"I","update_uk","test",450253245499047940,1,2,"example1" +"I","update_uk","test",450253245499047940,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450253245499047943,10,10,"example10" +"D","update_uk","test",450253245499047943,20,20,"example20" +"I","update_uk","test",450253245499047943,10,30,"example10" +"I","update_uk","test",450253245499047943,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450253245499047946,100,100,"example100" +"I","update_uk","test",450253245499047946,100,200,"example100" + +"U","update_uk","test",450253245512155140,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res new file mode 100644 index 00000000000..fc3ea45b65d --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450253245302439944,false,1,"example1" +"I","update_pk","test",450253245302439944,false,2,"example2" +"I","update_pk","test",450253245302439946,false,10,"example10" 
+"I","update_pk","test",450253245302439946,false,20,"example20" +"I","update_pk","test",450253245302439947,false,100,"example100" +"I","update_pk","test",450253245302439948,false,1000,"example1000" + +# translate to normal update in upstream, split in csv encoder +"D","update_pk","test",450253245485940746,true,1,"example1" +"I","update_pk","test",450253245485940746,true,1,"example2" +"D","update_pk","test",450253245485940746,true,2,"example2" +"I","update_pk","test",450253245485940746,true,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,false,10,"example10" +"D","update_pk","test",450253245485940749,false,20,"example20" +"I","update_pk","test",450253245485940749,false,30,"example10" +"I","update_pk","test",450253245485940749,false,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,false,100,"example100" +"I","update_pk","test",450253245485940752,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450253245485940753,true,1000,"example1000" +"I","update_pk","test",450253245485940753,true,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res new file mode 100644 index 00000000000..5e7f2ce0e71 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450253245446619144,false,1,1,"example1" +"I","update_uk","test",450253245446619144,false,2,2,"example2" +"I","update_uk","test",450253245446619146,false,10,10,"example10" +"I","update_uk","test",450253245446619146,false,20,20,"example20" +"I","update_uk","test",450253245446619147,false,100,100,"example100" +"I","update_uk","test",450253245446619148,false,1000,1000,"example1000" + +# split in csv encoder, data is consistent since delete by pk +"D","update_uk","test",450253245499047940,true,1,1,"example1" +"I","update_uk","test",450253245499047940,true,1,2,"example1" +"D","update_uk","test",450253245499047940,true,2,2,"example2" +"I","update_uk","test",450253245499047940,true,2,1,"example2" + +# split in csv encoder +"D","update_uk","test",450253245499047943,true,10,10,"example10" +"I","update_uk","test",450253245499047943,true,10,30,"example10" +"D","update_uk","test",450253245499047943,true,20,20,"example20" +"I","update_uk","test",450253245499047943,true,20,40,"example20" + +# split in csv encoder +"D","update_uk","test",450253245499047946,true,100,100,"example100" +"I","update_uk","test",450253245499047946,true,100,200,"example100" + +# normal update event, also split in csv encoder +"D","update_uk","test",450253245512155140,true,1000,1000,"example1000" +"I","update_uk","test",450253245512155140,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh new file mode 100644 index 00000000000..65842e07025 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run_changefeed() { + local changefeed_id=$1 + local start_ts=$2 + local expected_split_count=$3 + SINK_URI="file://$WORK_DIR/storage_test/$changefeed_id?flush-interval=5s" + 
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id"
+
+	run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml $changefeed_id
+	sleep 8
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100
+
+	real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l)
+	if [[ $real_split_count -ne $expected_split_count ]]; then
+		echo "expected split count $expected_split_count, real split count $real_split_count"
+		exit 1
+	fi
+	run_sql "drop database if exists test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+}
+
+function run() {
+	if [ "$SINK_TYPE" != "storage" ]; then
+		return
+	fi
+
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+	start_tidb_cluster --workdir $WORK_DIR
+	cd $WORK_DIR
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+
+	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	run_changefeed "changefeed1" $start_ts 5
+	run_changefeed "changefeed2" $start_ts 5
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml
new file mode 100644
index 00000000000..67f590af9bb
--- /dev/null
+++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml
@@ -0,0 +1,25 @@
+# Case 1: default configuration where `csv.output-old-value=false` and `sink.cloud-storage-config.output-raw-change-event=false`
+# Split and sort update pk/uk events in the table sink.
+
+[filter]
+rules = ['test.*']
+
+[sink]
+protocol = "csv"
+# Line terminator. An empty value means "\r\n" (CRLF) is used as the line terminator. The default value is empty.
+terminator = "\n"
+# Directory date separator. Optional values are `none`, `year`, `month`, `day`. The default value is `none`.
+date-separator = 'day'
+
+[sink.cloud-storage-config]
+output-raw-change-event = false
+
+[sink.csv]
+# Delimiter between fields. Must be ASCII characters. The default value is ','.
+delimiter = ','
+# Quoting character. An empty value means no quoting. The default value is '"'.
+quote = '"'
+# Representation of null values in CSV files. The default value is '\N'.
+null = '\N'
+# Include commit-ts in the row data. The default value is false.
+include-commit-ts = true
\ No newline at end of file
diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml
new file mode 100644
index 00000000000..b806252b395
--- /dev/null
+++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml
@@ -0,0 +1,24 @@
+# Case 2: Split all update events in the csv encoder.
+
+[filter]
+rules = ['test.*']
+
+[sink]
+protocol = "csv"
+# Line terminator. An empty value means "\r\n" (CRLF) is used as the line terminator. The default value is empty.
+terminator = "\n"
+# Directory date separator. Optional values are `none`, `year`, `month`, `day`. The default value is `none`.
+date-separator = 'day'
+
+[sink.cloud-storage-config]
+output-raw-change-event = true
+
+[sink.csv]
+# Delimiter between fields. Must be ASCII characters. The default value is ','.
+delimiter = ','
+# Quoting character. An empty value means no quoting. The default value is '"'.
+quote = '"'
+# Representation of null values in CSV files. The default value is '\N'.
+null = '\N'
+# Include commit-ts in the row data. The default value is false.
+include-commit-ts = true
\ No newline at end of file
diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml
new file mode 100644
index 00000000000..0714c0c18d9
--- /dev/null
+++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+	output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_nonclustered/sync_diff-<changefeed-id>/output"
+
+	source-instances = ["mysql1"]
+
+	target-instance = "tidb0"
+
+	target-check-tables = ["test.?*"]
+
+[data-sources]
+[data-sources.mysql1]
+	host = "127.0.0.1"
+	port = 4000
+	user = "root"
+	password = ""
+
+[data-sources.tidb0]
+	host = "127.0.0.1"
+	port = 3306
+	user = "root"
+	password = ""
diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql
new file mode 100644
index 00000000000..f3cd4ca4d24
--- /dev/null
+++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql
@@ -0,0 +1,28 @@
+drop database if exists `test`;
+create database `test`;
+use `test`;
+
+CREATE TABLE `update_pk` (
+    `id` int PRIMARY KEY NONCLUSTERED,
+    `pad` varchar(100) NOT NULL
+);
+INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2');
+INSERT INTO `update_pk` (`id`, `pad`) VALUES (10, 'example10'), (20, 'example20');
+INSERT INTO `update_pk` (`id`, `pad`) VALUES (100, 'example100');
+INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000');
+
+
+SHOW INDEX FROM update_pk;
+
+CREATE TABLE `update_uk` (
+    `id` int PRIMARY KEY NONCLUSTERED,
+    `uk` int NOT NULL,
+    `pad` varchar(100) NOT NULL,
+    UNIQUE KEY `uk` (`uk`)
+);
+INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2');
+INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (10, 10, 'example10'), (20, 20, 'example20');
+INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100');
+INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000');
+
+SHOW INDEX FROM update_uk;
\ No newline at end of file
diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql
new file mode 100644
index 00000000000..86e7d7e7d77
--- /dev/null
+++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql
@@ -0,0 +1,40 @@
+USE `test`;
+
+-- update_pk --
+
+BEGIN; -- Note: multi-row exchange
+UPDATE update_pk SET id = 3 WHERE id = 1;
+UPDATE update_pk SET id = 1 WHERE id = 2;
+UPDATE update_pk SET id = 2 WHERE id = 3;
+COMMIT;
+
+BEGIN; -- Note: multi-row update with no order dependency
+UPDATE update_pk SET id = 30 WHERE id = 10;
+UPDATE update_pk SET id = 40 WHERE id = 20;
+COMMIT;
+
+BEGIN; -- Single row update
+UPDATE update_pk SET id = 200 WHERE id = 100;
+COMMIT;
+
+-- Normal update
+UPDATE update_pk SET pad='example1001' WHERE id = 1000;
+
+-- update_uk --
+BEGIN; -- Note: multi-row exchange
+UPDATE update_uk SET uk = 3 WHERE uk = 1;
+UPDATE update_uk SET uk = 1 WHERE uk = 2;
+UPDATE update_uk SET uk = 2 WHERE uk = 3;
+COMMIT;
+
+BEGIN; -- Note: multi-row update with no order
dependency +UPDATE update_uk SET uk = 30 WHERE uk = 10; +UPDATE update_uk SET uk = 40 WHERE uk = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_uk SET uk = 200 WHERE uk = 100; +COMMIT; + +-- Normal update +UPDATE update_uk SET pad='example1001' WHERE uk = 1000; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res new file mode 100644 index 00000000000..08f6eedb804 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res @@ -0,0 +1,24 @@ +"I","update_pk","test",450250823741472787,1,"example1" +"I","update_pk","test",450250823741472787,2,"example2" +"I","update_pk","test",450250823741472790,10,"example10" +"I","update_pk","test",450250823741472790,20,"example20" +"I","update_pk","test",450250823741472791,100,"example100" +"I","update_pk","test",450250823741472792,1000,"example1000" + +# split and sort in table sink +"D","update_pk","test",450250823807270922,1,"example1" +"D","update_pk","test",450250823807270922,2,"example2" +"I","update_pk","test",450250823807270922,2,"example1" +"I","update_pk","test",450250823807270922,1,"example2" + +# split and sort in table sink +"D","update_pk","test",450250823807270925,10,"example10" +"D","update_pk","test",450250823807270925,20,"example20" +"I","update_pk","test",450250823807270925,30,"example10" +"I","update_pk","test",450250823807270925,40,"example20" + +# split and sort in table sink +"D","update_pk","test",450250823807270927,100,"example100" +"I","update_pk","test",450250823807270927,200,"example100" + +"U","update_pk","test",450250823807270928,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res new file mode 100644 index 00000000000..b26f2219af2 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res @@ -0,0 +1,24 @@ +"I","update_uk","test",450250823780794385,1,1,"example1" +"I","update_uk","test",450250823780794385,2,2,"example2" +"I","update_uk","test",450250823780794387,10,10,"example10" +"I","update_uk","test",450250823780794387,20,20,"example20" +"I","update_uk","test",450250823780794389,100,100,"example100" +"I","update_uk","test",450250823780794390,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450250823807270931,1,1,"example1" +"D","update_uk","test",450250823807270931,2,2,"example2" +"I","update_uk","test",450250823807270931,1,2,"example1" +"I","update_uk","test",450250823807270931,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450250823820115970,10,10,"example10" +"D","update_uk","test",450250823820115970,20,20,"example20" +"I","update_uk","test",450250823820115970,10,30,"example10" +"I","update_uk","test",450250823820115970,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450250823820115973,100,100,"example100" +"I","update_uk","test",450250823820115973,100,200,"example100" + +"U","update_uk","test",450250823820115977,1000,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res new file mode 100644 index 00000000000..e2713a94f63 --- /dev/null +++ 
b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res @@ -0,0 +1,28 @@ +"I","update_pk","test",450250823741472787,false,1,"example1" +"I","update_pk","test",450250823741472787,false,2,"example2" +"I","update_pk","test",450250823741472790,false,10,"example10" +"I","update_pk","test",450250823741472790,false,20,"example20" +"I","update_pk","test",450250823741472791,false,100,"example100" +"I","update_pk","test",450250823741472792,false,1000,"example1000" + +# split in csv encoder +# DIFF_RES: REPLACE INTO `test`.`update_pk`(`id`,`pad`) VALUES (2,'example1'); +# lost id=2 since delete are not sorted before insert within single txn +"D","update_pk","test",450250823807270922,true,1,"example1" +"I","update_pk","test",450250823807270922,true,2,"example1" +"D","update_pk","test",450250823807270922,true,2,"example2" +"I","update_pk","test",450250823807270922,true,1,"example2" + +# split in csv encoder +"D","update_pk","test",450250823807270925,true,10,"example10" +"I","update_pk","test",450250823807270925,true,30,"example10" +"D","update_pk","test",450250823807270925,true,20,"example20" +"I","update_pk","test",450250823807270925,true,40,"example20" + +# split in csv encoder +"D","update_pk","test",450250823807270927,true,100,"example100" +"I","update_pk","test",450250823807270927,true,200,"example100" + +# normal update event, also split in csv encoder +"D","update_pk","test",450250823807270928,true,1000,"example1000" +"I","update_pk","test",450250823807270928,true,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res new file mode 100644 index 00000000000..1783ee5a0dd --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450250823780794385,false,1,1,"example1" +"I","update_uk","test",450250823780794385,false,2,2,"example2" +"I","update_uk","test",450250823780794387,false,10,10,"example10" +"I","update_uk","test",450250823780794387,false,20,20,"example20" +"I","update_uk","test",450250823780794389,false,100,100,"example100" +"I","update_uk","test",450250823780794390,false,1000,1000,"example1000" + +# split in csv encoder, data is consistent since delete by pk +"D","update_uk","test",450250823807270931,true,1,1,"example1" +"I","update_uk","test",450250823807270931,true,1,2,"example1" +"D","update_uk","test",450250823807270931,true,2,2,"example2" +"I","update_uk","test",450250823807270931,true,2,1,"example2" + +# split in csv encoder +"D","update_uk","test",450250823820115970,true,10,10,"example10" +"I","update_uk","test",450250823820115970,true,10,30,"example10" +"D","update_uk","test",450250823820115970,true,20,20,"example20" +"I","update_uk","test",450250823820115970,true,20,40,"example20" + +# split in csv encoder +"D","update_uk","test",450250823820115973,true,100,100,"example100" +"I","update_uk","test",450250823820115973,true,100,200,"example100" + +# normal update event, also split in csv encoder +"D","update_uk","test",450250823820115977,true,1000,1000,"example1000" +"I","update_uk","test",450250823820115977,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh new file mode 100644 index 00000000000..4a6b1b73fe2 --- /dev/null +++ 
b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh
@@ -0,0 +1,61 @@
+#!/bin/bash
+
+set -eu
+
+CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+function run_changefeed() {
+	local changefeed_id=$1
+	local start_ts=$2
+	local expected_split_count=$3
+	local should_pass_check=$4
+	SINK_URI="file://$WORK_DIR/storage_test/$changefeed_id?flush-interval=5s"
+	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id"
+
+	run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml $changefeed_id
+	sleep 8
+
+	cp $CUR/conf/diff_config.toml $WORK_DIR/diff_config.toml
+	sed -i "s/<changefeed-id>/$changefeed_id/" $WORK_DIR/diff_config.toml
+	if [[ $should_pass_check == true ]]; then
+		check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml 100
+	else
+		check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml 30 && exit 1 || echo "check_sync_diff failed as expected for $changefeed_id"
+	fi
+
+	real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l)
+	if [[ $real_split_count -ne $expected_split_count ]]; then
+		echo "expected split count $expected_split_count, real split count $real_split_count"
+		exit 1
+	fi
+	run_sql "drop database if exists test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+}
+
+function run() {
+	if [ "$SINK_TYPE" != "storage" ]; then
+		return
+	fi
+
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+	start_tidb_cluster --workdir $WORK_DIR
+	cd $WORK_DIR
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
+
+	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	run_changefeed "changefeed1" $start_ts 10 true
+	# changefeed2 is expected to fail because its delete events are not sorted before inserts
+	run_changefeed "changefeed2" $start_ts 10 false
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh
index 04f01885f3d..c79b0f15f5a 100755
--- a/tests/integration_tests/run_group.sh
+++ b/tests/integration_tests/run_group.sh
@@ -18,7 +18,7 @@ kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error
 kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic canal_json_content_compatible multi_topics avro_basic canal_json_handle_key_only open_protocol_handle_key_only canal_json_claim_check open_protocol_claim_check"
 kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2"
 
-storage_only="lossy_ddl storage_csv_update"
+storage_only="lossy_ddl storage_csv_update csv_storage_update_pk_clustered csv_storage_update_pk_nonclustered"
 storage_only_csv="storage_cleanup csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table"
 storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table"
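
Why the non-clustered changefeed2 case is expected to fail: with output-raw-change-event = true the csv encoder splits each primary-key-changing update into a delete followed by an insert in place, without sorting a transaction's deletes ahead of its inserts, so a later delete can remove a row that an earlier insert in the same transaction already produced. The Go snippet below is only an illustrative sketch, not part of this patch or of TiCDC itself (the event struct and apply helper are invented for the example); it replays the txn-450250823807270922 rows from changefeed2_pk.res against a map keyed by id and compares the encoder's emission order with the delete-first order the table sink produces for changefeed1.

package main

import "fmt"

// event mirrors one row of the expected .res files: "D" (delete) or "I"
// (insert, applied downstream as REPLACE INTO), keyed by id.
type event struct {
	op  string
	id  int
	pad string
}

// apply replays events against a map keyed by id, mimicking a consumer that
// issues DELETE / REPLACE INTO statements on the downstream table.
func apply(rows map[int]string, events []event) {
	for _, e := range events {
		if e.op == "D" {
			delete(rows, e.id)
		} else {
			rows[e.id] = e.pad
		}
	}
}

func main() {
	initial := func() map[int]string {
		return map[int]string{1: "example1", 2: "example2"}
	}

	// Emission order when output-raw-change-event=true: each update is split
	// in place by the csv encoder; deletes are NOT sorted before inserts.
	unsorted := []event{
		{"D", 1, "example1"}, {"I", 2, "example1"},
		{"D", 2, "example2"}, {"I", 1, "example2"},
	}
	// Order produced when the table sink splits and sorts the same updates
	// (changefeed1): all deletes of the transaction come first.
	sorted := []event{
		{"D", 1, "example1"}, {"D", 2, "example2"},
		{"I", 2, "example1"}, {"I", 1, "example2"},
	}

	a, b := initial(), initial()
	apply(a, unsorted)
	apply(b, sorted)
	fmt.Println("unsorted:", a) // map[1:example2]            -- id=2 is lost
	fmt.Println("sorted:  ", b) // map[1:example2 2:example1] -- consistent
}

Replayed this way, the unsorted order leaves only id=1 downstream, which matches the DIFF_RES fix recorded in changefeed2_pk.res (REPLACE INTO `test`.`update_pk`(`id`,`pad`) VALUES (2,'example1')) and is why run.sh passes should_pass_check=false for changefeed2 while changefeed1, with the same expected split count, must pass sync-diff.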