diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index 25354804aaf..1e74c97e3b1 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -205,32 +205,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig( if err != nil { return nil, err } - if !replicaCfg.EnableOldValue { - sinkURIParsed, err := url.Parse(cfg.SinkURI) - if err != nil { - return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) - } - protocol := sinkURIParsed.Query().Get(config.ProtocolKey) - if protocol != "" { - replicaCfg.Sink.Protocol = protocol - } - for _, fp := range config.ForceEnableOldValueProtocols { - if replicaCfg.Sink.Protocol == fp { - log.Warn( - "Attempting to replicate without old value enabled. "+ - "CDC will enable old value and continue.", - zap.String("protocol", replicaCfg.Sink.Protocol)) - replicaCfg.EnableOldValue = true - break - } - } - - if replicaCfg.ForceReplicate { - return nil, cerror.ErrOldValueNotEnabled.GenWithStackByArgs( - "if use force replicate, old value feature must be enabled") - } - } f, err := filter.NewFilter(replicaCfg, "") if err != nil { return nil, errors.Cause(err) diff --git a/cdc/api/v2/api_helpers_test.go b/cdc/api/v2/api_helpers_test.go index 16889165dfc..1201e528e00 100644 --- a/cdc/api/v2/api_helpers_test.go +++ b/cdc/api/v2/api_helpers_test.go @@ -45,12 +45,14 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) { cfg.ReplicaConfig = GetDefaultReplicaConfig() cfg.ReplicaConfig.ForceReplicate = true cfg.ReplicaConfig.EnableOldValue = false - // disable old value but force replicate + cfg.SinkURI = "mysql://" + // disable old value but force replicate, and using mysql sink. cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) require.NotNil(t, err) require.Nil(t, cfInfo) cfg.ReplicaConfig.ForceReplicate = false cfg.ReplicaConfig.IgnoreIneligibleTable = true + cfg.SinkURI = "blackhole://" cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) require.Nil(t, err) require.NotNil(t, cfInfo) @@ -88,6 +90,19 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) { cfg.SinkURI = string([]byte{0x7f, ' '}) cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) require.NotNil(t, err) + + cfg.StartTs = 0 + // use blackhole to workaround + cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro" + cfg.ReplicaConfig.EnableOldValue = true + cfg.ReplicaConfig.ForceReplicate = false + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + require.NoError(t, err) + require.False(t, cfInfo.Config.EnableOldValue) + + cfg.ReplicaConfig.ForceReplicate = true + cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage) + require.Error(t, cerror.ErrOldValueNotEnabled, err) } func TestVerifyUpdateChangefeedConfig(t *testing.T) { @@ -140,4 +155,16 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { cfg.TargetTs = 9 newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) require.NotNil(t, err) + + cfg.StartTs = 0 + cfg.TargetTs = 0 + cfg.ReplicaConfig.EnableOldValue = true + cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro" + newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) + require.NoError(t, err) + require.False(t, newCfInfo.Config.EnableOldValue) + + cfg.ReplicaConfig.ForceReplicate = true + newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, 
oldInfo, oldUpInfo, storage, 0) + require.Error(t, cerror.ErrOldValueNotEnabled, err) } diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index c11b9c081f4..dcb012498b5 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -74,7 +74,6 @@ type Mounter interface { type mounter struct { schemaStorage SchemaStorage tz *time.Location - enableOldValue bool changefeedID model.ChangeFeedID filter pfilter.Filter metricTotalRows prometheus.Gauge @@ -98,14 +97,12 @@ func NewMounter(schemaStorage SchemaStorage, changefeedID model.ChangeFeedID, tz *time.Location, filter pfilter.Filter, - enableOldValue bool, integrity *integrity.Config, ) Mounter { return &mounter{ - schemaStorage: schemaStorage, - changefeedID: changefeedID, - enableOldValue: enableOldValue, - filter: filter, + schemaStorage: schemaStorage, + changefeedID: changefeedID, + filter: filter, metricTotalRows: totalRowsCountGauge. WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricIgnoredDMLEventCounter: ignoredDMLEventCounter. @@ -336,7 +333,7 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { } func datum2Column( - tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool, + tableInfo *model.TableInfo, datums map[int64]types.Datum, ) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) @@ -358,11 +355,6 @@ func datum2Column( colName := colInfo.Name.O colID := colInfo.ID colDatums, exist := datums[colID] - if !exist && !fillWithDefaultValue { - log.Debug("column value is not found", - zap.String("table", tableInfo.Name.O), zap.String("column", colName)) - continue - } var ( colValue interface{} @@ -372,7 +364,7 @@ func datum2Column( ) if exist { colValue, size, warn, err = formatColVal(colDatums, colInfo) - } else if fillWithDefaultValue { + } else { colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo) } if err != nil { @@ -511,7 +503,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info - preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) + preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -532,17 +524,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d } corrupted = true } - - // NOTICE: When the old Value feature is off, - // the Delete event only needs to keep the handle key column. 
- if row.Delete && !m.enableOldValue { - for i := range preCols { - col := preCols[i] - if col != nil && !col.Flag.IsHandleKey() { - preCols[i] = nil - } - } - } } var ( @@ -551,7 +532,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d current uint32 ) if row.RowExist { - cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true) + cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row) if err != nil { return nil, rawRow, errors.Trace(err) } diff --git a/cdc/entry/mounter_group.go b/cdc/entry/mounter_group.go index 7edff971a3f..b7d27902c17 100644 --- a/cdc/entry/mounter_group.go +++ b/cdc/entry/mounter_group.go @@ -34,12 +34,11 @@ type MounterGroup interface { } type mounterGroup struct { - schemaStorage SchemaStorage - inputCh chan *model.PolymorphicEvent - tz *time.Location - filter filter.Filter - enableOldValue bool - integrity *integrity.Config + schemaStorage SchemaStorage + inputCh chan *model.PolymorphicEvent + tz *time.Location + filter filter.Filter + integrity *integrity.Config workerNum int @@ -56,7 +55,6 @@ const ( func NewMounterGroup( schemaStorage SchemaStorage, workerNum int, - enableOldValue bool, filter filter.Filter, tz *time.Location, changefeedID model.ChangeFeedID, @@ -66,11 +64,10 @@ func NewMounterGroup( workerNum = defaultMounterWorkerNum } return &mounterGroup{ - schemaStorage: schemaStorage, - inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize), - enableOldValue: enableOldValue, - filter: filter, - tz: tz, + schemaStorage: schemaStorage, + inputCh: make(chan *model.PolymorphicEvent, defaultInputChanSize), + filter: filter, + tz: tz, integrity: integrity, @@ -111,8 +108,7 @@ func (m *mounterGroup) WaitForReady(_ context.Context) {} func (m *mounterGroup) Close() {} func (m *mounterGroup) runWorker(ctx context.Context) error { - mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter, - m.enableOldValue, m.integrity) + mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter, m.integrity) for { select { case <-ctx.Done(): diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 1597dc2e1e8..9786208b9e8 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -313,9 +313,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct { filter, err := filter.NewFilter(config, "") require.Nil(t, err) mounter := NewMounter(scheamStorage, - model.DefaultChangeFeedID("c1"), - time.UTC, filter, false, - config.Integrity).(*mounter) + model.DefaultChangeFeedID("c1"), time.UTC, filter, config.Integrity).(*mounter) mounter.tz = time.Local ctx := context.Background() @@ -1024,8 +1022,7 @@ func TestDecodeRow(t *testing.T) { schemaStorage.AdvanceResolvedTs(ver.Ver) - mounter := NewMounter( - schemaStorage, changefeed, time.Local, filter, true, cfg.Integrity).(*mounter) + mounter := NewMounter(schemaStorage, changefeed, time.Local, filter, cfg.Integrity).(*mounter) helper.Tk().MustExec(`insert into student values(1, "dongmen", 20, "male")`) helper.Tk().MustExec(`update student set age = 27 where id = 1`) @@ -1105,7 +1102,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) { ts := schemaStorage.GetLastSnapshot().CurrentTs() schemaStorage.AdvanceResolvedTs(ver.Ver) - mounter := NewMounter(schemaStorage, cfID, time.Local, f, true, cfg.Integrity).(*mounter) + mounter := NewMounter(schemaStorage, cfID, time.Local, f, cfg.Integrity).(*mounter) type testCase struct { schema string @@ -1282,7 +1279,7 @@ func 
TestBuildTableInfo(t *testing.T) { originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) require.NoError(t, err) cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) - cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true) + cols, _, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}) require.NoError(t, err) recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 44addba5668..e3e71831f57 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -18,6 +18,7 @@ import ( "math" "net/url" "regexp" + "strings" "time" "github.com/pingcap/errors" @@ -261,7 +262,7 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) { // VerifyAndComplete verifies changefeed info and may fill in some fields. // If a required field is not provided, return an error. // If some necessary filed is missing but can use a default value, fill in it. -func (info *ChangeFeedInfo) VerifyAndComplete() error { +func (info *ChangeFeedInfo) VerifyAndComplete() { defaultConfig := config.GetDefaultReplicaConfig() if info.Engine == "" { info.Engine = SortUnified @@ -285,8 +286,6 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error { if info.Config.Integrity == nil { info.Config.Integrity = defaultConfig.Integrity } - - return nil } // FixIncompatible fixes incompatible changefeed meta info. @@ -320,6 +319,14 @@ func (info *ChangeFeedInfo) FixIncompatible() { inheritV66 := creatorVersionGate.ChangefeedInheritSchedulerConfigFromV66() info.fixScheduler(inheritV66) log.Info("Fix incompatible scheduler completed", zap.String("changefeed", info.String())) + + if creatorVersionGate.ChangefeedAdjustEnableOldValueByProtocol() { + log.Info("Start fixing incompatible enable old value", zap.String("changefeed", info.String()), + zap.Bool("enableOldValue", info.Config.EnableOldValue)) + info.fixEnableOldValue() + log.Info("Fix incompatible enable old value completed", zap.String("changefeed", info.String()), + zap.Bool("enableOldValue", info.Config.EnableOldValue)) + } } // fixState attempts to fix state loss from upgrading the old owner to the new owner. @@ -390,6 +397,18 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() { } } +func (info *ChangeFeedInfo) fixEnableOldValue() { + uri, err := url.Parse(info.SinkURI) + if err != nil { + // this is impossible to happen, since the changefeed registered successfully. 
+ log.Warn("parse sink URI failed", zap.Error(err)) + return + } + scheme := strings.ToLower(uri.Scheme) + protocol := uri.Query().Get(config.ProtocolKey) + info.Config.AdjustEnableOldValue(scheme, protocol) +} + func (info *ChangeFeedInfo) fixMQSinkProtocol() { uri, err := url.Parse(info.SinkURI) if err != nil { diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index c6e41cc7f70..82ec464cd1e 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -154,8 +154,7 @@ func TestVerifyAndComplete(t *testing.T) { }, } - err := info.VerifyAndComplete() - require.Nil(t, err) + info.VerifyAndComplete() require.Equal(t, SortUnified, info.Engine) marshalConfig1, err := info.Config.Marshal() diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 1ec2a6f8143..19ad5bb1134 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -660,7 +660,6 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { p.mg.r = entry.NewMounterGroup(p.ddlHandler.r.schemaStorage, p.changefeed.Info.Config.Mounter.WorkerNum, - p.changefeed.Info.Config.EnableOldValue, p.filter, tz, p.changefeedID, p.changefeed.Info.Config.Integrity) p.mg.name = "MounterGroup" p.mg.spawn(stdCtx) diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 4295181040f..976bfc1ab22 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -442,7 +442,7 @@ func convertRowChangedEvents( // This indicates that it is an update event, // and after enable old value internally by default(but disable in the configuration). // We need to handle the update event to be compatible with the old format. - if !enableOldValue && colLen != 0 && preColLen != 0 && colLen == preColLen { + if e.Row.IsUpdate() && !enableOldValue { if shouldSplitUpdateEvent(e) { deleteEvent, insertEvent, err := splitUpdateEvent(e) if err != nil { @@ -507,13 +507,6 @@ func splitUpdateEvent( deleteEvent.RawKV = &deleteEventRowKV deleteEvent.Row.Columns = nil - for i := range deleteEvent.Row.PreColumns { - // NOTICE: Only the handle key pre column is retained in the delete event. 
- if deleteEvent.Row.PreColumns[i] != nil && - !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { - deleteEvent.Row.PreColumns[i] = nil - } - } insertEvent := *updateEvent insertEventRow := *updateEvent.Row diff --git a/errors.toml b/errors.toml index 5879bfdc225..13c626ab6c7 100755 --- a/errors.toml +++ b/errors.toml @@ -326,6 +326,11 @@ error = ''' illegal parameter for sorter: %s ''' +["CDC:ErrIncompatibleConfig"] +error = ''' +incompatible configuration +''' + ["CDC:ErrIncompatibleSinkConfig"] error = ''' incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index a437fab1a5f..a6aaac87325 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tiflow/pkg/cmd/factory" "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" "github.com/spf13/cobra" "github.com/tikv/client-go/v2/oracle" @@ -151,28 +150,14 @@ func (o *createChangefeedOptions) completeReplicaCfg( } } - if !cfg.EnableOldValue { - sinkURIParsed, err := url.Parse(o.commonChangefeedOptions.sinkURI) - if err != nil { - return cerror.WrapError(cerror.ErrSinkURIInvalid, err) - } - - protocol := sinkURIParsed.Query().Get(config.ProtocolKey) - if protocol != "" { - cfg.Sink.Protocol = protocol - } - for _, fp := range config.ForceEnableOldValueProtocols { - if cfg.Sink.Protocol == fp { - log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol)) - cfg.EnableOldValue = true - break - } - } + uri, err := url.Parse(o.commonChangefeedOptions.sinkURI) + if err != nil { + return err + } - if cfg.ForceReplicate { - log.Error("if use force replicate, old value feature must be enabled") - return cerror.ErrOldValueNotEnabled.GenWithStackByArgs() - } + err = cfg.AdjustEnableOldValueAndVerifyForceReplicate(uri) + if err != nil { + return err } for _, rules := range cfg.Sink.DispatchRules { diff --git a/pkg/cmd/cli/cli_changefeed_create_test.go b/pkg/cmd/cli/cli_changefeed_create_test.go index 73cffb5b1a7..5b2667bf25d 100644 --- a/pkg/cmd/cli/cli_changefeed_create_test.go +++ b/pkg/cmd/cli/cli_changefeed_create_test.go @@ -24,6 +24,7 @@ import ( v2 "github.com/pingcap/tiflow/cdc/api/v2" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/spf13/cobra" "github.com/stretchr/testify/require" ) @@ -174,3 +175,38 @@ func TestChangefeedCreateCli(t *testing.T) { require.NoError(t, o.complete(f, cmd)) require.Contains(t, o.validate(cmd).Error(), "creating changefeed with `--sort-dir`") } + +func TestChangefeedCreateCliAdjustEnableOldValue(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + f := newMockFactory(ctrl) + + // enable old value, but use avro as the encoding protocol, should be set to false. 
+ dir := t.TempDir() + configPath := filepath.Join(dir, "adjust-old-value.toml") + err := os.WriteFile(configPath, []byte("enable-old-value=true"), 0o644) + require.NoError(t, err) + + cmd := new(cobra.Command) + o := newChangefeedCommonOptions() + o.addFlags(cmd) + + require.NoError(t, cmd.ParseFlags([]string{fmt.Sprintf("--config=%s", configPath)})) + require.NoError(t, cmd.ParseFlags([]string{"--sink-uri=kafka://127.0.0.1:9092/test?protocol=avro"})) + + opt := newCreateChangefeedOptions(o) + require.NoError(t, opt.complete(f, cmd)) + require.False(t, opt.cfg.EnableOldValue) + + // also enable the force replicate, should return error + configPath = filepath.Join(dir, "enable-old-value-force-replicate.toml") + err = os.WriteFile(configPath, []byte("enable-old-value=true\r\nforce-replicate = true"), 0o644) + require.NoError(t, err) + + require.NoError(t, cmd.ParseFlags([]string{"--sink-uri=kafka://127.0.0.1:9092/test?protocol=avro"})) + require.NoError(t, cmd.ParseFlags([]string{fmt.Sprintf("--config=%s", configPath)})) + + opt = newCreateChangefeedOptions(o) + err = opt.complete(f, cmd) + require.Error(t, cerror.ErrOldValueNotEnabled, err) +} diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 2dde2afe5dc..bbcc2a0ac95 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -180,14 +180,19 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) { } // ValidateAndAdjust verifies and adjusts the replica configuration. -func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { - // check sink uri +func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sink uri if c.Sink != nil { - err := c.Sink.validateAndAdjust(sinkURI, c.EnableOldValue) + err := c.Sink.validateAndAdjust(sinkURI) + if err != nil { + return err + } + + err = c.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) if err != nil { return err } } + if c.Consistent != nil { err := c.Consistent.ValidateAndAdjust() if err != nil { @@ -265,3 +270,52 @@ func isSinkCompatibleWithSpanReplication(u *url.URL) bool { return u != nil && (strings.Contains(u.Scheme, "kafka") || strings.Contains(u.Scheme, "blackhole")) } + +// AdjustEnableOldValue adjust the old value configuration by the sink scheme and encoding protocol +func (c *ReplicaConfig) AdjustEnableOldValue(scheme, protocol string) { + if sink.IsMySQLCompatibleScheme(scheme) { + return + } + + if c.EnableOldValue { + _, ok := ForceDisableOldValueProtocols[protocol] + if ok { + log.Warn("Attempting to replicate with old value enabled, but the specified protocol must disable old value. "+ + "CDC will disable old value and continue.", zap.String("protocol", protocol)) + c.EnableOldValue = false + } + return + } + + _, ok := ForceEnableOldValueProtocols[protocol] + if ok { + log.Warn("Attempting to replicate with old value disabled, but the specified protocol must enable old value. "+ + "CDC will enable old value and continue.", zap.String("protocol", protocol)) + c.EnableOldValue = true + } +} + +// AdjustEnableOldValueAndVerifyForceReplicate adjust the old value configuration by the sink scheme and encoding protocol +// and then verify the force replicate. 
+func (c *ReplicaConfig) AdjustEnableOldValueAndVerifyForceReplicate(sinkURI *url.URL) error { + scheme := strings.ToLower(sinkURI.Scheme) + protocol := sinkURI.Query().Get(ProtocolKey) + if protocol != "" { + c.Sink.Protocol = protocol + } + c.AdjustEnableOldValue(scheme, c.Sink.Protocol) + + if !c.ForceReplicate { + return nil + } + + // MySQL Sink require the old value feature must be enabled to allow delete event send to downstream. + if sink.IsMySQLCompatibleScheme(scheme) { + if !c.EnableOldValue { + log.Error("force replicate, old value feature is disabled for the changefeed using mysql sink") + return cerror.ErrIncompatibleConfig.GenWithStackByArgs() + } + } + + return nil +} diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index e96ba47dfd6..80c9fdbc04d 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/integrity" "github.com/stretchr/testify/require" ) @@ -171,22 +172,23 @@ func TestReplicaConfigValidate(t *testing.T) { sinkURL, err := url.Parse("blackhole://") require.NoError(t, err) - - require.Nil(t, conf.ValidateAndAdjust(sinkURL)) + require.NoError(t, conf.ValidateAndAdjust(sinkURL)) // Incorrect sink configuration. conf = GetDefaultReplicaConfig() conf.Sink.Protocol = "canal" conf.EnableOldValue = false - require.Regexp(t, ".*canal protocol requires old value to be enabled.*", - conf.ValidateAndAdjust(sinkURL)) + + err = conf.ValidateAndAdjust(sinkURL) + require.NoError(t, err) + require.True(t, conf.EnableOldValue) conf = GetDefaultReplicaConfig() conf.Sink.DispatchRules = []*DispatchRule{ {Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"}, } - require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", - conf.ValidateAndAdjust(sinkURL)) + err = conf.ValidateAndAdjust(sinkURL) + require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", err) // Correct sink configuration. 
conf = GetDefaultReplicaConfig() @@ -306,3 +308,75 @@ func TestIsSinkCompatibleWithSpanReplication(t *testing.T) { require.Equal(t, compatible, tt.compatible, tt.name) } } + +func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) { + t.Parallel() + + config := GetDefaultReplicaConfig() + config.EnableOldValue = false + + // mysql sink, do not adjust enable-old-value + sinkURI, err := url.Parse("mysql://") + require.NoError(t, err) + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) + + // mysql sink, `enable-old-value` false, `force-replicate` true, should return error + config.ForceReplicate = true + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.Error(t, cerror.ErrOldValueNotEnabled, err) + + // canal, `enable-old-value` false, `force-replicate` false, no error, `enable-old-value` adjust to true + config.ForceReplicate = false + config.EnableOldValue = false + // canal require old value enabled + sinkURI, err = url.Parse("kafka://127.0.0.1:9092/test?protocol=canal") + require.NoError(t, err) + + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.True(t, config.EnableOldValue) + + // canal, `force-replicate` true, `enable-old-value` true, no error + config.ForceReplicate = true + config.EnableOldValue = true + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.True(t, config.ForceReplicate) + require.True(t, config.EnableOldValue) + + // avro, `enable-old-value` false, `force-replicate` false, no error + config.ForceReplicate = false + config.EnableOldValue = false + sinkURI, err = url.Parse("kafka://127.0.0.1:9092/test?protocol=avro") + require.NoError(t, err) + + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) + + // avro, `enable-old-value` true, no error, set to false. no matter `force-replicate` + config.EnableOldValue = true + config.ForceReplicate = true + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) + + // csv, `enable-old-value` false, `force-replicate` false, no error + config.EnableOldValue = false + config.ForceReplicate = false + sinkURI, err = url.Parse("s3://xxx/yyy?protocol=csv") + require.NoError(t, err) + + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) + + // csv, `enable-old-value` true, no error, set to false. no matter `force-replicate` + config.EnableOldValue = true + config.ForceReplicate = true + err = config.AdjustEnableOldValueAndVerifyForceReplicate(sinkURI) + require.NoError(t, err) + require.False(t, config.EnableOldValue) +} diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 6587f6aeae4..4331ba9a290 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -94,10 +94,16 @@ func (l AtomicityLevel) validate(scheme string) error { } // ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. 
-var ForceEnableOldValueProtocols = []string{ - ProtocolCanal.String(), - ProtocolCanalJSON.String(), - ProtocolMaxwell.String(), +var ForceEnableOldValueProtocols = map[string]struct{}{ + ProtocolCanal.String(): {}, + ProtocolCanalJSON.String(): {}, + ProtocolMaxwell.String(): {}, +} + +// ForceDisableOldValueProtocols specifies protocols need to be forced to disable old value. +var ForceDisableOldValueProtocols = map[string]struct{}{ + ProtocolAvro.String(): {}, + ProtocolCsv.String(): {}, } // SinkConfig represents sink config for a changefeed @@ -313,21 +319,11 @@ type CloudStorageConfig struct { FileSize *int `toml:"file-size" json:"file-size,omitempty"` } -func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error { +func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { if err := s.validateAndAdjustSinkURI(sinkURI); err != nil { return err } - if !enableOldValue { - for _, protocolStr := range ForceEnableOldValueProtocols { - if protocolStr == s.Protocol { - log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+ - "Please update changefeed config", s.Protocol)) - return cerror.WrapError(cerror.ErrKafkaInvalidConfig, - errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", s.Protocol))) - } - } - } for _, rule := range s.DispatchRules { if rule.DispatcherRule != "" && rule.PartitionRule != "" { log.Error("dispatcher and partition cannot be configured both", zap.Any("rule", rule)) diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index af7051e74bc..27477a6a8d2 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -20,67 +20,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestValidateOldValue(t *testing.T) { - t.Parallel() - testCases := []struct { - protocol string - enableOldValue bool - expectedErr string - }{ - { - protocol: "default", - enableOldValue: false, - expectedErr: "", - }, - { - protocol: "default", - enableOldValue: true, - expectedErr: "", - }, - { - protocol: "canal-json", - enableOldValue: false, - expectedErr: ".*canal-json protocol requires old value to be enabled.*", - }, - { - protocol: "canal-json", - enableOldValue: true, - expectedErr: "", - }, - { - protocol: "canal", - enableOldValue: false, - expectedErr: ".*canal protocol requires old value to be enabled.*", - }, - { - protocol: "canal", - enableOldValue: true, - expectedErr: "", - }, - { - protocol: "maxwell", - enableOldValue: false, - expectedErr: ".*maxwell protocol requires old value to be enabled.*", - }, - { - protocol: "maxwell", - enableOldValue: true, - expectedErr: "", - }, - } - - for _, tc := range testCases { - cfg := SinkConfig{ - Protocol: tc.protocol, - } - if tc.expectedErr == "" { - require.Nil(t, cfg.validateAndAdjust(nil, tc.enableOldValue)) - } else { - require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(nil, tc.enableOldValue)) - } - } -} - func TestValidateTxnAtomicity(t *testing.T) { t.Parallel() testCases := []struct { @@ -156,10 +95,10 @@ func TestValidateTxnAtomicity(t *testing.T) { parsedSinkURI, err := url.Parse(tc.sinkURI) require.Nil(t, err) if tc.expectedErr == "" { - require.Nil(t, cfg.validateAndAdjust(parsedSinkURI, true)) + require.Nil(t, cfg.validateAndAdjust(parsedSinkURI)) require.Equal(t, tc.shouldSplitTxn, cfg.TxnAtomicity.ShouldSplitTxn()) } else { - require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI, true)) + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI)) } } } diff --git a/pkg/errors/cdc_errors.go 
b/pkg/errors/cdc_errors.go index b20b6e1e95d..6ced343519b 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -320,6 +320,10 @@ var ( "old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled"), ) + ErrIncompatibleConfig = errors.Normalize( + "incompatible configuration", + errors.RFCCodeText("CDC:ErrIncompatibleConfig"), + ) ErrSinkInvalidConfig = errors.Normalize( "sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig"), diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go index b3b9ab76008..7ebd3da004d 100644 --- a/pkg/orchestrator/reactor_state.go +++ b/pkg/orchestrator/reactor_state.go @@ -229,9 +229,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er return errors.Trace(err) } if key.Tp == etcd.CDCKeyTypeChangefeedInfo { - if err := s.Info.VerifyAndComplete(); err != nil { - return errors.Trace(err) - } + s.Info.VerifyAndComplete() } return nil } diff --git a/pkg/sink/codec/builder/codec_test.go b/pkg/sink/codec/builder/codec_test.go index 914aa7370d1..846146ccbec 100644 --- a/pkg/sink/codec/builder/codec_test.go +++ b/pkg/sink/codec/builder/codec_test.go @@ -71,14 +71,14 @@ func TestJsonVsCraftVsPB(t *testing.T) { if len(cs) == 0 { continue } - craftEncoder := craft.NewBatchEncoder() - craftEncoder.(*craft.BatchEncoder).MaxMessageBytes = 8192 - craftEncoder.(*craft.BatchEncoder).MaxBatchSize = 64 + config := &common.Config{ + MaxMessageBytes: 8192, + MaxBatchSize: 64, + } + craftEncoder := craft.NewBatchEncoder(config) craftMessages := encodeRowCase(t, craftEncoder, cs) - jsonEncoder := open.NewBatchEncoder() - jsonEncoder.(*open.BatchEncoder).MaxMessageBytes = 8192 - jsonEncoder.(*open.BatchEncoder).MaxBatchSize = 64 + jsonEncoder := open.NewBatchEncoder(config) jsonMessages := encodeRowCase(t, jsonEncoder, cs) protobuf1Messages := codecEncodeRowChangedPB1ToMessage(cs) @@ -226,16 +226,17 @@ func codecEncodeRowCase(encoder codec.RowEventEncoder, func init() { var err error - encoder := craft.NewBatchEncoder() - encoder.(*craft.BatchEncoder).MaxMessageBytes = 8192 - encoder.(*craft.BatchEncoder).MaxBatchSize = 64 + + config := &common.Config{ + MaxMessageBytes: 8192, + MaxBatchSize: 64, + } + encoder := craft.NewBatchEncoder(config) if codecCraftEncodedRowChanges, err = codecEncodeRowCase(encoder, codecBenchmarkRowChanges); err != nil { panic(err) } - encoder = open.NewBatchEncoder() - encoder.(*open.BatchEncoder).MaxMessageBytes = 8192 - encoder.(*open.BatchEncoder).MaxBatchSize = 64 + encoder = open.NewBatchEncoder(config) if codecJSONEncodedRowChanges, err = codecEncodeRowCase(encoder, codecBenchmarkRowChanges); err != nil { panic(err) } @@ -244,19 +245,23 @@ func init() { } func BenchmarkCraftEncoding(b *testing.B) { + config := &common.Config{ + MaxMessageBytes: 8192, + MaxBatchSize: 64, + } allocator := craft.NewSliceAllocator(128) - encoder := craft.NewBatchEncoderWithAllocator(allocator) - encoder.(*craft.BatchEncoder).MaxMessageBytes = 8192 - encoder.(*craft.BatchEncoder).MaxBatchSize = 64 + encoder := craft.NewBatchEncoderWithAllocator(allocator, config) for i := 0; i < b.N; i++ { _, _ = codecEncodeRowCase(encoder, codecBenchmarkRowChanges) } } func BenchmarkJsonEncoding(b *testing.B) { - encoder := open.NewBatchEncoder() - encoder.(*open.BatchEncoder).MaxMessageBytes = 8192 - encoder.(*open.BatchEncoder).MaxBatchSize = 64 + config := &common.Config{ + MaxMessageBytes: 8192, + MaxBatchSize: 64, + } + encoder := open.NewBatchEncoder(config) for i := 0; i < b.N; 
i++ { _, _ = codecEncodeRowCase(encoder, codecBenchmarkRowChanges) } diff --git a/pkg/sink/codec/builder/encoder_builder.go b/pkg/sink/codec/builder/encoder_builder.go index 9a903e60ee1..9de2d21a069 100644 --- a/pkg/sink/codec/builder/encoder_builder.go +++ b/pkg/sink/codec/builder/encoder_builder.go @@ -37,11 +37,11 @@ func NewRowEventEncoderBuilder( case config.ProtocolDefault, config.ProtocolOpen: return open.NewBatchEncoderBuilder(c), nil case config.ProtocolCanal: - return canal.NewBatchEncoderBuilder(), nil + return canal.NewBatchEncoderBuilder(c), nil case config.ProtocolAvro: return avro.NewBatchEncoderBuilder(ctx, c) case config.ProtocolMaxwell: - return maxwell.NewBatchEncoderBuilder(), nil + return maxwell.NewBatchEncoderBuilder(c), nil case config.ProtocolCanalJSON: return canal.NewJSONRowEventEncoderBuilder(c), nil case config.ProtocolCraft: diff --git a/pkg/sink/codec/canal/canal_encoder.go b/pkg/sink/codec/canal/canal_encoder.go index cfc81756306..38a4746374f 100644 --- a/pkg/sink/codec/canal/canal_encoder.go +++ b/pkg/sink/codec/canal/canal_encoder.go @@ -34,6 +34,8 @@ type BatchEncoder struct { callbackBuf []func() packet *canal.Packet entryBuilder *canalEntryBuilder + + config *common.Config } // EncodeCheckpointEvent implements the RowEventEncoder interface @@ -50,7 +52,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - entry, err := d.entryBuilder.fromRowEvent(e) + entry, err := d.entryBuilder.fromRowEvent(e, d.config.OnlyHandleKeyColumns) if err != nil { return errors.Trace(err) } @@ -156,25 +158,31 @@ func (d *BatchEncoder) resetPacket() { } // newBatchEncoder creates a new canalBatchEncoder. -func newBatchEncoder() codec.RowEventEncoder { +func newBatchEncoder(config *common.Config) codec.RowEventEncoder { encoder := &BatchEncoder{ messages: &canal.Messages{}, callbackBuf: make([]func(), 0), entryBuilder: newCanalEntryBuilder(), + + config: config, } encoder.resetPacket() return encoder } -type batchEncoderBuilder struct{} +type batchEncoderBuilder struct { + config *common.Config +} // Build a `canalBatchEncoder` func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - return newBatchEncoder() + return newBatchEncoder(b.config) } // NewBatchEncoderBuilder creates a canal batchEncoderBuilder. 
-func NewBatchEncoderBuilder() codec.RowEventEncoderBuilder { - return &batchEncoderBuilder{} +func NewBatchEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder { + return &batchEncoderBuilder{ + config: config, + } } diff --git a/pkg/sink/codec/canal/canal_encoder_test.go b/pkg/sink/codec/canal/canal_encoder_test.go index 481c781993f..2fe8bc06372 100644 --- a/pkg/sink/codec/canal/canal_encoder_test.go +++ b/pkg/sink/codec/canal/canal_encoder_test.go @@ -20,6 +20,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec/common" canal "github.com/pingcap/tiflow/proto/canal" "github.com/stretchr/testify/require" ) @@ -28,7 +30,7 @@ func TestCanalBatchEncoder(t *testing.T) { t.Parallel() s := defaultCanalBatchTester for _, cs := range s.rowCases { - encoder := newBatchEncoder() + encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) for _, row := range cs { err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) require.Nil(t, err) @@ -55,7 +57,7 @@ func TestCanalBatchEncoder(t *testing.T) { } for _, cs := range s.ddlCases { - encoder := newBatchEncoder() + encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) for _, ddl := range cs { msg, err := encoder.EncodeDDLEvent(ddl) require.Nil(t, err) @@ -76,7 +78,7 @@ func TestCanalBatchEncoder(t *testing.T) { } func TestCanalAppendRowChangedEventWithCallback(t *testing.T) { - encoder := newBatchEncoder() + encoder := newBatchEncoder(common.NewConfig(config.ProtocolCanal)) require.NotNil(t, encoder) count := 0 diff --git a/pkg/sink/codec/canal/canal_entry.go b/pkg/sink/codec/canal/canal_entry.go index 809074dc72c..4b897bf3f99 100644 --- a/pkg/sink/codec/canal/canal_entry.go +++ b/pkg/sink/codec/canal/canal_entry.go @@ -144,7 +144,7 @@ func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated } // build the RowData of a canal entry -func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowData, error) { +func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*canal.RowData, error) { var columns []*canal.Column for _, column := range e.Columns { if column == nil { @@ -156,11 +156,16 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowDa } columns = append(columns, c) } + + onlyHandleKeyColumns = onlyHandleKeyColumns && e.IsDelete() var preColumns []*canal.Column for _, column := range e.PreColumns { if column == nil { continue } + if onlyHandleKeyColumns && !column.Flag.IsHandleKey() { + continue + } c, err := b.buildColumn(column, column.Name, !e.IsDelete()) if err != nil { return nil, errors.Trace(err) @@ -175,11 +180,11 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowDa } // fromRowEvent builds canal entry from cdc RowChangedEvent -func (b *canalEntryBuilder) fromRowEvent(e *model.RowChangedEvent) (*canal.Entry, error) { +func (b *canalEntryBuilder) fromRowEvent(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*canal.Entry, error) { eventType := convertRowEventType(e) header := b.buildHeader(e.CommitTs, e.Table.Schema, e.Table.Table, eventType, 1) isDdl := isCanalDDL(eventType) // false - rowData, err := b.buildRowData(e) + rowData, err := b.buildRowData(e, onlyHandleKeyColumns) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/sink/codec/canal/canal_entry_test.go 
b/pkg/sink/codec/canal/canal_entry_test.go index 0d12d23aad5..21f65f2b7ac 100644 --- a/pkg/sink/codec/canal/canal_entry_test.go +++ b/pkg/sink/codec/canal/canal_entry_test.go @@ -70,7 +70,7 @@ func testInsert(t *testing.T) { } builder := newCanalEntryBuilder() - entry, err := builder.fromRowEvent(testCaseInsert) + entry, err := builder.fromRowEvent(testCaseInsert, false) require.Nil(t, err) require.Equal(t, canal.EntryType_ROWDATA, entry.GetEntryType()) header := entry.GetHeader() @@ -146,7 +146,7 @@ func testUpdate(t *testing.T) { }, } builder := newCanalEntryBuilder() - entry, err := builder.fromRowEvent(testCaseUpdate) + entry, err := builder.fromRowEvent(testCaseUpdate, false) require.Nil(t, err) require.Equal(t, canal.EntryType_ROWDATA, entry.GetEntryType()) @@ -219,7 +219,7 @@ func testDelete(t *testing.T) { } builder := newCanalEntryBuilder() - entry, err := builder.fromRowEvent(testCaseDelete) + entry, err := builder.fromRowEvent(testCaseDelete, false) require.Nil(t, err) require.Equal(t, canal.EntryType_ROWDATA, entry.GetEntryType()) header := entry.GetHeader() diff --git a/pkg/sink/codec/canal/canal_json_decoder_test.go b/pkg/sink/codec/canal/canal_json_decoder_test.go index c1783f4b1ad..3314884274b 100644 --- a/pkg/sink/codec/canal/canal_json_decoder_test.go +++ b/pkg/sink/codec/canal/canal_json_decoder_test.go @@ -85,8 +85,8 @@ func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) { t.Parallel() for _, encodeEnable := range []bool{false, true} { encoder := &JSONRowEventEncoder{ - builder: newCanalEntryBuilder(), - enableTiDBExtension: encodeEnable, + builder: newCanalEntryBuilder(), + config: &common.Config{EnableTiDBExtension: encodeEnable}, } require.NotNil(t, encoder) diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go index 61e0a1ae81c..7e499cdf86d 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go @@ -31,15 +31,15 @@ import ( func newJSONMessageForDML( builder *canalEntryBuilder, - enableTiDBExtension bool, e *model.RowChangedEvent, - onlyOutputUpdatedColumns bool, + config *common.Config, ) ([]byte, error) { isDelete := e.IsDelete() mysqlTypeMap := make(map[string]string, len(e.Columns)) filling := func(columns []*model.Column, out *jwriter.Writer, onlyOutputUpdatedColumn bool, + onlyHandleKeyColumns bool, newColumnMap map[string]*model.Column, ) error { if len(columns) == 0 { @@ -55,6 +55,9 @@ func newJSONMessageForDML( if onlyOutputUpdatedColumn && shouldIgnoreColumn(col, newColumnMap) { continue } + if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { + continue + } if isFirst { isFirst = false } else { @@ -152,6 +155,9 @@ func newJSONMessageForDML( emptyColumn := true for _, col := range columns { if col != nil { + if isDelete && config.OnlyHandleKeyColumns && !col.Flag.IsHandleKey() { + continue + } if emptyColumn { out.RawByte('{') emptyColumn = false @@ -200,36 +206,36 @@ func newJSONMessageForDML( if e.IsDelete() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := filling(e.PreColumns, out, false, nil); err != nil { + if err := filling(e.PreColumns, out, false, config.OnlyHandleKeyColumns, nil); err != nil { return nil, err } } else if e.IsInsert() { out.RawString(",\"old\":null") out.RawString(",\"data\":") - if err := filling(e.Columns, out, false, nil); err != nil { + if err := filling(e.Columns, out, false, false, nil); err != nil { return nil, err } } else if e.IsUpdate() { var newColsMap 
map[string]*model.Column - if onlyOutputUpdatedColumns { + if config.OnlyOutputUpdatedColumns { newColsMap = make(map[string]*model.Column, len(e.Columns)) for _, col := range e.Columns { newColsMap[col.Name] = col } } out.RawString(",\"old\":") - if err := filling(e.PreColumns, out, onlyOutputUpdatedColumns, newColsMap); err != nil { + if err := filling(e.PreColumns, out, config.OnlyOutputUpdatedColumns, false, newColsMap); err != nil { return nil, err } out.RawString(",\"data\":") - if err := filling(e.Columns, out, false, nil); err != nil { + if err := filling(e.Columns, out, false, false, nil); err != nil { return nil, err } } else { log.Panic("unreachable event type", zap.Any("event", e)) } - if enableTiDBExtension { + if config.EnableTiDBExtension { const prefix string = ",\"_tidb\":" out.RawString(prefix) out.RawByte('{') @@ -254,25 +260,19 @@ func eventTypeString(e *model.RowChangedEvent) string { // JSONRowEventEncoder encodes row event in JSON format type JSONRowEventEncoder struct { - builder *canalEntryBuilder + builder *canalEntryBuilder + messages []*common.Message - // When it is true, canal-json would generate TiDB extension information - // which, at the moment, only includes `tidbWaterMarkType` and `_tidb` fields. - enableTiDBExtension bool - maxMessageBytes int - messages []*common.Message - - onlyOutputUpdatedColumns bool + config *common.Config } // newJSONRowEventEncoder creates a new JSONRowEventEncoder func newJSONRowEventEncoder(config *common.Config) codec.RowEventEncoder { encoder := &JSONRowEventEncoder{ - builder: newCanalEntryBuilder(), - enableTiDBExtension: config.EnableTiDBExtension, - onlyOutputUpdatedColumns: config.OnlyOutputUpdatedColumns, - messages: make([]*common.Message, 0, 1), - maxMessageBytes: config.MaxMessageBytes, + builder: newCanalEntryBuilder(), + messages: make([]*common.Message, 0, 1), + + config: config, } return encoder } @@ -289,7 +289,7 @@ func (c *JSONRowEventEncoder) newJSONMessageForDDL(e *model.DDLEvent) canalJSONM Query: e.Query, } - if !c.enableTiDBExtension { + if !c.config.EnableTiDBExtension { return msg } @@ -316,7 +316,7 @@ func (c *JSONRowEventEncoder) newJSONMessage4CheckpointEvent( // EncodeCheckpointEvent implements the RowEventEncoder interface func (c *JSONRowEventEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) { - if !c.enableTiDBExtension { + if !c.config.EnableTiDBExtension { return nil, nil } @@ -335,17 +335,16 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - value, err := newJSONMessageForDML(c.builder, - c.enableTiDBExtension, e, c.onlyOutputUpdatedColumns) + value, err := newJSONMessageForDML(c.builder, e, c.config) if err != nil { return errors.Trace(err) } length := len(value) + common.MaxRecordOverhead // for single message that is longer than max-message-bytes, do not send it. 
- if length > c.maxMessageBytes { + if length > c.config.MaxMessageBytes { log.Warn("Single message is too large for canal-json", - zap.Int("maxMessageBytes", c.maxMessageBytes), + zap.Int("maxMessageBytes", c.config.MaxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table), zap.Any("value", value)) diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index bb6a937e5de..6fd0f21700d 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -33,13 +33,7 @@ func TestBuildCanalJSONRowEventEncoder(t *testing.T) { builder := &jsonRowEventEncoderBuilder{config: cfg} encoder, ok := builder.Build().(*JSONRowEventEncoder) require.True(t, ok) - require.False(t, encoder.enableTiDBExtension) - - cfg.EnableTiDBExtension = true - builder = &jsonRowEventEncoderBuilder{config: cfg} - encoder, ok = builder.Build().(*JSONRowEventEncoder) - require.True(t, ok) - require.True(t, encoder.enableTiDBExtension) + require.NotNil(t, encoder.config) } func TestNewCanalJSONMessage4DML(t *testing.T) { @@ -53,8 +47,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { encoder, ok := e.(*JSONRowEventEncoder) require.True(t, ok) - data, err := newJSONMessageForDML(encoder.builder, - encoder.enableTiDBExtension, testCaseInsert, false) + data, err := newJSONMessageForDML(encoder.builder, testCaseInsert, encoder.config) require.Nil(t, err) var msg canalJSONMessageInterface = &JSONMessage{} err = json.Unmarshal(data, msg) @@ -69,6 +62,12 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Equal(t, "person", jsonMsg.Table) require.False(t, jsonMsg.IsDDL) + for _, col := range testCaseInsert.Columns { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } + // check data is enough obtainedDataMap := jsonMsg.getData() require.NotNil(t, obtainedDataMap) @@ -97,8 +96,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Equal(t, item.expectedEncodedValue, obtainedValue) } - data, err = newJSONMessageForDML(encoder.builder, - encoder.enableTiDBExtension, testCaseUpdate, false) + data, err = newJSONMessageForDML(encoder.builder, testCaseUpdate, encoder.config) require.Nil(t, err) jsonMsg = &JSONMessage{} err = json.Unmarshal(data, jsonMsg) @@ -107,8 +105,16 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.NotNil(t, jsonMsg.Old) require.Equal(t, "UPDATE", jsonMsg.EventType) - data, err = newJSONMessageForDML(encoder.builder, - encoder.enableTiDBExtension, testCaseDelete, false) + for _, col := range testCaseUpdate.Columns { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } + for _, col := range testCaseUpdate.PreColumns { + require.Contains(t, jsonMsg.Old[0], col.Name) + } + + data, err = newJSONMessageForDML(encoder.builder, testCaseDelete, encoder.config) require.Nil(t, err) jsonMsg = &JSONMessage{} err = json.Unmarshal(data, jsonMsg) @@ -117,16 +123,40 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { require.Nil(t, jsonMsg.Old) require.Equal(t, "DELETE", jsonMsg.EventType) + for _, col := range testCaseDelete.PreColumns { + require.Contains(t, jsonMsg.Data[0], col.Name) + } + + data, err = newJSONMessageForDML(encoder.builder, testCaseDelete, &common.Config{OnlyHandleKeyColumns: true}) + require.NoError(t, err) + jsonMsg = 
&JSONMessage{} + err = json.Unmarshal(data, jsonMsg) + require.NoError(t, err) + require.NotNil(t, jsonMsg.Data) + require.Nil(t, jsonMsg.Old) + + for _, col := range testCaseDelete.PreColumns { + if col.Flag.IsHandleKey() { + require.Contains(t, jsonMsg.Data[0], col.Name) + require.Contains(t, jsonMsg.SQLType, col.Name) + require.Contains(t, jsonMsg.MySQLType, col.Name) + } else { + require.NotContains(t, jsonMsg.Data[0], col.Name) + require.NotContains(t, jsonMsg.SQLType, col.Name) + require.NotContains(t, jsonMsg.MySQLType, col.Name) + } + } + e = newJSONRowEventEncoder(&common.Config{ - EnableTiDBExtension: true, - Terminator: "", + EnableTiDBExtension: true, + Terminator: "", + OnlyOutputUpdatedColumns: true, }) require.NotNil(t, e) encoder, ok = e.(*JSONRowEventEncoder) require.True(t, ok) - data, err = newJSONMessageForDML(encoder.builder, - encoder.enableTiDBExtension, testCaseUpdate, false) + data, err = newJSONMessageForDML(encoder.builder, testCaseUpdate, encoder.config) require.Nil(t, err) withExtension := &canalJSONMessageWithTiDBExtension{} @@ -138,8 +168,7 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { encoder, ok = e.(*JSONRowEventEncoder) require.True(t, ok) - data, err = newJSONMessageForDML(encoder.builder, - encoder.enableTiDBExtension, testCaseUpdate, true) + data, err = newJSONMessageForDML(encoder.builder, testCaseUpdate, encoder.config) require.Nil(t, err) withExtension = &canalJSONMessageWithTiDBExtension{} @@ -153,8 +182,9 @@ func TestNewCanalJSONMessage4DML(t *testing.T) { func TestNewCanalJSONMessageFromDDL(t *testing.T) { t.Parallel() - encoder := &JSONRowEventEncoder{builder: newCanalEntryBuilder()} - require.NotNil(t, encoder) + + encoder, ok := newJSONRowEventEncoder(&common.Config{}).(*JSONRowEventEncoder) + require.True(t, ok) message := encoder.newJSONMessageForDDL(testCaseDDL) require.NotNil(t, message) @@ -168,8 +198,10 @@ func TestNewCanalJSONMessageFromDDL(t *testing.T) { require.Equal(t, testCaseDDL.Query, msg.Query) require.Equal(t, "CREATE", msg.EventType) - encoder = &JSONRowEventEncoder{builder: newCanalEntryBuilder(), enableTiDBExtension: true} - require.NotNil(t, encoder) + encoder, ok = newJSONRowEventEncoder(&common.Config{ + EnableTiDBExtension: true, + }).(*JSONRowEventEncoder) + require.True(t, ok) message = encoder.newJSONMessageForDDL(testCaseDDL) require.NotNil(t, message) @@ -220,9 +252,12 @@ func TestEncodeCheckpointEvent(t *testing.T) { t.Parallel() var watermark uint64 = 2333 for _, enable := range []bool{false, true} { + config := &common.Config{ + EnableTiDBExtension: enable, + } encoder := &JSONRowEventEncoder{ - builder: newCanalEntryBuilder(), - enableTiDBExtension: enable, + builder: newCanalEntryBuilder(), + config: config, } require.NotNil(t, encoder) @@ -262,8 +297,8 @@ func TestCheckpointEventValueMarshal(t *testing.T) { t.Parallel() var watermark uint64 = 1024 encoder := &JSONRowEventEncoder{ - builder: newCanalEntryBuilder(), - enableTiDBExtension: true, + builder: newCanalEntryBuilder(), + config: &common.Config{EnableTiDBExtension: true}, } require.NotNil(t, encoder) msg, err := encoder.EncodeCheckpointEvent(watermark) @@ -308,7 +343,10 @@ func TestCheckpointEventValueMarshal(t *testing.T) { func TestDDLEventWithExtensionValueMarshal(t *testing.T) { t.Parallel() - encoder := &JSONRowEventEncoder{builder: newCanalEntryBuilder(), enableTiDBExtension: true} + encoder := &JSONRowEventEncoder{ + builder: newCanalEntryBuilder(), + config: &common.Config{EnableTiDBExtension: true}, + } require.NotNil(t, encoder) message 
:= encoder.newJSONMessageForDDL(testCaseDDL) diff --git a/pkg/sink/codec/canal/canal_json_txn_event_encoder.go b/pkg/sink/codec/canal/canal_json_txn_event_encoder.go index 691767a6841..78851c6e9c7 100644 --- a/pkg/sink/codec/canal/canal_json_txn_event_encoder.go +++ b/pkg/sink/codec/canal/canal_json_txn_event_encoder.go @@ -30,17 +30,13 @@ import ( type JSONTxnEventEncoder struct { builder *canalEntryBuilder - // When it is true, canal-json would generate TiDB extension information - // which, at the moment, only includes `tidbWaterMarkType` and `_tidb` fields. - enableTiDBExtension bool + config *common.Config - onlyOutputUpdatedColumns bool // the symbol separating two lines - terminator []byte - maxMessageBytes int - valueBuf *bytes.Buffer - batchSize int - callback func() + terminator []byte + valueBuf *bytes.Buffer + batchSize int + callback func() // Store some fields of the txn event. txnCommitTs uint64 @@ -54,16 +50,15 @@ func (j *JSONTxnEventEncoder) AppendTxnEvent( callback func(), ) error { for _, row := range txn.Rows { - value, err := newJSONMessageForDML(j.builder, - j.enableTiDBExtension, row, j.onlyOutputUpdatedColumns) + value, err := newJSONMessageForDML(j.builder, row, j.config) if err != nil { return errors.Trace(err) } length := len(value) + common.MaxRecordOverhead // For single message that is longer than max-message-bytes, do not send it. - if length > j.maxMessageBytes { + if length > j.config.MaxMessageBytes { log.Warn("Single message is too large for canal-json", - zap.Int("maxMessageBytes", j.maxMessageBytes), + zap.Int("maxMessageBytes", j.config.MaxMessageBytes), zap.Int("length", length), zap.Any("table", row.Table), zap.Any("value", value)) @@ -103,12 +98,11 @@ func (j *JSONTxnEventEncoder) Build() []*common.Message { // newJSONTxnEventEncoder creates a new JSONTxnEventEncoder func newJSONTxnEventEncoder(config *common.Config) codec.TxnEventEncoder { encoder := &JSONTxnEventEncoder{ - builder: newCanalEntryBuilder(), - enableTiDBExtension: config.EnableTiDBExtension, - onlyOutputUpdatedColumns: config.OnlyOutputUpdatedColumns, - valueBuf: &bytes.Buffer{}, - terminator: []byte(config.Terminator), - maxMessageBytes: config.MaxMessageBytes, + builder: newCanalEntryBuilder(), + valueBuf: &bytes.Buffer{}, + terminator: []byte(config.Terminator), + + config: config, } return encoder } diff --git a/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go index 0e1784bf692..71db664d285 100644 --- a/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_txn_event_encoder_test.go @@ -30,13 +30,7 @@ func TestBuildCanalJSONTxnEventEncoder(t *testing.T) { builder := NewJSONTxnEventEncoderBuilder(cfg) encoder, ok := builder.Build().(*JSONTxnEventEncoder) require.True(t, ok) - require.False(t, encoder.enableTiDBExtension) - - cfg.EnableTiDBExtension = true - builder = NewJSONTxnEventEncoderBuilder(cfg) - encoder, ok = builder.Build().(*JSONTxnEventEncoder) - require.True(t, ok) - require.True(t, encoder.enableTiDBExtension) + require.NotNil(t, encoder.config) } func TestCanalJSONTxnEventEncoderMaxMessageBytes(t *testing.T) { diff --git a/pkg/sink/codec/canal/canal_test_util.go b/pkg/sink/codec/canal/canal_test_util.go index 744a74a6c65..c5541b8b33a 100644 --- a/pkg/sink/codec/canal/canal_test_util.go +++ b/pkg/sink/codec/canal/canal_test_util.go @@ -35,7 +35,7 @@ type testColumnTuple struct { var ( testColumnsTable = []*testColumnTuple{ { - &model.Column{Name: "tinyint", 
Type: mysql.TypeTiny, Value: int64(127)}, + &model.Column{Name: "tinyint", Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Type: mysql.TypeTiny, Value: int64(127)}, "tinyint", internal.JavaSQLTypeTINYINT, "127", "127", }, diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index 689d53b38a5..2e8d34b23f1 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -37,6 +37,9 @@ type Config struct { MaxMessageBytes int MaxBatchSize int + // onlyHandleKeyColumns is true, for the delete event only output the handle key columns. + OnlyHandleKeyColumns bool + EnableTiDBExtension bool EnableRowChecksum bool @@ -165,6 +168,8 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er c.EnableRowChecksum = replicaConfig.Integrity.Enabled() } + c.OnlyHandleKeyColumns = !replicaConfig.EnableOldValue + return nil } diff --git a/pkg/sink/codec/common/config_test.go b/pkg/sink/codec/common/config_test.go index a6263d674d1..e55e73029cf 100644 --- a/pkg/sink/codec/common/config_test.go +++ b/pkg/sink/codec/common/config_test.go @@ -113,10 +113,16 @@ func TestConfigApplyValidate(t *testing.T) { err = c.Apply(sinkURI, replicaConfig) require.NoError(t, err) require.True(t, c.EnableTiDBExtension) + require.False(t, c.OnlyHandleKeyColumns) err = c.Validate() require.NoError(t, err) + replicaConfig.EnableOldValue = false + err = c.Apply(sinkURI, replicaConfig) + require.NoError(t, err) + require.True(t, c.OnlyHandleKeyColumns) + uri = "kafka://127.0.0.1:9092/abc?protocol=canal-json&enable-tidb-extension=a" sinkURI, err = url.Parse(uri) require.NoError(t, err) diff --git a/pkg/sink/codec/craft/craft_encoder.go b/pkg/sink/codec/craft/craft_encoder.go index 8623d746132..3057a04c98e 100644 --- a/pkg/sink/codec/craft/craft_encoder.go +++ b/pkg/sink/codec/craft/craft_encoder.go @@ -28,9 +28,7 @@ type BatchEncoder struct { messageBuf []*common.Message callbackBuf []func() - // configs - MaxMessageBytes int - MaxBatchSize int + config *common.Config allocator *SliceAllocator } @@ -49,11 +47,11 @@ func (e *BatchEncoder) AppendRowChangedEvent( ev *model.RowChangedEvent, callback func(), ) error { - rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev) + rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev, e.config.OnlyHandleKeyColumns) if callback != nil { e.callbackBuf = append(e.callbackBuf, callback) } - if size > e.MaxMessageBytes || rows >= e.MaxBatchSize { + if size > e.config.MaxMessageBytes || rows >= e.config.MaxBatchSize { e.flush() } return nil @@ -98,11 +96,11 @@ func (e *BatchEncoder) flush() { } // NewBatchEncoder creates a new BatchEncoder. -func NewBatchEncoder() codec.RowEventEncoder { +func NewBatchEncoder(config *common.Config) codec.RowEventEncoder { // 64 is a magic number that come up with these assumptions and manual benchmark. // 1. Most table will not have more than 64 columns // 2. 
It only worth allocating slices in batch for slices that's small enough - return NewBatchEncoderWithAllocator(NewSliceAllocator(64)) + return NewBatchEncoderWithAllocator(NewSliceAllocator(64), config) } type batchEncoderBuilder struct { @@ -111,10 +109,7 @@ type batchEncoderBuilder struct { // Build a BatchEncoder func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - encoder := NewBatchEncoder() - encoder.(*BatchEncoder).MaxMessageBytes = b.config.MaxMessageBytes - encoder.(*BatchEncoder).MaxBatchSize = b.config.MaxBatchSize - return encoder + return NewBatchEncoder(b.config) } // NewBatchEncoderBuilder creates a craft batchEncoderBuilder. @@ -123,11 +118,12 @@ func NewBatchEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder } // NewBatchEncoderWithAllocator creates a new BatchEncoder with given allocator. -func NewBatchEncoderWithAllocator(allocator *SliceAllocator) codec.RowEventEncoder { +func NewBatchEncoderWithAllocator(allocator *SliceAllocator, config *common.Config) codec.RowEventEncoder { return &BatchEncoder{ allocator: allocator, messageBuf: make([]*common.Message, 0, 2), callbackBuf: make([]func(), 0), rowChangedBuffer: NewRowChangedEventBuffer(allocator), + config: config, } } diff --git a/pkg/sink/codec/craft/craft_encoder_test.go b/pkg/sink/codec/craft/craft_encoder_test.go index 4537a79c601..195822189f1 100644 --- a/pkg/sink/codec/craft/craft_encoder_test.go +++ b/pkg/sink/codec/craft/craft_encoder_test.go @@ -104,8 +104,7 @@ func TestBuildCraftBatchEncoder(t *testing.T) { builder := &batchEncoderBuilder{config: cfg} encoder, ok := builder.Build().(*BatchEncoder) require.True(t, ok) - require.Equal(t, cfg.MaxBatchSize, encoder.MaxBatchSize) - require.Equal(t, cfg.MaxMessageBytes, encoder.MaxMessageBytes) + require.NotNil(t, encoder.config) } func testBatchCodec( diff --git a/pkg/sink/codec/craft/model.go b/pkg/sink/codec/craft/model.go index 12943b77658..71af0e88f14 100644 --- a/pkg/sink/codec/craft/model.go +++ b/pkg/sink/codec/craft/model.go @@ -366,7 +366,7 @@ func decodeColumnGroup(bits []byte, allocator *SliceAllocator, dict *termDiction }, nil } -func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column) (int, *columnGroup) { +func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column, onlyHandleKeyColumns bool) (int, *columnGroup) { l := len(columns) if l == 0 { return 0, nil @@ -381,6 +381,9 @@ func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column) if col == nil { continue } + if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { + continue + } names[idx] = col.Name types[idx] = uint64(col.Type) flags[idx] = uint64(col.Flag) @@ -404,7 +407,7 @@ func newColumnGroup(allocator *SliceAllocator, ty byte, columns []*model.Column) // Row changed message is basically an array of column groups type rowChangedEvent = []*columnGroup -func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent) (int, rowChangedEvent) { +func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent, onlyHandleKeyColumns bool) (int, rowChangedEvent) { numGroups := 0 if ev.PreColumns != nil { numGroups++ @@ -415,12 +418,13 @@ func newRowChangedMessage(allocator *SliceAllocator, ev *model.RowChangedEvent) groups := allocator.columnGroupSlice(numGroups) estimatedSize := 0 idx := 0 - if size, group := newColumnGroup(allocator, columnGroupTypeNew, ev.Columns); group != nil { + if size, group := newColumnGroup(allocator, columnGroupTypeNew, ev.Columns, false); group != nil { 
groups[idx] = group idx++ estimatedSize += size } - if size, group := newColumnGroup(allocator, columnGroupTypeOld, ev.PreColumns); group != nil { + onlyHandleKeyColumns = onlyHandleKeyColumns && ev.IsDelete() + if size, group := newColumnGroup(allocator, columnGroupTypeOld, ev.PreColumns, onlyHandleKeyColumns); group != nil { groups[idx] = group estimatedSize += size } @@ -454,7 +458,7 @@ func (b *RowChangedEventBuffer) Encode() []byte { } // AppendRowChangedEvent append a new event to buffer -func (b *RowChangedEventBuffer) AppendRowChangedEvent(ev *model.RowChangedEvent) (rows, size int) { +func (b *RowChangedEventBuffer) AppendRowChangedEvent(ev *model.RowChangedEvent, onlyHandleKeyColumns bool) (rows, size int) { var partition int64 = -1 if ev.Table.IsPartition { partition = ev.Table.TableID @@ -479,7 +483,7 @@ func (b *RowChangedEventBuffer) AppendRowChangedEvent(ev *model.RowChangedEvent) if b.eventsCount+1 > len(b.events) { b.events = b.allocator.resizeRowChangedEventSlice(b.events, newBufferSize(b.eventsCount)) } - size, message := newRowChangedMessage(b.allocator, ev) + size, message := newRowChangedMessage(b.allocator, ev, onlyHandleKeyColumns) b.events[b.eventsCount] = message b.eventsCount++ b.estimatedSize += size diff --git a/pkg/sink/codec/maxwell/maxwell_encoder.go b/pkg/sink/codec/maxwell/maxwell_encoder.go index a5eb7e4acd6..661fd5a30ff 100644 --- a/pkg/sink/codec/maxwell/maxwell_encoder.go +++ b/pkg/sink/codec/maxwell/maxwell_encoder.go @@ -31,6 +31,8 @@ type BatchEncoder struct { valueBuf *bytes.Buffer callbackBuf []func() batchSize int + + config *common.Config } // EncodeCheckpointEvent implements the RowEventEncoder interface @@ -47,7 +49,7 @@ func (d *BatchEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - _, valueMsg := rowChangeToMaxwellMsg(e) + _, valueMsg := rowChangeToMaxwellMsg(e, d.config.OnlyHandleKeyColumns) value, err := valueMsg.encode() if err != nil { return errors.Trace(err) @@ -109,24 +111,29 @@ func (d *BatchEncoder) reset() { } // newBatchEncoder creates a new maxwell BatchEncoder. -func newBatchEncoder() codec.RowEventEncoder { +func newBatchEncoder(config *common.Config) codec.RowEventEncoder { batch := &BatchEncoder{ keyBuf: &bytes.Buffer{}, valueBuf: &bytes.Buffer{}, callbackBuf: make([]func(), 0), + config: config, } batch.reset() return batch } -type batchEncoderBuilder struct{} +type batchEncoderBuilder struct { + config *common.Config +} // NewBatchEncoderBuilder creates a maxwell batchEncoderBuilder. 
-func NewBatchEncoderBuilder() codec.RowEventEncoderBuilder { - return &batchEncoderBuilder{} +func NewBatchEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder { + return &batchEncoderBuilder{ + config: config, + } } // Build a `maxwellBatchEncoder` func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - return newBatchEncoder() + return newBatchEncoder(b.config) } diff --git a/pkg/sink/codec/maxwell/maxwell_encoder_test.go b/pkg/sink/codec/maxwell/maxwell_encoder_test.go index 4220db9a15b..d9703b8ce52 100644 --- a/pkg/sink/codec/maxwell/maxwell_encoder_test.go +++ b/pkg/sink/codec/maxwell/maxwell_encoder_test.go @@ -20,6 +20,7 @@ import ( timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/stretchr/testify/require" ) @@ -33,7 +34,7 @@ func TestMaxwellBatchCodec(t *testing.T) { Columns: []*model.Column{{Name: "col1", Type: 3, Value: 10}}, }}, {}} for _, cs := range rowCases { - encoder := newEncoder() + encoder := newEncoder(&common.Config{}) for _, row := range cs { err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) require.Nil(t, err) @@ -59,7 +60,7 @@ func TestMaxwellBatchCodec(t *testing.T) { Type: 1, }}} for _, cs := range ddlCases { - encoder := newEncoder() + encoder := newEncoder(&common.Config{}) for _, ddl := range cs { msg, err := encoder.EncodeDDLEvent(ddl) require.Nil(t, err) @@ -69,7 +70,7 @@ func TestMaxwellBatchCodec(t *testing.T) { } func TestMaxwellAppendRowChangedEventWithCallback(t *testing.T) { - encoder := newBatchEncoder() + encoder := newBatchEncoder(&common.Config{}) require.NotNil(t, encoder) count := 0 diff --git a/pkg/sink/codec/maxwell/maxwell_message.go b/pkg/sink/codec/maxwell/maxwell_message.go index fb9e8f1942a..6d2af9bcdef 100644 --- a/pkg/sink/codec/maxwell/maxwell_message.go +++ b/pkg/sink/codec/maxwell/maxwell_message.go @@ -43,7 +43,7 @@ func (m *maxwellMessage) encode() ([]byte, error) { return data, cerror.WrapError(cerror.ErrMaxwellEncodeFailed, err) } -func rowChangeToMaxwellMsg(e *model.RowChangedEvent) (*internal.MessageKey, *maxwellMessage) { +func rowChangeToMaxwellMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*internal.MessageKey, *maxwellMessage) { var partition *int64 if e.Table.IsPartition { partition = &e.Table.TableID @@ -68,6 +68,9 @@ func rowChangeToMaxwellMsg(e *model.RowChangedEvent) (*internal.MessageKey, *max if e.IsDelete() { value.Type = "delete" for _, v := range e.PreColumns { + if onlyHandleKeyColumns && !v.Flag.IsHandleKey() { + continue + } switch v.Type { case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: if v.Value == nil { diff --git a/pkg/sink/codec/maxwell/maxwell_message_test.go b/pkg/sink/codec/maxwell/maxwell_message_test.go index 852886cc444..3928d3f36c4 100644 --- a/pkg/sink/codec/maxwell/maxwell_message_test.go +++ b/pkg/sink/codec/maxwell/maxwell_message_test.go @@ -54,7 +54,7 @@ func TestEncodeBinaryToMaxwell(t *testing.T) { Columns: []*model.Column{column}, } - key, msg := rowChangeToMaxwellMsg(e) + key, msg := rowChangeToMaxwellMsg(e, false) require.NotNil(t, key) require.NotNil(t, msg) } diff --git a/pkg/sink/codec/open/open_protocol_encoder.go b/pkg/sink/codec/open/open_protocol_encoder.go index 3c033a5bd94..012f78e0e45 100644 --- a/pkg/sink/codec/open/open_protocol_encoder.go +++ b/pkg/sink/codec/open/open_protocol_encoder.go @@ 
-34,10 +34,7 @@ type BatchEncoder struct { callbackBuff []func() curBatchSize int - // configs - MaxMessageBytes int - MaxBatchSize int - OnlyOutputUpdatedColumns bool + config *common.Config } // AppendRowChangedEvent implements the RowEventEncoder interface @@ -47,12 +44,12 @@ func (d *BatchEncoder) AppendRowChangedEvent( e *model.RowChangedEvent, callback func(), ) error { - keyMsg, valueMsg := rowChangeToMsg(e) + keyMsg, valueMsg := rowChangeToMsg(e, d.config.OnlyHandleKeyColumns) key, err := keyMsg.Encode() if err != nil { return errors.Trace(err) } - value, err := valueMsg.encode(d.OnlyOutputUpdatedColumns) + value, err := valueMsg.encode(d.config.OnlyOutputUpdatedColumns) if err != nil { return errors.Trace(err) } @@ -65,9 +62,9 @@ func (d *BatchEncoder) AppendRowChangedEvent( // for single message that is longer than max-message-bytes, do not send it. // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead` length := len(key) + len(value) + common.MaxRecordOverhead + 16 + 8 - if length > d.MaxMessageBytes { + if length > d.config.MaxMessageBytes { log.Warn("Single message is too large for open-protocol", - zap.Int("maxMessageBytes", d.MaxMessageBytes), + zap.Int("maxMessageBytes", d.config.MaxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table), zap.Any("key", key), @@ -76,8 +73,8 @@ func (d *BatchEncoder) AppendRowChangedEvent( } if len(d.messageBuf) == 0 || - d.curBatchSize >= d.MaxBatchSize || - d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.MaxMessageBytes { + d.curBatchSize >= d.config.MaxBatchSize || + d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.config.MaxMessageBytes { // Before we create a new message, we should handle the previous callbacks. d.tryBuildCallback() versionHead := make([]byte, 8) @@ -193,12 +190,7 @@ type batchEncoderBuilder struct { // Build a BatchEncoder func (b *batchEncoderBuilder) Build() codec.RowEventEncoder { - encoder := NewBatchEncoder() - encoder.(*BatchEncoder).MaxMessageBytes = b.config.MaxMessageBytes - encoder.(*BatchEncoder).MaxBatchSize = b.config.MaxBatchSize - encoder.(*BatchEncoder).OnlyOutputUpdatedColumns = b.config.OnlyOutputUpdatedColumns - - return encoder + return NewBatchEncoder(b.config) } // NewBatchEncoderBuilder creates an open-protocol batchEncoderBuilder. @@ -207,7 +199,8 @@ func NewBatchEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder } // NewBatchEncoder creates a new BatchEncoder. 
-func NewBatchEncoder() codec.RowEventEncoder { - batch := &BatchEncoder{} - return batch +func NewBatchEncoder(config *common.Config) codec.RowEventEncoder { + return &BatchEncoder{ + config: config, + } } diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index 809da7a633a..aba2a2af731 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -31,8 +31,7 @@ func TestBuildOpenProtocolBatchEncoder(t *testing.T) { builder := &batchEncoderBuilder{config: config} encoder, ok := builder.Build().(*BatchEncoder) require.True(t, ok) - require.Equal(t, config.MaxBatchSize, encoder.MaxBatchSize) - require.Equal(t, config.MaxMessageBytes, encoder.MaxMessageBytes) + require.NotNil(t, encoder.config) } func TestMaxMessageBytes(t *testing.T) { @@ -131,7 +130,6 @@ func TestOpenProtocolAppendRowChangedEventWithCallback(t *testing.T) { builder := &batchEncoderBuilder{config: cfg} encoder, ok := builder.Build().(*BatchEncoder) require.True(t, ok) - require.Equal(t, cfg.MaxBatchSize, encoder.MaxBatchSize) count := 0 diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go index fb6e68f05e2..0c5f356bbde 100644 --- a/pkg/sink/codec/open/open_protocol_message.go +++ b/pkg/sink/codec/open/open_protocol_message.go @@ -94,7 +94,7 @@ func newResolvedMessage(ts uint64) *internal.MessageKey { } } -func rowChangeToMsg(e *model.RowChangedEvent) (*internal.MessageKey, *messageRow) { +func rowChangeToMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) (*internal.MessageKey, *messageRow) { var partition *int64 if e.Table.IsPartition { partition = &e.Table.TableID @@ -109,10 +109,10 @@ func rowChangeToMsg(e *model.RowChangedEvent) (*internal.MessageKey, *messageRow } value := &messageRow{} if e.IsDelete() { - value.Delete = rowChangeColumns2CodecColumns(e.PreColumns) + value.Delete = rowChangeColumns2CodecColumns(e.PreColumns, onlyHandleKeyColumns) } else { - value.Update = rowChangeColumns2CodecColumns(e.Columns) - value.PreColumns = rowChangeColumns2CodecColumns(e.PreColumns) + value.Update = rowChangeColumns2CodecColumns(e.Columns, false) + value.PreColumns = rowChangeColumns2CodecColumns(e.PreColumns, false) } return key, value } @@ -141,12 +141,15 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang return e } -func rowChangeColumns2CodecColumns(cols []*model.Column) map[string]internal.Column { +func rowChangeColumns2CodecColumns(cols []*model.Column, onlyHandleKeyColumns bool) map[string]internal.Column { jsonCols := make(map[string]internal.Column, len(cols)) for _, col := range cols { if col == nil { continue } + if onlyHandleKeyColumns && !col.Flag.IsHandleKey() { + continue + } c := internal.Column{} c.FromRowChangeColumn(col) jsonCols[col.Name] = c diff --git a/pkg/sink/codec/open/open_protocol_message_test.go b/pkg/sink/codec/open/open_protocol_message_test.go index be1cf2e4a6b..945bb629f1f 100644 --- a/pkg/sink/codec/open/open_protocol_message_test.go +++ b/pkg/sink/codec/open/open_protocol_message_test.go @@ -159,3 +159,60 @@ func TestOnlyOutputUpdatedColumn(t *testing.T) { assert.Equal(t, cs.output, ok) } } + +func TestRowChanged2MsgOnlyHandleKeyColumns(t *testing.T) { + t.Parallel() + + insertEvent := &model.RowChangedEvent{ + CommitTs: 417318403368288260, + Table: &model.TableName{ + Schema: "schema", + Table: "table", + }, + Columns: []*model.Column{ + {Name: "id", Flag: model.HandleKeyFlag, 
Type: mysql.TypeLonglong, Value: 1}, + {Name: "a", Type: mysql.TypeLonglong, Value: 1}, + }, + } + _, value := rowChangeToMsg(insertEvent, true) + _, ok := value.Update["a"] + require.True(t, ok) + + updateEvent := &model.RowChangedEvent{ + CommitTs: 417318403368288260, + Table: &model.TableName{ + Schema: "schema", + Table: "table", + }, + Columns: []*model.Column{ + {Name: "id", Flag: model.HandleKeyFlag, Type: mysql.TypeLonglong, Value: 1}, + {Name: "a", Type: mysql.TypeLonglong, Value: 2}, + }, + PreColumns: []*model.Column{ + {Name: "id", Flag: model.HandleKeyFlag, Type: mysql.TypeLonglong, Value: 1}, + {Name: "a", Type: mysql.TypeLonglong, Value: 1}, + }, + } + _, value = rowChangeToMsg(updateEvent, true) + _, ok = value.PreColumns["a"] + require.True(t, ok) + + deleteEvent := &model.RowChangedEvent{ + CommitTs: 417318403368288260, + Table: &model.TableName{ + Schema: "schema", + Table: "table", + }, + PreColumns: []*model.Column{ + {Name: "id", Flag: model.HandleKeyFlag, Type: mysql.TypeLonglong, Value: 1}, + {Name: "a", Type: mysql.TypeLonglong, Value: 2}, + }, + } + _, value = rowChangeToMsg(deleteEvent, true) + _, ok = value.Delete["a"] + require.False(t, ok) + + _, value = rowChangeToMsg(deleteEvent, false) + _, ok = value.Delete["a"] + require.True(t, ok) +} diff --git a/pkg/version/creator_version_gate.go b/pkg/version/creator_version_gate.go index 0480f65f47f..31ab5bea9a3 100644 --- a/pkg/version/creator_version_gate.go +++ b/pkg/version/creator_version_gate.go @@ -78,7 +78,10 @@ func (g *CreatorVersionGate) ChangefeedAcceptUnknownProtocols() bool { return creatorVersion.LessThan(changefeedAcceptUnknownProtocolsVersion) } -var changefeedAcceptProtocolInMysqlSinURI = *semver.New("6.1.1") +var ( + changefeedAcceptProtocolInMysqlSinURI = *semver.New("6.1.1") + changefeedAdjustEnableOldValueByProtocol = *semver.New("7.2.0") +) // ChangefeedAcceptProtocolInMysqlSinURI determines whether to accept // protocol in mysql sink uri or configure based on the creator's version. @@ -103,3 +106,14 @@ func (g *CreatorVersionGate) ChangefeedInheritSchedulerConfigFromV66() bool { creatorVersion := semver.New(SanitizeVersion(g.version)) return creatorVersion.Major == 6 && creatorVersion.Minor == 6 } + +// ChangefeedAdjustEnableOldValueByProtocol determines whether to adjust +// the `enable-old-value` configuration by the using encoding protocol. +func (g *CreatorVersionGate) ChangefeedAdjustEnableOldValueByProtocol() bool { + if g.version == "" { + return true + } + + creatorVersion := semver.New(SanitizeVersion(g.version)) + return creatorVersion.LessThan(changefeedAdjustEnableOldValueByProtocol) +} diff --git a/tests/integration_tests/multi_changefeed/run.sh b/tests/integration_tests/multi_changefeed/run.sh index f6850b5d594..cd453d3b051 100755 --- a/tests/integration_tests/multi_changefeed/run.sh +++ b/tests/integration_tests/multi_changefeed/run.sh @@ -38,17 +38,8 @@ function check_old_value_enabled() { # When old value is turned on, the pre-column in our delete will include all the columns. # So here we have 1 (id) and 3 (val). delete_with_old_value_count=$(grep "BlackHoleSink: WriteEvents" "$1/cdc.log" | grep 'pre\-columns\\\":\[' | grep 'columns\\\":null' | grep 'value\\\":1' | grep -c 'value\\\":3') - if [[ "$delete_with_old_value_count" -ne 1 ]]; then - echo "can't found delete row with old value" - exit 1 - fi - - # check if exist a delete row without a complete `pre-column` - # When old value is turned off, the pre-column in our delete will only include the handle columns. 
- # So here we only have 1 (id). - delete_without_old_value_count=$(grep "BlackHoleSink: WriteEvents" "$1/cdc.log" | grep 'pre\-columns\\\":\[' | grep 'columns\\\":null' | grep -c 'value\\\":1,\\\"default\\\":null},null') - if [[ "$delete_without_old_value_count" -ne 1 ]]; then - echo "can't found delete row without old value" + if [[ "$delete_with_old_value_count" -ne 2 ]]; then + echo "expected 2 delete rows with old value, got ${delete_with_old_value_count}" exit 1 fi }
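Taken together, the codec changes above follow one pattern: each encoder now holds the full `*common.Config`, `config.Apply` derives `OnlyHandleKeyColumns` from `!replicaConfig.EnableOldValue`, and the filter is applied only to the old-value image (pre-columns) of delete events, while insert and update images are left untouched. The following is a minimal, self-contained sketch of that pattern; `column`, `rowChange`, and the function names are illustrative stand-ins, not the actual `model.Column`/`model.RowChangedEvent` types or encoder methods:

```go
package main

import "fmt"

// column is an illustrative stand-in for model.Column: a name, a value,
// and whether the column is part of the handle key.
type column struct {
	name        string
	value       any
	isHandleKey bool
}

// rowChange is an illustrative stand-in for model.RowChangedEvent.
// A delete event carries only the old-value image (preColumns).
type rowChange struct {
	columns    []column
	preColumns []column
}

func (r rowChange) isDelete() bool {
	return len(r.preColumns) != 0 && len(r.columns) == 0
}

// filterColumns mirrors the check the patch adds to each encoder:
// when onlyHandleKey is set, keep just the handle-key columns.
func filterColumns(cols []column, onlyHandleKey bool) []column {
	if !onlyHandleKey {
		return cols
	}
	filtered := make([]column, 0, len(cols))
	for _, c := range cols {
		if c.isHandleKey {
			filtered = append(filtered, c)
		}
	}
	return filtered
}

// encodeOldValueImage shows where the flag takes effect: only the
// pre-columns of a delete event are trimmed; other images are untouched.
func encodeOldValueImage(ev rowChange, onlyHandleKeyColumns bool) []column {
	onlyHandleKey := onlyHandleKeyColumns && ev.isDelete()
	return filterColumns(ev.preColumns, onlyHandleKey)
}

func main() {
	// Derived roughly the way config.Apply now does it:
	// the flag is on whenever old value is turned off.
	enableOldValue := false
	onlyHandleKeyColumns := !enableOldValue

	deleteEvent := rowChange{
		preColumns: []column{
			{name: "id", value: 1, isHandleKey: true},
			{name: "a", value: 2},
		},
	}
	fmt.Println(encodeOldValueImage(deleteEvent, onlyHandleKeyColumns)) // [{id 1 true}]
}
```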
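The creator-version gate added in `pkg/version/creator_version_gate.go` decides which existing changefeeds this adjustment applies to. Below is a minimal sketch of its decision logic, assuming the same `coreos/go-semver` package the file already imports and omitting the `SanitizeVersion` step:

```go
package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// changefeedAdjustEnableOldValueByProtocol matches the threshold version
// introduced by the patch.
var changefeedAdjustEnableOldValueByProtocol = *semver.New("7.2.0")

// adjustEnableOldValueByProtocol sketches the gate's decision: an empty
// creator version, or anything created before 7.2.0, is eligible for the
// enable-old-value adjustment. (The real gate sanitizes the version
// string first; that step is omitted here.)
func adjustEnableOldValueByProtocol(creatorVersion string) bool {
	if creatorVersion == "" {
		return true
	}
	return semver.New(creatorVersion).LessThan(changefeedAdjustEnableOldValueByProtocol)
}

func main() {
	fmt.Println(adjustEnableOldValueByProtocol(""))      // true: creator version unknown
	fmt.Println(adjustEnableOldValueByProtocol("6.5.0")) // true: created before 7.2.0
	fmt.Println(adjustEnableOldValueByProtocol("7.2.0")) // false: new enough, leave it alone
}
```

In other words, changefeeds created before 7.2.0 (or with no recorded creator version) are treated as candidates for the adjustment, while changefeeds created by 7.2.0 or later are left as configured.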