diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index 2028b8dcf6f..9f8667026f0 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -43,12 +43,6 @@ import ( "go.uber.org/zap" ) -// forceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. -var forceEnableOldValueProtocols = []string{ - "canal", - "maxwell", -} - // changefeedCommonOptions defines common changefeed flags. type changefeedCommonOptions struct { noConfirm bool @@ -208,7 +202,7 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co if protocol != "" { cfg.Sink.Protocol = protocol } - for _, fp := range forceEnableOldValueProtocols { + for _, fp := range config.ForceEnableOldValueProtocols { if cfg.Sink.Protocol == fp { log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol)) cfg.EnableOldValue = true diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 272d252d736..1bf0338237c 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -21,6 +21,13 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" ) +// ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. +var ForceEnableOldValueProtocols = []string{ + ProtocolCanal.String(), + ProtocolCanalJSON.String(), + ProtocolMaxwell.String(), +} + // SinkConfig represents sink config for a changefeed type SinkConfig struct { DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers"` @@ -40,14 +47,14 @@ type ColumnSelector struct { } func (s *SinkConfig) validate(enableOldValue bool) error { - protocol := s.Protocol if !enableOldValue { - switch protocol { - case ProtocolCanal.String(), ProtocolCanalJSON.String(), ProtocolMaxwell.String(): - log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+ - "Please update changefeed config", protocol)) - return cerror.WrapError(cerror.ErrKafkaInvalidConfig, - errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol))) + for _, protocolStr := range ForceEnableOldValueProtocols { + if protocolStr == s.Protocol { + log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+ + "Please update changefeed config", s.Protocol)) + return cerror.WrapError(cerror.ErrKafkaInvalidConfig, + errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", s.Protocol))) + } } }