diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index d1d028f92b1..5ffe27767c1 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -192,6 +192,26 @@ func (p *Protocol) FromString(protocol string) { } } +// String converts the Protocol enum type string to string. +func (p Protocol) String() string { + switch p { + case ProtocolDefault: + return "default" + case ProtocolCanal: + return "canal" + case ProtocolAvro: + return "avro" + case ProtocolMaxwell: + return "maxwell" + case ProtocolCanalJSON: + return "canal-json" + case ProtocolCraft: + return "craft" + default: + panic("unreachable") + } +} + // NewEventBatchEncoder returns a function of creating an EventBatchEncoder by protocol. func NewEventBatchEncoder(p Protocol) func() EventBatchEncoder { switch p { diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index d3142590758..83b09b5ff88 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -15,6 +15,7 @@ package sink import ( "context" + "fmt" "net/url" "strings" "sync/atomic" @@ -107,9 +108,10 @@ func newMqSink( avroEncoder.SetTimeZone(util.TimezoneFromCtx(ctx)) return avroEncoder } - } else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON) && !config.EnableOldValue { - log.Error("Old value is not enabled when using Canal protocol. Please update changefeed config") - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled")) + } else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !config.EnableOldValue { + log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+ + "Please update changefeed config", protocol.String())) + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol.String()))) } // pre-flight verification of encoder parameters diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index 1e492a9a060..d88af5cce33 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -46,6 +46,7 @@ import ( // forceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. var forceEnableOldValueProtocols = []string{ "canal", + "canal-json", "maxwell", } @@ -205,9 +206,12 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co } protocol := sinkURIParsed.Query().Get("protocol") + if protocol != "" { + cfg.Sink.Protocol = protocol + } for _, fp := range forceEnableOldValueProtocols { - if protocol == fp { - log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol)) + if cfg.Sink.Protocol == fp { + log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol)) cfg.EnableOldValue = true break }