diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index b025b779263..21ca4a7c53d 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -189,32 +189,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig( if err != nil { return nil, err } - if !replicaCfg.EnableOldValue { - sinkURIParsed, err := url.Parse(cfg.SinkURI) - if err != nil { - return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) - } - protocol := sinkURIParsed.Query().Get(config.ProtocolKey) - if protocol != "" { - replicaCfg.Sink.Protocol = protocol - } - for _, fp := range config.ForceEnableOldValueProtocols { - if replicaCfg.Sink.Protocol == fp { - log.Warn( - "Attempting to replicate without old value enabled. "+ - "CDC will enable old value and continue.", - zap.String("protocol", replicaCfg.Sink.Protocol)) - replicaCfg.EnableOldValue = true - break - } - } - - if replicaCfg.ForceReplicate { - return nil, cerror.ErrOldValueNotEnabled.GenWithStackByArgs( - "if use force replicate, old value feature must be enabled") - } - } f, err := filter.NewFilter(replicaCfg, "") if err != nil { return nil, errors.Cause(err) diff --git a/cdc/api/v2/api_helpers_test.go b/cdc/api/v2/api_helpers_test.go index 16889165dfc..1201e528e00 100644 --- a/cdc/api/v2/api_helpers_test.go +++ b/cdc/api/v2/api_helpers_test.go @@ -45,12 +45,14 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) { cfg.ReplicaConfig = GetDefaultReplicaConfig() cfg.ReplicaConfig.ForceReplicate = true cfg.ReplicaConfig.EnableOldValue = false - // disable old value but force replicate + cfg.SinkURI = "mysql://" + // disable old value but force replicate, and using mysql sink. 
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) require.NotNil(t, err) require.Nil(t, cfInfo) cfg.ReplicaConfig.ForceReplicate = false cfg.ReplicaConfig.IgnoreIneligibleTable = true + cfg.SinkURI = "blackhole://" cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) require.Nil(t, err) require.NotNil(t, cfInfo) @@ -88,6 +90,19 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) { cfg.SinkURI = string([]byte{0x7f, ' '}) cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) require.NotNil(t, err) + + cfg.StartTs = 0 + // use blackhole to workaround + cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro" + cfg.ReplicaConfig.EnableOldValue = true + cfg.ReplicaConfig.ForceReplicate = false + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + require.NoError(t, err) + require.False(t, cfInfo.Config.EnableOldValue) + + cfg.ReplicaConfig.ForceReplicate = true + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + require.Error(t, cerror.ErrOldValueNotEnabled, err) } func TestVerifyUpdateChangefeedConfig(t *testing.T) { @@ -140,4 +155,16 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { cfg.TargetTs = 9 newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) require.NotNil(t, err) + + cfg.StartTs = 0 + cfg.TargetTs = 0 + cfg.ReplicaConfig.EnableOldValue = true + cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro" + newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) + require.NoError(t, err) + require.False(t, newCfInfo.Config.EnableOldValue) + + cfg.ReplicaConfig.ForceReplicate = true + newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) + require.Error(t, cerror.ErrOldValueNotEnabled, err) } diff --git 
a/cdc/entry/mounter.go b/cdc/entry/mounter.go index a01700b279b..28edd19186c 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -71,7 +71,6 @@ type Mounter interface { type mounter struct { schemaStorage SchemaStorage tz *time.Location - enableOldValue bool changefeedID model.ChangeFeedID filter pfilter.Filter metricTotalRows prometheus.Gauge @@ -83,13 +82,11 @@ func NewMounter(schemaStorage SchemaStorage, changefeedID model.ChangeFeedID, tz *time.Location, filter pfilter.Filter, - enableOldValue bool, ) Mounter { return &mounter{ - schemaStorage: schemaStorage, - changefeedID: changefeedID, - enableOldValue: enableOldValue, - filter: filter, + schemaStorage: schemaStorage, + changefeedID: changefeedID, + filter: filter, metricTotalRows: totalRowsCountGauge. WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricIgnoredDMLEventCounter: ignoredDMLEventCounter. @@ -271,7 +268,7 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { } func datum2Column( - tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool, + tableInfo *model.TableInfo, datums map[int64]types.Datum, ) ([]*model.Column, []types.Datum, []rowcodec.ColInfo, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) @@ -288,19 +285,18 @@ func datum2Column( continue } colName := colInfo.Name.O - colDatums, exist := datums[colInfo.ID] - var colValue interface{} - if !exist && !fillWithDefaultValue { - log.Debug("column value is not found", - zap.String("table", tableInfo.Name.O), zap.String("column", colName)) - continue - } - var err error - var warn string - var size int + colID := colInfo.ID + colDatums, exist := datums[colID] + + var ( + colValue interface{} + size int + warn string + err error + ) if exist { colValue, size, warn, err = formatColVal(colDatums, colInfo) - } else if fillWithDefaultValue { + } else { colDatums, colValue, size, warn, err = 
getDefaultOrZeroValue(colInfo) } if err != nil { @@ -342,27 +338,16 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info - preCols, preRawCols, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) + preCols, preRawCols, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow) if err != nil { return nil, rawRow, errors.Trace(err) } - - // NOTICE: When the old Value feature is off, - // the Delete event only needs to keep the handle key column. - if row.Delete && !m.enableOldValue { - for i := range preCols { - col := preCols[i] - if col != nil && !col.Flag.IsHandleKey() { - preCols[i] = nil - } - } - } } var cols []*model.Column var rawCols []types.Datum if row.RowExist { - cols, rawCols, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true) + cols, rawCols, extendColumnInfos, err = datum2Column(tableInfo, row.Row) if err != nil { return nil, rawRow, errors.Trace(err) } diff --git a/cdc/entry/mounter_group.go b/cdc/entry/mounter_group.go index 770267baa55..41c21680f69 100644 --- a/cdc/entry/mounter_group.go +++ b/cdc/entry/mounter_group.go @@ -31,13 +31,11 @@ type MounterGroup interface { } type mounterGroup struct { - schemaStorage SchemaStorage - inputCh chan *model.PolymorphicEvent - tz *time.Location - filter filter.Filter - enableOldValue bool - - workerNum int + schemaStorage SchemaStorage + inputCh chan *model.PolymorphicEvent + tz *time.Location + filter filter.Filter + workerNum int changefeedID model.ChangeFeedID } @@ -52,7 +50,6 @@ const ( func NewMounterGroup( schemaStorage SchemaStorage, workerNum int, - enableOldValue bool, filter filter.Filter, tz *time.Location, changefeedID model.ChangeFeedID, @@ -61,11 +58,10 @@ func NewMounterGroup( workerNum = defaultMounterWorkerNum } return &mounterGroup{ - schemaStorage: 
schemaStorage, - inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize), - enableOldValue: enableOldValue, - filter: filter, - tz: tz, + schemaStorage: schemaStorage, + inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize), + filter: filter, + tz: tz, workerNum: workerNum, @@ -100,7 +96,7 @@ func (m *mounterGroup) Run(ctx context.Context) error { } func (m *mounterGroup) runWorker(ctx context.Context) error { - mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter, m.enableOldValue) + mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter) for { select { case <-ctx.Done(): diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 4767eeb681d..cee85e64e1f 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -308,9 +308,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct { config := config.GetDefaultReplicaConfig() filter, err := filter.NewFilter(config, "") require.Nil(t, err) - mounter := NewMounter(scheamStorage, - model.DefaultChangeFeedID("c1"), - time.UTC, filter, false).(*mounter) + mounter := NewMounter(scheamStorage, model.DefaultChangeFeedID("c1"), time.UTC, filter).(*mounter) mounter.tz = time.Local ctx := context.Background() @@ -1019,7 +1017,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) { ts := schemaStorage.GetLastSnapshot().CurrentTs() schemaStorage.AdvanceResolvedTs(ver.Ver) - mounter := NewMounter(schemaStorage, cfID, time.Local, f, true).(*mounter) + mounter := NewMounter(schemaStorage, cfID, time.Local, f).(*mounter) type testCase struct { schema string @@ -1196,7 +1194,7 @@ func TestBuildTableInfo(t *testing.T) { originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) require.NoError(t, err) cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) - cols, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true) + cols, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}) require.NoError(t, err) 
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index a3e1b7a04ff..ee3033edf57 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -18,6 +18,7 @@ import ( "math" "net/url" "regexp" + "strings" "time" "github.com/pingcap/errors" @@ -261,7 +262,7 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) { // VerifyAndComplete verifies changefeed info and may fill in some fields. // If a required field is not provided, return an error. // If some necessary filed is missing but can use a default value, fill in it. -func (info *ChangeFeedInfo) VerifyAndComplete() error { +func (info *ChangeFeedInfo) VerifyAndComplete() { defaultConfig := config.GetDefaultReplicaConfig() if info.Engine == "" { info.Engine = SortUnified @@ -278,8 +279,6 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error { if info.Config.Consistent == nil { info.Config.Consistent = defaultConfig.Consistent } - - return nil } // FixIncompatible fixes incompatible changefeed meta info. @@ -308,6 +307,14 @@ func (info *ChangeFeedInfo) FixIncompatible() { info.fixMemoryQuota() log.Info("Fix incompatible memory quota completed", zap.String("changefeed", info.String())) } + + if creatorVersionGate.ChangefeedAdjustEnableOldValueByProtocol() { + log.Info("Start fixing incompatible enable old value", zap.String("changefeed", info.String()), + zap.Bool("enableOldValue", info.Config.EnableOldValue)) + info.fixEnableOldValue() + log.Info("Fix incompatible enable old value completed", zap.String("changefeed", info.String()), + zap.Bool("enableOldValue", info.Config.EnableOldValue)) + } } // fixState attempts to fix state loss from upgrading the old owner to the new owner. 
@@ -378,6 +385,18 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() { } } +func (info *ChangeFeedInfo) fixEnableOldValue() { + uri, err := url.Parse(info.SinkURI) + if err != nil { + // this is impossible to happen, since the changefeed registered successfully. + log.Warn("parse sink URI failed", zap.Error(err)) + return + } + scheme := strings.ToLower(uri.Scheme) + protocol := uri.Query().Get(config.ProtocolKey) + info.Config.AdjustEnableOldValue(scheme, protocol) +} + func (info *ChangeFeedInfo) fixMQSinkProtocol() { uri, err := url.Parse(info.SinkURI) if err != nil { diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 2f276a35a03..a2b2f5923ce 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -154,8 +154,7 @@ func TestVerifyAndComplete(t *testing.T) { }, } - err := info.VerifyAndComplete() - require.Nil(t, err) + info.VerifyAndComplete() require.Equal(t, SortUnified, info.Engine) marshalConfig1, err := info.Config.Marshal() diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index b0db75213f2..2241583fdf9 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -302,15 +302,7 @@ func SplitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv deleteEventRowKV := *updateEvent.RawKV deleteEvent.Row = &deleteEventRow deleteEvent.RawKV = &deleteEventRowKV - deleteEvent.Row.Columns = nil - for i := range deleteEvent.Row.PreColumns { - // NOTICE: Only the handle key pre column is retained in the delete event. 
- if deleteEvent.Row.PreColumns[i] != nil && - !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { - deleteEvent.Row.PreColumns[i] = nil - } - } insertEvent := *updateEvent insertEventRow := *updateEvent.Row diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 592530c3589..911465c6f2f 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -584,14 +584,6 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { require.Len(t, sink.Received[deleteEventIndex].Row.PreColumns, 3) nilColIndex := 0 require.Nil(t, sink.Received[deleteEventIndex].Row.PreColumns[nilColIndex]) - nonHandleKeyColIndex := 1 - handleKeyColIndex := 2 - // NOTICE: When old value disabled, we only keep the handle key pre cols. - require.Nil(t, sink.Received[deleteEventIndex].Row.PreColumns[nonHandleKeyColIndex]) - require.Equal(t, "col2", sink.Received[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Name) - require.True(t, - sink.Received[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Flag.IsHandleKey(), - ) insertEventIndex := 1 require.Len(t, sink.Received[insertEventIndex].Row.Columns, 3) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index ca961c85f51..8289c0579f1 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -789,7 +789,6 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { p.mg = entry.NewMounterGroup(p.schemaStorage, p.changefeed.Info.Config.Mounter.WorkerNum, - p.changefeed.Info.Config.EnableOldValue, p.filter, tz, p.changefeedID) p.wg.Add(1) diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 1ea4e720510..3472d617755 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -443,7 +443,7 @@ func convertRowChangedEvents( // This indicates that it is an update event, // and after enable old value internally by 
default(but disable in the configuration). // We need to handle the update event to be compatible with the old format. - if !enableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen { + if e.Row.IsUpdate() && !enableOldValue { if shouldSplitUpdateEvent(e) { deleteEvent, insertEvent, err := splitUpdateEvent(e) if err != nil { @@ -508,13 +508,6 @@ func splitUpdateEvent( deleteEvent.RawKV = &deleteEventRowKV deleteEvent.Row.Columns = nil - for i := range deleteEvent.Row.PreColumns { - // NOTICE: Only the handle key pre column is retained in the delete event. - if deleteEvent.Row.PreColumns[i] != nil && - !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { - deleteEvent.Row.PreColumns[i] = nil - } - } insertEvent := *updateEvent insertEventRow := *updateEvent.Row diff --git a/cdc/sink/codec/builder/codec_test.go b/cdc/sink/codec/builder/codec_test.go index 1e43f2edc92..c01fee4195e 100644 --- a/cdc/sink/codec/builder/codec_test.go +++ b/cdc/sink/codec/builder/codec_test.go @@ -69,14 +69,14 @@ func TestJsonVsCraftVsPB(t *testing.T) { if len(cs) == 0 { continue } - craftEncoder := craft.NewBatchEncoder() - craftEncoder.(*craft.BatchEncoder).MaxMessageBytes = 8192 - craftEncoder.(*craft.BatchEncoder).MaxBatchSize = 64 + config := &common.Config{ + MaxMessageBytes: 8192, + MaxBatchSize: 64, + } + craftEncoder := craft.NewBatchEncoder(config) craftMessages := encodeRowCase(t, craftEncoder, cs) - jsonEncoder := open.NewBatchEncoder() - jsonEncoder.(*open.BatchEncoder).MaxMessageBytes = 8192 - jsonEncoder.(*open.BatchEncoder).MaxBatchSize = 64 + jsonEncoder := open.NewBatchEncoder(config) jsonMessages := encodeRowCase(t, jsonEncoder, cs) protobuf1Messages := codecEncodeRowChangedPB1ToMessage(cs) @@ -220,16 +220,17 @@ func codecEncodeRowCase(encoder codec.EventBatchEncoder, events []*model.RowChan func init() { var err error - encoder := craft.NewBatchEncoder() - encoder.(*craft.BatchEncoder).MaxMessageBytes = 8192 - encoder.(*craft.BatchEncoder).MaxBatchSize = 64 
+ + config := &common.Config{ + MaxMessageBytes: 8192, + MaxBatchSize: 64, + } + encoder := craft.NewBatchEncoder(config) if codecCraftEncodedRowChanges, err = codecEncodeRowCase(encoder, codecBenchmarkRowChanges); err != nil { panic(err) } - encoder = open.NewBatchEncoder() - encoder.(*open.BatchEncoder).MaxMessageBytes = 8192 - encoder.(*open.BatchEncoder).MaxBatchSize = 64 + encoder = open.NewBatchEncoder(config) if codecJSONEncodedRowChanges, err = codecEncodeRowCase(encoder, codecBenchmarkRowChanges); err != nil { panic(err) } @@ -238,19 +239,23 @@ func init() { } func BenchmarkCraftEncoding(b *testing.B) { + config := &common.Config{ + MaxMessageBytes: 8192, + MaxBatchSize: 64, + } allocator := craft.NewSliceAllocator(128) - encoder := craft.NewBatchEncoderWithAllocator(allocator) - encoder.(*craft.BatchEncoder).MaxMessageBytes = 8192 - encoder.(*craft.BatchEncoder).MaxBatchSize = 64 + encoder := craft.NewBatchEncoderWithAllocator(allocator, config) for i := 0; i < b.N; i++ { _, _ = codecEncodeRowCase(encoder, codecBenchmarkRowChanges) } } func BenchmarkJsonEncoding(b *testing.B) { - encoder := open.NewBatchEncoder() - encoder.(*open.BatchEncoder).MaxMessageBytes = 8192 - encoder.(*open.BatchEncoder).MaxBatchSize = 64 + config := &common.Config{ + MaxMessageBytes: 8192, + MaxBatchSize: 64, + } + encoder := open.NewBatchEncoder(config) for i := 0; i < b.N; i++ { _, _ = codecEncodeRowCase(encoder, codecBenchmarkRowChanges) } diff --git a/cdc/sink/codec/builder/encoder_builder.go b/cdc/sink/codec/builder/encoder_builder.go index 4b398ba4c2c..43f055d2593 100644 --- a/cdc/sink/codec/builder/encoder_builder.go +++ b/cdc/sink/codec/builder/encoder_builder.go @@ -34,11 +34,11 @@ func NewEventBatchEncoderBuilder(ctx context.Context, c *common.Config) (codec.E case config.ProtocolDefault, config.ProtocolOpen: return open.NewBatchEncoderBuilder(c), nil case config.ProtocolCanal: - return canal.NewBatchEncoderBuilder(), nil + return canal.NewBatchEncoderBuilder(c), nil 
case config.ProtocolAvro: return avro.NewBatchEncoderBuilder(ctx, c) case config.ProtocolMaxwell: - return maxwell.NewBatchEncoderBuilder(), nil + return maxwell.NewBatchEncoderBuilder(c), nil case config.ProtocolCanalJSON: return canal.NewJSONBatchEncoderBuilder(c), nil case config.ProtocolCraft: diff --git a/cdc/sink/codec/canal/canal_encoder.go b/cdc/sink/codec/canal/canal_encoder.go index 4bff7343cda..8d3b28ea988 100644 --- a/cdc/sink/codec/canal/canal_encoder.go +++ b/cdc/sink/codec/canal/canal_encoder.go @@ -34,6 +34,8 @@ type BatchEncoder struct { callbackBuf []func() packet *canal.Packet entryBuilder *canalEntryBuilder + + config *common.Config } // EncodeCheckpointEvent implements the EventBatchEncoder interface @@ -50,7 +52,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - entry, err := d.entryBuilder.fromRowEvent(e) + entry, err := d.entryBuilder.fromRowEvent(e, d.config.OnlyHandleKeyColumns) if err != nil { return errors.Trace(err) } @@ -156,25 +158,31 @@ func (d *BatchEncoder) resetPacket() { } // newBatchEncoder creates a new canalBatchEncoder. -func newBatchEncoder() codec.EventBatchEncoder { +func newBatchEncoder(config *common.Config) codec.EventBatchEncoder { encoder := &BatchEncoder{ messages: &canal.Messages{}, callbackBuf: make([]func(), 0), entryBuilder: newCanalEntryBuilder(), + + config: config, } encoder.resetPacket() return encoder } -type batchEncoderBuilder struct{} +type batchEncoderBuilder struct { + config *common.Config +} // Build a `canalBatchEncoder` func (b *batchEncoderBuilder) Build() codec.EventBatchEncoder { - return newBatchEncoder() + return newBatchEncoder(b.config) } // NewBatchEncoderBuilder creates a canal batchEncoderBuilder. 
-func NewBatchEncoderBuilder() codec.EncoderBuilder { - return &batchEncoderBuilder{} +func NewBatchEncoderBuilder(config *common.Config) codec.EncoderBuilder { + return &batchEncoderBuilder{ + config: config, + } } diff --git a/cdc/sink/codec/canal/canal_encoder_test.go b/cdc/sink/codec/canal/canal_encoder_test.go index 481c781993f..b07cee9eeb0 100644 --- a/cdc/sink/codec/canal/canal_encoder_test.go +++ b/cdc/sink/codec/canal/canal_encoder_test.go @@ -20,6 +20,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/codec/common" + "github.com/pingcap/tiflow/pkg/config" canal "github.com/pingcap/tiflow/proto/canal" "github.com/stretchr/testify/require" ) @@ -28,7 +30,7 @@ func TestCanalBatchEncoder(t *testing.T) { t.Parallel() s := defaultCanalBatchTester for _, cs := range s.rowCases { - encoder := newBatchEncoder() + encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) for _, row := range cs { err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) require.Nil(t, err) @@ -55,7 +57,7 @@ func TestCanalBatchEncoder(t *testing.T) { } for _, cs := range s.ddlCases { - encoder := newBatchEncoder() + encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) for _, ddl := range cs { msg, err := encoder.EncodeDDLEvent(ddl) require.Nil(t, err) @@ -76,7 +78,7 @@ func TestCanalBatchEncoder(t *testing.T) { } func TestCanalAppendRowChangedEventWithCallback(t *testing.T) { - encoder := newBatchEncoder() + encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) require.NotNil(t, encoder) count := 0 diff --git a/cdc/sink/codec/canal/canal_entry.go b/cdc/sink/codec/canal/canal_entry.go index 4c47f66e438..e99762f2495 100644 --- a/cdc/sink/codec/canal/canal_entry.go +++ b/cdc/sink/codec/canal/canal_entry.go @@ -144,7 +144,7 @@ func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated } // build the 
RowData of a canal entry -func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowData, error) { +func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*canal.RowData, error) { var columns []*canal.Column for _, column := range e.Columns { if column == nil { @@ -156,11 +156,16 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowDa } columns = append(columns, c) } + + onlyHandleKeyColumns = onlyHandleKeyColumns && e.IsDelete() var preColumns []*canal.Column for _, column := range e.PreColumns { if column == nil { continue } + if onlyHandleKeyColumns && !column.Flag.IsHandleKey() { + continue + } c, err := b.buildColumn(column, column.Name, !e.IsDelete()) if err != nil { return nil, errors.Trace(err) @@ -175,11 +180,11 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowDa } // fromRowEvent builds canal entry from cdc RowChangedEvent -func (b *canalEntryBuilder) fromRowEvent(e *model.RowChangedEvent) (*canal.Entry, error) { +func (b *canalEntryBuilder) fromRowEvent(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*canal.Entry, error) { eventType := convertRowEventType(e) header := b.buildHeader(e.CommitTs, e.Table.Schema, e.Table.Table, eventType, 1) isDdl := isCanalDDL(eventType) // false - rowData, err := b.buildRowData(e) + rowData, err := b.buildRowData(e, onlyHandleKeyColumns) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sink/codec/canal/canal_entry_test.go b/cdc/sink/codec/canal/canal_entry_test.go index 77447bfda47..14ec07087cd 100644 --- a/cdc/sink/codec/canal/canal_entry_test.go +++ b/cdc/sink/codec/canal/canal_entry_test.go @@ -70,7 +70,7 @@ func testInsert(t *testing.T) { } builder := newCanalEntryBuilder() - entry, err := builder.fromRowEvent(testCaseInsert) + entry, err := builder.fromRowEvent(testCaseInsert, false) require.Nil(t, err) require.Equal(t, canal.EntryType_ROWDATA, entry.GetEntryType()) header := 
entry.GetHeader() @@ -146,7 +146,7 @@ func testUpdate(t *testing.T) { }, } builder := newCanalEntryBuilder() - entry, err := builder.fromRowEvent(testCaseUpdate) + entry, err := builder.fromRowEvent(testCaseUpdate, false) require.Nil(t, err) require.Equal(t, canal.EntryType_ROWDATA, entry.GetEntryType()) @@ -219,7 +219,7 @@ func testDelete(t *testing.T) { } builder := newCanalEntryBuilder() - entry, err := builder.fromRowEvent(testCaseDelete) + entry, err := builder.fromRowEvent(testCaseDelete, false) require.Nil(t, err) require.Equal(t, canal.EntryType_ROWDATA, entry.GetEntryType()) header := entry.GetHeader() diff --git a/cdc/sink/codec/canal/canal_json_decoder_test.go b/cdc/sink/codec/canal/canal_json_decoder_test.go index edcbc8c093b..017170fb7e7 100644 --- a/cdc/sink/codec/canal/canal_json_decoder_test.go +++ b/cdc/sink/codec/canal/canal_json_decoder_test.go @@ -84,7 +84,10 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) { t.Parallel() for _, encodeEnable := range []bool{false, true} { - encoder := &JSONBatchEncoder{builder: newCanalEntryBuilder(), enableTiDBExtension: encodeEnable} + encoder := &JSONBatchEncoder{ + builder: newCanalEntryBuilder(), + config: &common.Config{EnableTiDBExtension: encodeEnable}, + } require.NotNil(t, encoder) result, err := encoder.EncodeDDLEvent(testCaseDDL) diff --git a/cdc/sink/codec/canal/canal_json_encoder.go b/cdc/sink/codec/canal/canal_json_encoder.go index 3e2fa5a540c..f730b813035 100644 --- a/cdc/sink/codec/canal/canal_json_encoder.go +++ b/cdc/sink/codec/canal/canal_json_encoder.go @@ -31,25 +31,29 @@ import ( // JSONBatchEncoder encodes Canal json messages in JSON format type JSONBatchEncoder struct { - builder *canalEntryBuilder + builder *canalEntryBuilder + messages []*common.Message - // When it is true, canal-json would generate TiDB extension information - // which, at the moment, only includes `tidbWaterMarkType` and `_tidb` fields. 
- enableTiDBExtension bool - // the symbol separating two lines - terminator []byte - maxMessageBytes int - messages []*common.Message + config *common.Config +} + +// newJSONRowEventEncoder creates a new JSONRowEventEncoder +func newJSONRowEventEncoder(config *common.Config) codec.EventBatchEncoder { + encoder := &JSONBatchEncoder{ + builder: newCanalEntryBuilder(), + messages: make([]*common.Message, 0, 1), + + config: config, + } + return encoder } // newJSONBatchEncoder creates a new JSONBatchEncoder func newJSONBatchEncoder(config *common.Config) codec.EventBatchEncoder { encoder := &JSONBatchEncoder{ - builder: newCanalEntryBuilder(), - enableTiDBExtension: config.EnableTiDBExtension, - messages: make([]*common.Message, 0, 1), - terminator: []byte(config.Terminator), - maxMessageBytes: config.MaxMessageBytes, + builder: newCanalEntryBuilder(), + messages: make([]*common.Message, 0, 1), + config: config, } return encoder } @@ -58,7 +62,7 @@ func (c *JSONBatchEncoder) newJSONMessageForDML(e *model.RowChangedEvent) ([]byt isDelete := e.IsDelete() mysqlTypeMap := make(map[string]string, len(e.Columns)) - filling := func(columns []*model.Column, out *jwriter.Writer) error { + filling := func(columns []*model.Column, out *jwriter.Writer, onlyHandleKeyColumns bool) error { if len(columns) == 0 { out.RawString("null") return nil @@ -68,6 +72,9 @@ func (c *JSONBatchEncoder) newJSONMessageForDML(e *model.RowChangedEvent) ([]byt isFirst := true for _, col := range columns { if col != nil { + if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { + continue + } if isFirst { isFirst = false } else { @@ -165,6 +172,9 @@ func (c *JSONBatchEncoder) newJSONMessageForDML(e *model.RowChangedEvent) ([]byt emptyColumn := true for _, col := range columns { if col != nil { + if isDelete && c.config.OnlyHandleKeyColumns && !col.Flag.IsHandleKey() { + continue + } if emptyColumn { out.RawByte('{') emptyColumn = false @@ -213,29 +223,29 @@ func (c *JSONBatchEncoder) 
newJSONMessageForDML(e *model.RowChangedEvent) ([]byt if e.IsDelete() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := filling(e.PreColumns, out); err != nil { + if err := filling(e.PreColumns, out, c.config.OnlyHandleKeyColumns); err != nil { return nil, err } } else if e.IsInsert() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := filling(e.Columns, out); err != nil { + if err := filling(e.Columns, out, false); err != nil { return nil, err } } else if e.IsUpdate() { out.RawString(",\"old\":") - if err := filling(e.PreColumns, out); err != nil { + if err := filling(e.PreColumns, out, false); err != nil { return nil, err } out.RawString(",\"data\":") - if err := filling(e.Columns, out); err != nil { + if err := filling(e.Columns, out, false); err != nil { return nil, err } } else { log.Panic("unreachable event type", zap.Any("event", e)) } - if c.enableTiDBExtension { + if c.config.EnableTiDBExtension { const prefix string = ",\"_tidb\":" out.RawString(prefix) out.RawByte('{') @@ -270,7 +280,7 @@ func (c *JSONBatchEncoder) newJSONMessageForDDL(e *model.DDLEvent) canalJSONMess Query: e.Query, } - if !c.enableTiDBExtension { + if !c.config.EnableTiDBExtension { return msg } @@ -295,7 +305,7 @@ func (c *JSONBatchEncoder) newJSONMessage4CheckpointEvent(ts uint64) *canalJSONM // EncodeCheckpointEvent implements the EventBatchEncoder interface func (c *JSONBatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) { - if !c.enableTiDBExtension { + if !c.config.EnableTiDBExtension { return nil, nil } @@ -318,15 +328,15 @@ func (c *JSONBatchEncoder) AppendRowChangedEvent( if err != nil { return errors.Trace(err) } - if len(c.terminator) > 0 { - value = append(value, c.terminator...) + if len(c.config.Terminator) > 0 { + value = append(value, c.config.Terminator...) } length := len(value) + common.MaxRecordOverhead // for single message that is longer than max-message-bytes, do not send it. 
- if length > c.maxMessageBytes { + if length > c.config.MaxMessageBytes { log.Warn("Single message is too large for canal-json", - zap.Int("maxMessageBytes", c.maxMessageBytes), + zap.Int("maxMessageBytes", c.config.MaxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table)) return cerror.ErrMessageTooLarge.GenWithStackByArgs() diff --git a/cdc/sink/codec/canal/canal_json_encoder_test.go b/cdc/sink/codec/canal/canal_json_encoder_test.go index ba74694187b..90af3287849 100644 --- a/cdc/sink/codec/canal/canal_json_encoder_test.go +++ b/cdc/sink/codec/canal/canal_json_encoder_test.go @@ -33,13 +33,7 @@ func TestBuildJSONBatchEncoder(t *testing.T) { builder := &jsonBatchEncoderBuilder{config: cfg} encoder, ok := builder.Build().(*JSONBatchEncoder) require.True(t, ok) - require.False(t, encoder.enableTiDBExtension) - - cfg.EnableTiDBExtension = true - builder = &jsonBatchEncoderBuilder{config: cfg} - encoder, ok = builder.Build().(*JSONBatchEncoder) - require.True(t, ok) - require.True(t, encoder.enableTiDBExtension) + require.NotNil(t, encoder.config) } func TestNewCanalJSONMessage4DML(t *testing.T) { @@ -68,6 +62,12 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Equal(t, "person", jsonMsg.Table) require.False(t, jsonMsg.IsDDL) + for _, col := range testCaseInsert.Columns { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } + // check data is enough obtainedDataMap := jsonMsg.getData() require.NotNil(t, obtainedDataMap) @@ -97,7 +97,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { } data, err = encoder.newJSONMessageForDML(testCaseUpdate) - require.Nil(t, err) + require.NoError(t, err) jsonMsg = &JSONMessage{} err = json.Unmarshal(data, jsonMsg) require.Nil(t, err) @@ -105,6 +105,15 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.NotNil(t, jsonMsg.Old) require.Equal(t, "UPDATE", jsonMsg.EventType) + for _, col := range 
testCaseUpdate.Columns { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } + for _, col := range testCaseUpdate.PreColumns { + require.Contains(t, jsonMsg.Old[0], col.Name) + } + data, err = encoder.newJSONMessageForDML(testCaseDelete) require.Nil(t, err) jsonMsg = &JSONMessage{} @@ -114,7 +123,47 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Nil(t, jsonMsg.Old) require.Equal(t, "DELETE", jsonMsg.EventType) - e = newJSONBatchEncoder(&common.Config{ + for _, col := range testCaseDelete.PreColumns { + require.Contains(t, jsonMsg.Data[0], col.Name) + } + + data, err = encoder.newJSONMessageForDML(testCaseDelete) + require.NoError(t, err) + jsonMsg = &JSONMessage{} + err = json.Unmarshal(data, jsonMsg) + require.NoError(t, err) + require.NotNil(t, jsonMsg.Data) + require.Nil(t, jsonMsg.Old) + + for _, col := range testCaseDelete.PreColumns { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } + + encoder, ok = newJSONBatchEncoder(&common.Config{OnlyHandleKeyColumns: true}).(*JSONBatchEncoder) + require.True(t, ok) + data, err = encoder.newJSONMessageForDML(testCaseDelete) + require.NoError(t, err) + jsonMsg = &JSONMessage{} + err = json.Unmarshal(data, jsonMsg) + require.NoError(t, err) + require.NotNil(t, jsonMsg.Data) + require.Nil(t, jsonMsg.Old) + + for _, col := range testCaseDelete.PreColumns { + if col.Flag.IsHandleKey() { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } else { + require.NotContains(t, jsonMsg.Data[0], col.Name) + require.NotContains(t, jsonMsg.SQLType, col.Name) + require.NotContains(t, jsonMsg.MySQLType, col.Name) + } + } + + e = newJSONRowEventEncoder(&common.Config{ EnableTiDBExtension: true, Terminator: "", }) @@ -135,8 
+184,9 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { func TestNewCanalJSONMessageFromDDL(t *testing.T) { t.Parallel() - encoder := &JSONBatchEncoder{builder: newCanalEntryBuilder()} - require.NotNil(t, encoder) + + encoder, ok := newJSONRowEventEncoder(&common.Config{}).(*JSONBatchEncoder) + require.True(t, ok) message := encoder.newJSONMessageForDDL(testCaseDDL) require.NotNil(t, message) @@ -150,8 +200,10 @@ func TestNewCanalJSONMessageFromDDL(t *testing.T) { require.Equal(t, testCaseDDL.Query, msg.Query) require.Equal(t, "CREATE", msg.EventType) - encoder = &JSONBatchEncoder{builder: newCanalEntryBuilder(), enableTiDBExtension: true} - require.NotNil(t, encoder) + encoder, ok = newJSONRowEventEncoder(&common.Config{ + EnableTiDBExtension: true, + }).(*JSONBatchEncoder) + require.True(t, ok) message = encoder.newJSONMessageForDDL(testCaseDDL) require.NotNil(t, message) @@ -202,7 +254,10 @@ func TestEncodeCheckpointEvent(t *testing.T) { t.Parallel() var watermark uint64 = 2333 for _, enable := range []bool{false, true} { - encoder := &JSONBatchEncoder{builder: newCanalEntryBuilder(), enableTiDBExtension: enable} + config := &common.Config{ + EnableTiDBExtension: enable, + } + encoder := newJSONBatchEncoder(config).(*JSONBatchEncoder) require.NotNil(t, encoder) msg, err := encoder.EncodeCheckpointEvent(watermark) @@ -239,10 +294,7 @@ func TestEncodeCheckpointEvent(t *testing.T) { func TestCheckpointEventValueMarshal(t *testing.T) { t.Parallel() var watermark uint64 = 1024 - encoder := &JSONBatchEncoder{ - builder: newCanalEntryBuilder(), - enableTiDBExtension: true, - } + encoder := newJSONBatchEncoder(&common.Config{EnableTiDBExtension: true}) require.NotNil(t, encoder) msg, err := encoder.EncodeCheckpointEvent(watermark) require.Nil(t, err) @@ -286,7 +338,10 @@ func TestCheckpointEventValueMarshal(t *testing.T) { func TestDDLEventWithExtensionValueMarshal(t *testing.T) { t.Parallel() - encoder := &JSONBatchEncoder{builder: newCanalEntryBuilder(), 
enableTiDBExtension: true} + encoder := &JSONBatchEncoder{ + builder: newCanalEntryBuilder(), + config: &common.Config{EnableTiDBExtension: true}, + } require.NotNil(t, encoder) message := encoder.newJSONMessageForDDL(testCaseDDL) diff --git a/cdc/sink/codec/canal/canal_test_util.go b/cdc/sink/codec/canal/canal_test_util.go index 368c334ff42..578653e0685 100644 --- a/cdc/sink/codec/canal/canal_test_util.go +++ b/cdc/sink/codec/canal/canal_test_util.go @@ -35,7 +35,7 @@ type testColumnTuple struct { var ( testColumnsTable = []*testColumnTuple{ { - &model.Column{Name: "tinyint", Type: mysql.TypeTiny, Value: int64(127)}, + &model.Column{Name: "tinyint", Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Type: mysql.TypeTiny, Value: int64(127)}, "tinyint", internal.JavaSQLTypeTINYINT, "127", "127", }, diff --git a/cdc/sink/codec/common/config.go b/cdc/sink/codec/common/config.go index cc3e0786d00..e5a1596ce0b 100644 --- a/cdc/sink/codec/common/config.go +++ b/cdc/sink/codec/common/config.go @@ -31,6 +31,8 @@ const defaultMaxBatchSize int = 16 type Config struct { Protocol config.Protocol + OnlyHandleKeyColumns bool + // control batch behavior, only for `open-protocol` and `craft` at the moment. 
MaxMessageBytes int MaxBatchSize int @@ -135,6 +137,8 @@ func (c *Config) Apply(sinkURI *url.URL, config *config.ReplicaConfig) error { } } + c.OnlyHandleKeyColumns = !config.EnableOldValue + return nil } diff --git a/cdc/sink/codec/common/config_test.go b/cdc/sink/codec/common/config_test.go index 1aac36e58d2..8b21cbde772 100644 --- a/cdc/sink/codec/common/config_test.go +++ b/cdc/sink/codec/common/config_test.go @@ -55,10 +55,16 @@ func TestConfigApplyValidate(t *testing.T) { err = c.Apply(sinkURI, replicaConfig) require.NoError(t, err) require.True(t, c.EnableTiDBExtension) + require.False(t, c.OnlyHandleKeyColumns) err = c.Validate() require.NoError(t, err) + replicaConfig.EnableOldValue = false + err = c.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + require.True(t, c.OnlyHandleKeyColumns) + uri = "kafka://127.0.0.1:9092/abc?protocol=canal-json&enable-tidb-extension=a" sinkURI, err = url.Parse(uri) require.NoError(t, err) diff --git a/cdc/sink/codec/craft/craft_encoder.go b/cdc/sink/codec/craft/craft_encoder.go index af950dffa0f..ce6e99c17cd 100644 --- a/cdc/sink/codec/craft/craft_encoder.go +++ b/cdc/sink/codec/craft/craft_encoder.go @@ -28,9 +28,7 @@ type BatchEncoder struct { messageBuf []*common.Message callbackBuf []func() - // configs - MaxMessageBytes int - MaxBatchSize int + config *common.Config allocator *SliceAllocator } @@ -49,11 +47,11 @@ func (e *BatchEncoder) AppendRowChangedEvent( ev *model.RowChangedEvent, callback func(), ) error { - rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev) + rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev, e.config.OnlyHandleKeyColumns) if callback != nil { e.callbackBuf = append(e.callbackBuf, callback) } - if size > e.MaxMessageBytes || rows >= e.MaxBatchSize { + if size > e.config.MaxMessageBytes || rows >= e.config.MaxBatchSize { e.flush() } return nil @@ -98,11 +96,11 @@ func (e *BatchEncoder) flush() { } // NewBatchEncoder creates a new BatchEncoder. 
-func NewBatchEncoder() codec.EventBatchEncoder { +func NewBatchEncoder(config *common.Config) codec.EventBatchEncoder { // 64 is a magic number that come up with these assumptions and manual benchmark. // 1. Most table will not have more than 64 columns // 2. It only worth allocating slices in batch for slices that's small enough - return NewBatchEncoderWithAllocator(NewSliceAllocator(64)) + return NewBatchEncoderWithAllocator(NewSliceAllocator(64), config) } type batchEncoderBuilder struct { @@ -111,10 +109,7 @@ type batchEncoderBuilder struct { // Build a BatchEncoder func (b *batchEncoderBuilder) Build() codec.EventBatchEncoder { - encoder := NewBatchEncoder() - encoder.(*BatchEncoder).MaxMessageBytes = b.config.MaxMessageBytes - encoder.(*BatchEncoder).MaxBatchSize = b.config.MaxBatchSize - return encoder + return NewBatchEncoder(b.config) } // NewBatchEncoderBuilder creates a craft batchEncoderBuilder. @@ -123,11 +118,12 @@ func NewBatchEncoderBuilder(config *common.Config) codec.EncoderBuilder { } // NewBatchEncoderWithAllocator creates a new BatchEncoder with given allocator. 
-func NewBatchEncoderWithAllocator(allocator *SliceAllocator) codec.EventBatchEncoder { +func NewBatchEncoderWithAllocator(allocator *SliceAllocator, config *common.Config) codec.EventBatchEncoder { return &BatchEncoder{ allocator: allocator, messageBuf: make([]*common.Message, 0, 2), callbackBuf: make([]func(), 0), rowChangedBuffer: NewRowChangedEventBuffer(allocator), + config: config, } } diff --git a/cdc/sink/codec/craft/craft_encoder_test.go b/cdc/sink/codec/craft/craft_encoder_test.go index 1da596de907..626527d1a19 100644 --- a/cdc/sink/codec/craft/craft_encoder_test.go +++ b/cdc/sink/codec/craft/craft_encoder_test.go @@ -104,8 +104,7 @@ func TestBuildCraftBatchEncoder(t *testing.T) { builder := &batchEncoderBuilder{config: cfg} encoder, ok := builder.Build().(*BatchEncoder) require.True(t, ok) - require.Equal(t, cfg.MaxBatchSize, encoder.MaxBatchSize) - require.Equal(t, cfg.MaxMessageBytes, encoder.MaxMessageBytes) + require.NotNil(t, encoder.config) } func testBatchCodec( diff --git a/cdc/sink/codec/craft/model.go b/cdc/sink/codec/craft/model.go index 12943b77658..71af0e88f14 100644 --- a/cdc/sink/codec/craft/model.go +++ b/cdc/sink/codec/craft/model.go @@ -366,7 +366,7 @@ func decodeColumnGroup(bits []byte, allocator *SliceAllocator, dict *termDiction }, nil } -func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column) (int, *columnGroup) { +func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column, onlyHandleKeyColumns bool) (int, *columnGroup) { l := len(columns) if l == 0 { return 0, nil @@ -381,6 +381,9 @@ func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column) if col == nil { continue } + if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { + continue + } names[idx] = col.Name types[idx] = uint64(col.Type) flags[idx] = uint64(col.Flag) @@ -404,7 +407,7 @@ func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column) // Row changed message is basically an array of 
column groups type rowChangedEvent = []*columnGroup -func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent) (int, rowChangedEvent) { +func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent, onlyHandleKeyColumns bool) (int, rowChangedEvent) { numGroups := 0 if ev.PreColumns != nil { numGroups++ @@ -415,12 +418,13 @@ func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent) groups := allocator.columnGroupSlice(numGroups) estimatedSize := 0 idx := 0 - if size, group := newColumnGroup(allocator, columnGroupTypeNew, ev.Columns); group != nil { + if size, group := newColumnGroup(allocator, columnGroupTypeNew, ev.Columns, false); group != nil { groups[idx] = group idx++ estimatedSize += size } - if size, group := newColumnGroup(allocator, columnGroupTypeOld, ev.PreColumns); group != nil { + onlyHandleKeyColumns = onlyHandleKeyColumns && ev.IsDelete() + if size, group := newColumnGroup(allocator, columnGroupTypeOld, ev.PreColumns, onlyHandleKeyColumns); group != nil { groups[idx] = group estimatedSize += size } @@ -454,7 +458,7 @@ func (b *RowChangedEventBuffer) Encode() []byte { } // AppendRowChangedEvent append a new event to buffer -func (b *RowChangedEventBuffer) AppendRowChangedEvent(ev *model.RowChangedEvent) (rows, size int) { +func (b *RowChangedEventBuffer) AppendRowChangedEvent(ev *model.RowChangedEvent, onlyHandleKeyColumns bool) (rows, size int) { var partition int64 = -1 if ev.Table.IsPartition { partition = ev.Table.TableID @@ -479,7 +483,7 @@ func (b *RowChangedEventBuffer) AppendRowChangedEvent(ev *model.RowChangedEvent) if b.eventsCount+1 > len(b.events) { b.events = b.allocator.resizeRowChangedEventSlice(b.events, newBufferSize(b.eventsCount)) } - size, message := newRowChangedMessage(b.allocator, ev) + size, message := newRowChangedMessage(b.allocator, ev, onlyHandleKeyColumns) b.events[b.eventsCount] = message b.eventsCount++ b.estimatedSize += size diff --git 
a/cdc/sink/codec/maxwell/maxwell_encoder.go b/cdc/sink/codec/maxwell/maxwell_encoder.go index a23379e4132..05562867ec8 100644 --- a/cdc/sink/codec/maxwell/maxwell_encoder.go +++ b/cdc/sink/codec/maxwell/maxwell_encoder.go @@ -31,6 +31,8 @@ type BatchEncoder struct { valueBuf *bytes.Buffer callbackBuf []func() batchSize int + + config *common.Config } // EncodeCheckpointEvent implements the EventBatchEncoder interface @@ -47,7 +49,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - _, valueMsg := rowChangeToMaxwellMsg(e) + _, valueMsg := rowChangeToMaxwellMsg(e, d.config.OnlyHandleKeyColumns) value, err := valueMsg.encode() if err != nil { return errors.Trace(err) @@ -109,24 +111,29 @@ func (d *BatchEncoder) reset() { } // newBatchEncoder creates a new maxwell BatchEncoder. -func newBatchEncoder() codec.EventBatchEncoder { +func newBatchEncoder(config *common.Config) codec.EventBatchEncoder { batch := &BatchEncoder{ keyBuf: &bytes.Buffer{}, valueBuf: &bytes.Buffer{}, callbackBuf: make([]func(), 0), + config: config, } batch.reset() return batch } -type batchEncoderBuilder struct{} +type batchEncoderBuilder struct { + config *common.Config +} // NewBatchEncoderBuilder creates a maxwell batchEncoderBuilder. 
-func NewBatchEncoderBuilder() codec.EncoderBuilder { - return &batchEncoderBuilder{} +func NewBatchEncoderBuilder(config *common.Config) codec.EncoderBuilder { + return &batchEncoderBuilder{ + config: config, + } } // Build a `maxwellBatchEncoder` func (b *batchEncoderBuilder) Build() codec.EventBatchEncoder { - return newBatchEncoder() + return newBatchEncoder(b.config) } diff --git a/cdc/sink/codec/maxwell/maxwell_encoder_test.go b/cdc/sink/codec/maxwell/maxwell_encoder_test.go index 4220db9a15b..ff03fc1f711 100644 --- a/cdc/sink/codec/maxwell/maxwell_encoder_test.go +++ b/cdc/sink/codec/maxwell/maxwell_encoder_test.go @@ -20,6 +20,7 @@ import ( timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/codec/common" "github.com/stretchr/testify/require" ) @@ -33,7 +34,7 @@ func TestMaxwellBatchCodec(t *testing.T) { Columns: []*model.Column{{Name: "col1", Type: 3, Value: 10}}, }}, {}} for _, cs := range rowCases { - encoder := newEncoder() + encoder := newEncoder(&common.Config{}) for _, row := range cs { err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) require.Nil(t, err) @@ -59,7 +60,7 @@ func TestMaxwellBatchCodec(t *testing.T) { Type: 1, }}} for _, cs := range ddlCases { - encoder := newEncoder() + encoder := newEncoder(&common.Config{}) for _, ddl := range cs { msg, err := encoder.EncodeDDLEvent(ddl) require.Nil(t, err) @@ -69,7 +70,7 @@ func TestMaxwellBatchCodec(t *testing.T) { } func TestMaxwellAppendRowChangedEventWithCallback(t *testing.T) { - encoder := newBatchEncoder() + encoder := newBatchEncoder(&common.Config{}) require.NotNil(t, encoder) count := 0 diff --git a/cdc/sink/codec/maxwell/maxwell_message.go b/cdc/sink/codec/maxwell/maxwell_message.go index 0ef5836f655..db3b9121d1e 100644 --- a/cdc/sink/codec/maxwell/maxwell_message.go +++ b/cdc/sink/codec/maxwell/maxwell_message.go @@ -43,7 +43,7 @@ func (m *maxwellMessage) 
encode() ([]byte, error) { return data, cerror.WrapError(cerror.ErrMaxwellEncodeFailed, err) } -func rowChangeToMaxwellMsg(e *model.RowChangedEvent) (*internal.MessageKey, *maxwellMessage) { +func rowChangeToMaxwellMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*internal.MessageKey, *maxwellMessage) { var partition *int64 if e.Table.IsPartition { partition = &e.Table.TableID @@ -68,6 +68,9 @@ func rowChangeToMaxwellMsg(e *model.RowChangedEvent) (*internal.MessageKey, *max if e.IsDelete() { value.Type = "delete" for _, v := range e.PreColumns { + if onlyHandleKeyColumns && !v.Flag.IsHandleKey() { + continue + } switch v.Type { case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: if v.Value == nil { diff --git a/cdc/sink/codec/maxwell/maxwell_message_test.go b/cdc/sink/codec/maxwell/maxwell_message_test.go index 852886cc444..3928d3f36c4 100644 --- a/cdc/sink/codec/maxwell/maxwell_message_test.go +++ b/cdc/sink/codec/maxwell/maxwell_message_test.go @@ -54,7 +54,7 @@ func TestEncodeBinaryToMaxwell(t *testing.T) { Columns: []*model.Column{column}, } - key, msg := rowChangeToMaxwellMsg(e) + key, msg := rowChangeToMaxwellMsg(e, false) require.NotNil(t, key) require.NotNil(t, msg) } diff --git a/cdc/sink/codec/open/open_protocol_encoder.go b/cdc/sink/codec/open/open_protocol_encoder.go index 2ace0a0ef89..dac5b05b499 100644 --- a/cdc/sink/codec/open/open_protocol_encoder.go +++ b/cdc/sink/codec/open/open_protocol_encoder.go @@ -34,9 +34,7 @@ type BatchEncoder struct { callbackBuff []func() curBatchSize int - // configs - MaxMessageBytes int - MaxBatchSize int + config *common.Config } // AppendRowChangedEvent implements the EventBatchEncoder interface @@ -46,7 +44,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - keyMsg, valueMsg := rowChangeToMsg(e) + keyMsg, valueMsg := rowChangeToMsg(e, d.config.OnlyHandleKeyColumns) 
key, err := keyMsg.Encode() if err != nil { return errors.Trace(err) @@ -64,9 +62,9 @@ func (d *BatchEncoder) AppendRowChangedEvent( // for single message that is longer than max-message-bytes, do not send it. // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead` length := len(key) + len(value) + common.MaxRecordOverhead + 16 + 8 - if length > d.MaxMessageBytes { + if length > d.config.MaxMessageBytes { log.Warn("Single message is too large for open-protocol", - zap.Int("maxMessageBytes", d.MaxMessageBytes), + zap.Int("maxMessageBytes", d.config.MaxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table), zap.Any("key", key)) @@ -74,8 +72,8 @@ func (d *BatchEncoder) AppendRowChangedEvent( } if len(d.messageBuf) == 0 || - d.curBatchSize >= d.MaxBatchSize || - d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.MaxMessageBytes { + d.curBatchSize >= d.config.MaxBatchSize || + d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.config.MaxMessageBytes { // Before we create a new message, we should handle the previous callbacks. d.tryBuildCallback() versionHead := make([]byte, 8) @@ -190,11 +188,7 @@ type batchEncoderBuilder struct { // Build a BatchEncoder func (b *batchEncoderBuilder) Build() codec.EventBatchEncoder { - encoder := NewBatchEncoder() - encoder.(*BatchEncoder).MaxMessageBytes = b.config.MaxMessageBytes - encoder.(*BatchEncoder).MaxBatchSize = b.config.MaxBatchSize - - return encoder + return NewBatchEncoder(b.config) } // NewBatchEncoderBuilder creates an open-protocol batchEncoderBuilder. @@ -203,7 +197,8 @@ func NewBatchEncoderBuilder(config *common.Config) codec.EncoderBuilder { } // NewBatchEncoder creates a new BatchEncoder. 
-func NewBatchEncoder() codec.EventBatchEncoder { - batch := &BatchEncoder{} - return batch +func NewBatchEncoder(config *common.Config) codec.EventBatchEncoder { + return &BatchEncoder{ + config: config, + } } diff --git a/cdc/sink/codec/open/open_protocol_encoder_test.go b/cdc/sink/codec/open/open_protocol_encoder_test.go index 49ddd6b6dd6..80c8305d65f 100644 --- a/cdc/sink/codec/open/open_protocol_encoder_test.go +++ b/cdc/sink/codec/open/open_protocol_encoder_test.go @@ -31,8 +31,7 @@ func TestBuildOpenProtocolBatchEncoder(t *testing.T) { builder := &batchEncoderBuilder{config: config} encoder, ok := builder.Build().(*BatchEncoder) require.True(t, ok) - require.Equal(t, config.MaxBatchSize, encoder.MaxBatchSize) - require.Equal(t, config.MaxMessageBytes, encoder.MaxMessageBytes) + require.NotNil(t, encoder.config) } func TestMaxMessageBytes(t *testing.T) { @@ -131,7 +130,6 @@ func TestOpenProtocolAppendRowChangedEventWithCallback(t *testing.T) { builder := &batchEncoderBuilder{config: cfg} encoder, ok := builder.Build().(*BatchEncoder) require.True(t, ok) - require.Equal(t, cfg.MaxBatchSize, encoder.MaxBatchSize) count := 0 diff --git a/cdc/sink/codec/open/open_protocol_message.go b/cdc/sink/codec/open/open_protocol_message.go index 4c06169fd9f..4032eb3306e 100644 --- a/cdc/sink/codec/open/open_protocol_message.go +++ b/cdc/sink/codec/open/open_protocol_message.go @@ -76,7 +76,7 @@ func newResolvedMessage(ts uint64) *internal.MessageKey { } } -func rowChangeToMsg(e *model.RowChangedEvent) (*internal.MessageKey, *messageRow) { +func rowChangeToMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*internal.MessageKey, *messageRow) { var partition *int64 if e.Table.IsPartition { partition = &e.Table.TableID @@ -91,10 +91,10 @@ func rowChangeToMsg(e *model.RowChangedEvent) (*internal.MessageKey, *messageRow } value := &messageRow{} if e.IsDelete() { - value.Delete = rowChangeColumns2CodecColumns(e.PreColumns) + value.Delete = 
rowChangeColumns2CodecColumns(e.PreColumns, onlyHandleKeyColumns) } else { - value.Update = rowChangeColumns2CodecColumns(e.Columns) - value.PreColumns = rowChangeColumns2CodecColumns(e.PreColumns) + value.Update = rowChangeColumns2CodecColumns(e.Columns, false) + value.PreColumns = rowChangeColumns2CodecColumns(e.PreColumns, false) } return key, value } @@ -123,12 +123,15 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang return e } -func rowChangeColumns2CodecColumns(cols []*model.Column) map[string]internal.Column { +func rowChangeColumns2CodecColumns(cols []*model.Column, onlyHandleKeyColumns bool) map[string]internal.Column { jsonCols := make(map[string]internal.Column, len(cols)) for _, col := range cols { if col == nil { continue } + if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { + continue + } c := internal.Column{} c.FromRowChangeColumn(col) jsonCols[col.Name] = c diff --git a/cdc/sink/codec/open/open_protocol_message_test.go b/cdc/sink/codec/open/open_protocol_message_test.go index 826eda172f7..6b9b73bc20a 100644 --- a/cdc/sink/codec/open/open_protocol_message_test.go +++ b/cdc/sink/codec/open/open_protocol_message_test.go @@ -90,3 +90,60 @@ func TestVarBinaryCol(t *testing.T) { col2 := mqCol2.ToRowChangeColumn("test") require.Equal(t, col, col2) } + +func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { + t.Parallel() + + insertEvent := &model.RowChangedEvent{ + CommitTs: 417318403368288260, + Table: &model.TableName{ + Schema: "schema", + Table: "table", + }, + Columns: []*model.Column{ + {Name: "id", Flag: model.HandleKeyFlag, Type: mysql.TypeLonglong, Value: 1}, + {Name: "a", Type: mysql.TypeLonglong, Value: 1}, + }, + } + _, value := rowChangeToMsg(insertEvent, true) + _, ok := value.Update["a"] + require.True(t, ok) + + updateEvent := &model.RowChangedEvent{ + CommitTs: 417318403368288260, + Table: &model.TableName{ + Schema: "schema", + Table: "table", + }, + Columns: []*model.Column{ + {Name: "id", 
Flag: model.HandleKeyFlag, Type: mysql.TypeLonglong, Value: 1}, + {Name: "a", Type: mysql.TypeLonglong, Value: 2}, + }, + PreColumns: []*model.Column{ + {Name: "id", Flag: model.HandleKeyFlag, Type: mysql.TypeLonglong, Value: 1}, + {Name: "a", Type: mysql.TypeLonglong, Value: 1}, + }, + } + _, value = rowChangeToMsg(updateEvent, true) + _, ok = value.PreColumns["a"] + require.True(t, ok) + + deleteEvent := &model.RowChangedEvent{ + CommitTs: 417318403368288260, + Table: &model.TableName{ + Schema: "schema", + Table: "table", + }, + PreColumns: []*model.Column{ + {Name: "id", Flag: model.HandleKeyFlag, Type: mysql.TypeLonglong, Value: 1}, + {Name: "a", Type: mysql.TypeLonglong, Value: 2}, + }, + } + _, value = rowChangeToMsg(deleteEvent, true) + _, ok = value.Delete["a"] + require.False(t, ok) + + _, value = rowChangeToMsg(deleteEvent, false) + _, ok = value.Delete["a"] + require.True(t, ok) +} diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index 422b33bf6d1..95713e7550c 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -94,8 +94,6 @@ func TestKafkaSink(t *testing.T) { encoder := sink.encoderBuilder.Build() require.IsType(t, &open.BatchEncoder{}, encoder) - require.Equal(t, 1, encoder.(*open.BatchEncoder).MaxBatchSize) - require.Equal(t, 1048576, encoder.(*open.BatchEncoder).MaxMessageBytes) // mock kafka broker processes 1 row changed event tableID := model.TableID(1) diff --git a/errors.toml b/errors.toml index cc3785c1ba3..89125e14737 100755 --- a/errors.toml +++ b/errors.toml @@ -431,6 +431,11 @@ error = ''' illegal parameter for sorter: %s ''' +["CDC:ErrIncompatibleConfig"] +error = ''' +incompatible configuration +''' + ["CDC:ErrIncompatibleSinkConfig"] error = ''' incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index a437fab1a5f..a6aaac87325 100644 --- 
a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tiflow/pkg/cmd/factory" "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" "github.com/spf13/cobra" "github.com/tikv/client-go/v2/oracle" @@ -151,28 +150,14 @@ func (o *createChangefeedOptions) completeReplicaCfg( } } - if !cfg.EnableOldValue { - sinkURIParsed, err := url.Parse(o.commonChangefeedOptions.sinkURI) - if err != nil { - return cerror.WrapError(cerror.ErrSinkURIInvalid, err) - } - - protocol := sinkURIParsed.Query().Get(config.ProtocolKey) - if protocol != "" { - cfg.Sink.Protocol = protocol - } - for _, fp := range config.ForceEnableOldValueProtocols { - if cfg.Sink.Protocol == fp { - log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol)) - cfg.EnableOldValue = true - break - } - } + uri, err := url.Parse(o.commonChangefeedOptions.sinkURI) + if err != nil { + return err + } - if cfg.ForceReplicate { - log.Error("if use force replicate, old value feature must be enabled") - return cerror.ErrOldValueNotEnabled.GenWithStackByArgs() - } + err = cfg.AdjustEnableOldValueAndVerifyForceReplicate(uri) + if err != nil { + return err } for _, rules := range cfg.Sink.DispatchRules { diff --git a/pkg/cmd/cli/cli_changefeed_create_test.go b/pkg/cmd/cli/cli_changefeed_create_test.go index 0c7191bde49..2664bc08db1 100644 --- a/pkg/cmd/cli/cli_changefeed_create_test.go +++ b/pkg/cmd/cli/cli_changefeed_create_test.go @@ -24,6 +24,7 @@ import ( v2 "github.com/pingcap/tiflow/cdc/api/v2" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/spf13/cobra" "github.com/stretchr/testify/require" ) @@ -174,3 +175,38 @@ func TestChangefeedCreateCli(t *testing.T) { 
require.NoError(t, o.complete(f, cmd)) require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`") } + +func TestChangefeedCreateCliAdjustEnableOldValue(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + f := newMockFactory(ctrl) + + // enable old value, but use avro as the encoding protocol, should be set to false. + dir := t.TempDir() + configPath := filepath.Join(dir, "adjust-old-value.toml") + err := os.WriteFile(configPath, []byte("enable-old-value=true"), 0o644) + require.NoError(t, err) + + cmd := new(cobra.Command) + o := newChangefeedCommonOptions() + o.addFlags(cmd) + + require.NoError(t, cmd.ParseFlags([]string{fmt.Sprintf("--config=%s", configPath)})) + require.NoError(t, cmd.ParseFlags([]string{"--sink-uri=kafka://127.0.0.1:9092/test?protocol=avro"})) + + opt := newCreateChangefeedOptions(o) + require.NoError(t, opt.complete(f, cmd)) + require.False(t, opt.cfg.EnableOldValue) + + // also enable the force replicate, should return error + configPath = filepath.Join(dir, "enable-old-value-force-replicate.toml") + err = os.WriteFile(configPath, []byte("enable-old-value=true\r\nforce-replicate = true"), 0o644) + require.NoError(t, err) + + require.NoError(t, cmd.ParseFlags([]string{"--sink-uri=kafka://127.0.0.1:9092/test?protocol=avro"})) + require.NoError(t, cmd.ParseFlags([]string{fmt.Sprintf("--config=%s", configPath)})) + + opt = newCreateChangefeedOptions(o) + err = opt.complete(f, cmd) + require.Error(t, cerror.ErrOldValueNotEnabled, err) +} diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index d96ebe9b278..44adace0e5d 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -183,7 +183,11 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { require.Equal(t, &config.MounterConfig{ WorkerNum: 16, }, cfg.Mounter) - err = cfg.ValidateAndAdjust(nil) + + sinkURL, err := url.Parse("kafka://127.0.0.1:9092") + require.NoError(t, err) + + err = 
cfg.ValidateAndAdjust(sinkURL) require.Nil(t, err) require.Equal(t, &config.SinkConfig{ EncoderConcurrency: 16, diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index bc65ce26496..e1741eb0d4d 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -17,6 +17,7 @@ import ( "encoding/json" "fmt" "net/url" + "strings" "time" "github.com/pingcap/errors" @@ -24,6 +25,7 @@ import ( "github.com/pingcap/tiflow/pkg/config/outdated" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" + "github.com/pingcap/tiflow/pkg/sink" "go.uber.org/zap" ) @@ -164,14 +166,19 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) { } // ValidateAndAdjust verifies and adjusts the replica configuration. -func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { - // check sink uri +func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri if c.Sink != nil { - err := c.Sink.validateAndAdjust(sinkURI, c.EnableOldValue) + err := c.Sink.validateAndAdjust(sinkURI) + if err != nil { + return err + } + + err = c.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) if err != nil { return err } } + if c.Consistent != nil { err := c.Consistent.ValidateAndAdjust() if err != nil { @@ -224,3 +231,49 @@ func GetSinkURIAndAdjustConfigWithSinkURI( return sinkURI, nil } + +// AdjustEnableOldValue adjust the old value configuration by the sink scheme and encoding protocol +func (c *ReplicaConfig) AdjustEnableOldValue(scheme, protocol string) { + if sink.IsMySQLCompatibleScheme(scheme) { + return + } + + if c.EnableOldValue { + _, ok := ForceDisableOldValueProtocols[protocol] + if ok { + log.Warn("Attempting to replicate with old value enabled, but the specified protocol must disable old value. 
"+ + "CDC will disable old value and continue.", zap.String("protocol", protocol)) + c.EnableOldValue = false + } + return + } + + _, ok := ForceEnableOldValueProtocols[protocol] + if ok { + log.Warn("Attempting to replicate with old value disabled, but the specified protocol must enable old value. "+ + "CDC will enable old value and continue.", zap.String("protocol", protocol)) + c.EnableOldValue = true + } +} + +// AdjustEnableOldValueAndVerifyForceReplicate adjust the old value configuration by the sink scheme and encoding protocol +// and then verify the force replicate. +func (c *ReplicaConfig) AdjustEnableOldValueAndVerifyForceReplicate(sinkURI *url.URL) error { + scheme := strings.ToLower(sinkURI.Scheme) + protocol := sinkURI.Query().Get(ProtocolKey) + c.AdjustEnableOldValue(scheme, protocol) + + if !c.ForceReplicate { + return nil + } + + // MySQL Sink require the old value feature must be enabled to allow delete event send to downstream. + if sink.IsMySQLCompatibleScheme(scheme) { + if !c.EnableOldValue { + log.Error("force replicate, old value feature is disabled for the changefeed using mysql sink") + return cerror.ErrIncompatibleConfig.GenWithStackByArgs() + } + } + + return nil +} diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index d5d584e4727..e48872493c3 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -16,9 +16,11 @@ package config import ( "bytes" "encoding/json" + "net/url" "testing" "time" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) @@ -101,21 +103,26 @@ func TestReplicaConfigOutDated(t *testing.T) { func TestReplicaConfigValidate(t *testing.T) { t.Parallel() conf := GetDefaultReplicaConfig() - require.Nil(t, conf.ValidateAndAdjust(nil)) + + sinkURL, err := url.Parse("blackhole://xxx?protocol=canal") + require.NoError(t, err) + require.NoError(t, conf.ValidateAndAdjust(sinkURL)) // Incorrect sink configuration. 
conf = GetDefaultReplicaConfig() conf.Sink.Protocol = "canal" conf.EnableOldValue = false - require.Regexp(t, ".*canal protocol requires old value to be enabled.*", - conf.ValidateAndAdjust(nil)) + + err = conf.ValidateAndAdjust(sinkURL) + require.NoError(t, err) + require.True(t, conf.EnableOldValue) conf = GetDefaultReplicaConfig() conf.Sink.DispatchRules = []*DispatchRule{ {Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"}, } - require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", - conf.ValidateAndAdjust(nil)) + err = conf.ValidateAndAdjust(sinkURL) + require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", err) // Correct sink configuration. conf = GetDefaultReplicaConfig() @@ -124,7 +131,7 @@ func TestReplicaConfigValidate(t *testing.T) { {Matcher: []string{"a.c"}, PartitionRule: "p1"}, {Matcher: []string{"a.d"}}, } - err := conf.ValidateAndAdjust(nil) + err = conf.ValidateAndAdjust(sinkURL) require.Nil(t, err) rules := conf.Sink.DispatchRules require.Equal(t, "d1", rules[0].PartitionRule) @@ -134,12 +141,12 @@ func TestReplicaConfigValidate(t *testing.T) { // Test memory quota can be adjusted conf = GetDefaultReplicaConfig() conf.MemoryQuota = 0 - err = conf.ValidateAndAdjust(nil) + err = conf.ValidateAndAdjust(sinkURL) require.NoError(t, err) require.Equal(t, uint64(DefaultChangefeedMemoryQuota), conf.MemoryQuota) conf.MemoryQuota = uint64(1024) - err = conf.ValidateAndAdjust(nil) + err = conf.ValidateAndAdjust(sinkURL) require.NoError(t, err) require.Equal(t, uint64(1024), conf.MemoryQuota) } @@ -147,18 +154,94 @@ func TestReplicaConfigValidate(t *testing.T) { func TestValidateAndAdjust(t *testing.T) { cfg := GetDefaultReplicaConfig() require.False(t, cfg.EnableSyncPoint) - require.NoError(t, cfg.ValidateAndAdjust(nil)) + + sinkURL, err := url.Parse("blackhole://") + require.NoError(t, err) + + require.NoError(t, cfg.ValidateAndAdjust(sinkURL)) cfg.EnableSyncPoint = true - require.NoError(t, 
cfg.ValidateAndAdjust(nil)) + require.NoError(t, cfg.ValidateAndAdjust(sinkURL)) cfg.SyncPointInterval = time.Second * 29 - require.Error(t, cfg.ValidateAndAdjust(nil)) + require.Error(t, cfg.ValidateAndAdjust(sinkURL)) cfg.SyncPointInterval = time.Second * 30 cfg.SyncPointRetention = time.Minute * 10 - require.Error(t, cfg.ValidateAndAdjust(nil)) + require.Error(t, cfg.ValidateAndAdjust(sinkURL)) cfg.Sink.EncoderConcurrency = -1 - require.Error(t, cfg.ValidateAndAdjust(nil)) + require.Error(t, cfg.ValidateAndAdjust(sinkURL)) +} + +func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) { + t.Parallel() + + config := GetDefaultReplicaConfig() + config.EnableOldValue = false + + // mysql sink, do not adjust enable-old-value + sinkURI, err := url.Parse("mysql://") + require.NoError(t, err) + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) + + // mysql sink, `enable-old-value` false, `force-replicate` true, should return error + config.ForceReplicate = true + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.Error(t, cerror.ErrOldValueNotEnabled, err) + + // canal, `enable-old-value` false, `force-replicate` false, no error, `enable-old-value` adjust to true + config.ForceReplicate = false + config.EnableOldValue = false + // canal require old value enabled + sinkURI, err = url.Parse("kafka://127.0.0.1:9092/test?protocol=canal") + require.NoError(t, err) + + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.True(t, config.EnableOldValue) + + // canal, `force-replicate` true, `enable-old-value` true, no error + config.ForceReplicate = true + config.EnableOldValue = true + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.True(t, config.ForceReplicate) + require.True(t, config.EnableOldValue) + + // avro, `enable-old-value` false, 
`force-replicate` false, no error + config.ForceReplicate = false + config.EnableOldValue = false + sinkURI, err = url.Parse("kafka://127.0.0.1:9092/test?protocol=avro") + require.NoError(t, err) + + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) + + // avro, `enable-old-value` true, no error, set to false. no matter `force-replicate` + config.EnableOldValue = true + config.ForceReplicate = true + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) + + // csv, `enable-old-value` false, `force-replicate` false, no error + config.EnableOldValue = false + config.ForceReplicate = false + sinkURI, err = url.Parse("s3://xxx/yyy?protocol=csv") + require.NoError(t, err) + + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) + + // csv, `enable-old-value` true, no error, set to false. no matter `force-replicate` + config.EnableOldValue = true + config.ForceReplicate = true + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 93a164026af..7fbde76f538 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -94,10 +94,16 @@ func (l AtomicityLevel) validate(scheme string) error { } // ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. -var ForceEnableOldValueProtocols = []string{ - ProtocolCanal.String(), - ProtocolCanalJSON.String(), - ProtocolMaxwell.String(), +var ForceEnableOldValueProtocols = map[string]struct{}{ + ProtocolCanal.String(): {}, + ProtocolCanalJSON.String(): {}, + ProtocolMaxwell.String(): {}, +} + +// ForceDisableOldValueProtocols specifies which protocols need to be forced to disable old value. 
+var ForceDisableOldValueProtocols = map[string]struct{}{ + ProtocolAvro.String(): {}, + ProtocolCsv.String(): {}, } // SinkConfig represents sink config for a changefeed @@ -243,21 +249,11 @@ type ColumnSelector struct { Columns []string `toml:"columns" json:"columns"` } -func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error { +func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { if err := s.validateAndAdjustSinkURI(sinkURI); err != nil { return err } - if !enableOldValue { - for _, protocolStr := range ForceEnableOldValueProtocols { - if protocolStr == s.Protocol { - log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+ - "Please update changefeed config", s.Protocol)) - return cerror.WrapError(cerror.ErrKafkaInvalidConfig, - errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", s.Protocol))) - } - } - } for _, rule := range s.DispatchRules { if rule.DispatcherRule != "" && rule.PartitionRule != "" { log.Error("dispatcher and partition cannot be configured both", zap.Any("rule", rule)) diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index af7051e74bc..27477a6a8d2 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -20,67 +20,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestValidateOldValue(t *testing.T) { - t.Parallel() - testCases := []struct { - protocol string - enableOldValue bool - expectedErr string - }{ - { - protocol: "default", - enableOldValue: false, - expectedErr: "", - }, - { - protocol: "default", - enableOldValue: true, - expectedErr: "", - }, - { - protocol: "canal-json", - enableOldValue: false, - expectedErr: ".*canal-json protocol requires old value to be enabled.*", - }, - { - protocol: "canal-json", - enableOldValue: true, - expectedErr: "", - }, - { - protocol: "canal", - enableOldValue: false, - expectedErr: ".*canal protocol requires old value to be enabled.*", - }, - { - protocol: "canal", - enableOldValue: 
true, - expectedErr: "", - }, - { - protocol: "maxwell", - enableOldValue: false, - expectedErr: ".*maxwell protocol requires old value to be enabled.*", - }, - { - protocol: "maxwell", - enableOldValue: true, - expectedErr: "", - }, - } - - for _, tc := range testCases { - cfg := SinkConfig{ - Protocol: tc.protocol, - } - if tc.expectedErr == "" { - require.Nil(t, cfg.validateAndAdjust(nil, tc.enableOldValue)) - } else { - require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(nil, tc.enableOldValue)) - } - } -} - func TestValidateTxnAtomicity(t *testing.T) { t.Parallel() testCases := []struct { @@ -156,10 +95,10 @@ func TestValidateTxnAtomicity(t *testing.T) { parsedSinkURI, err := url.Parse(tc.sinkURI) require.Nil(t, err) if tc.expectedErr == "" { - require.Nil(t, cfg.validateAndAdjust(parsedSinkURI, true)) + require.Nil(t, cfg.validateAndAdjust(parsedSinkURI)) require.Equal(t, tc.shouldSplitTxn, cfg.TxnAtomicity.ShouldSplitTxn()) } else { - require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI, true)) + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI)) } } } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 2fc0d287493..09cab2eae4b 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -400,6 +400,10 @@ var ( "old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled"), ) + ErrIncompatibleConfig = errors.Normalize( + "incompatible configuration", + errors.RFCCodeText("CDC:ErrIncompatibleConfig"), + ) ErrSinkInvalidConfig = errors.Normalize( "sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig"), diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go index 9affae5be61..18fb88c353c 100644 --- a/pkg/orchestrator/reactor_state.go +++ b/pkg/orchestrator/reactor_state.go @@ -229,9 +229,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er return errors.Trace(err) } if key.Tp == 
etcd.CDCKeyTypeChangefeedInfo { - if err := s.Info.VerifyAndComplete(); err != nil { - return errors.Trace(err) - } + s.Info.VerifyAndComplete() } return nil } diff --git a/pkg/version/creator_version_gate.go b/pkg/version/creator_version_gate.go index a11ce3c3eac..fc1324d1b05 100644 --- a/pkg/version/creator_version_gate.go +++ b/pkg/version/creator_version_gate.go @@ -78,7 +78,10 @@ func (g *CreatorVersionGate) ChangefeedAcceptUnknownProtocols() bool { return creatorVersion.LessThan(changefeedAcceptUnknownProtocolsVersion) } -var changefeedAcceptProtocolInMysqlSinURI = *semver.New("6.1.1") +var ( + changefeedAcceptProtocolInMysqlSinURI = *semver.New("6.1.1") + changefeedAdjustEnableOldValueByProtocol = *semver.New("7.2.0") +) // ChangefeedAcceptProtocolInMysqlSinURI determines whether to accept // protocol in mysql sink uri or configure based on the creator's version. @@ -92,3 +95,14 @@ func (g *CreatorVersionGate) ChangefeedAcceptProtocolInMysqlSinURI() bool { creatorVersion := semver.New(SanitizeVersion(g.version)) return creatorVersion.LessThan(changefeedAcceptProtocolInMysqlSinURI) } + +// ChangefeedAdjustEnableOldValueByProtocol determines whether to adjust +// the `enable-old-value` configuration according to the encoding protocol in use. 
+func (g *CreatorVersionGate) ChangefeedAdjustEnableOldValueByProtocol() bool { + if g.version == "" { + return true + } + + creatorVersion := semver.New(SanitizeVersion(g.version)) + return creatorVersion.LessThan(changefeedAdjustEnableOldValueByProtocol) +} diff --git a/tests/integration_tests/bank/case.go b/tests/integration_tests/bank/case.go index a88597330dc..c10cdabc19a 100644 --- a/tests/integration_tests/bank/case.go +++ b/tests/integration_tests/bank/case.go @@ -133,7 +133,9 @@ func (s *sequenceTest) prepare(ctx context.Context, db *sql.DB, accounts, tableI for j := 0; j < batchSize; j++ { args[j] = fmt.Sprintf("(%d, 0, 0, 0)", offset+j) } - return fmt.Sprintf("INSERT IGNORE INTO accounts_seq%d (id, counter, sequence, startts) VALUES %s", tableID, strings.Join(args, ",")) + sql := fmt.Sprintf("INSERT IGNORE INTO accounts_seq%d (id, counter, sequence, startts) VALUES %s", tableID, strings.Join(args, ",")) + log.Info("batch insert sql", zap.String("sql", sql)) + return sql } prepareImpl(ctx, s, createTable, batchInsertSQLF, db, accounts, tableID, concurrency) @@ -187,7 +189,7 @@ func (*sequenceTest) verify(ctx context.Context, db *sql.DB, accounts, tableID i } return nil - }, retry.WithBackoffMaxDelay(500), retry.WithBackoffMaxDelay(120*1000), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) + }, retry.WithBackoffMaxDelay(500), retry.WithBackoffMaxDelay(120*1000), retry.WithMaxTries(20), retry.WithIsRetryableErr(cerror.IsRetryableError)) } // tryDropDB will drop table if data incorrect and panic error likes bad connect. diff --git a/tests/integration_tests/multi_changefeed/run.sh b/tests/integration_tests/multi_changefeed/run.sh index f4522be3d63..72b60fff2b8 100755 --- a/tests/integration_tests/multi_changefeed/run.sh +++ b/tests/integration_tests/multi_changefeed/run.sh @@ -38,17 +38,8 @@ function check_old_value_enabled() { # When old value is turned on, the pre-column in our delete will include all the columns. 
# So here we have 1 (id) and 3 (val). delete_with_old_value_count=$(grep "BlackHoleSink: WriteEvents" "$1/cdc.log" | grep 'pre\-columns\\\":\[' | grep 'columns\\\":null' | grep 'value\\\":1' | grep -c 'value\\\":3') - if [[ "$delete_with_old_value_count" -ne 1 ]]; then - echo "can't found delete row with old value" - exit 1 - fi - - # check if exist a delete row without a complete `pre-column` - # When old value is turned off, the pre-column in our delete will only include the handle columns. - # So here we only have 1 (id). - delete_without_old_value_count=$(grep "BlackHoleSink: WriteEvents" "$1/cdc.log" | grep 'pre\-columns\\\":\[' | grep 'columns\\\":null' | grep -c 'value\\\":1,\\\"default\\\":null},null') - if [[ "$delete_without_old_value_count" -ne 1 ]]; then - echo "can't found delete row without old value" + if [[ "$delete_with_old_value_count" -ne 2 ]]; then + echo "can't found delete row with old value, not 2 found" exit 1 fi }