From e7b5a8e1c984701f892242d897df67501c0b00c7 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 24 Dec 2021 15:51:47 +0800 Subject: [PATCH] config(ticdc): Fix old value configuration check for maxwell protocol (#3747) (#3781) --- cdc/sink/codec/interface.go | 18 ++++++++++++++++++ cdc/sink/mq.go | 8 +++++--- cmd/client_changefeed.go | 8 ++++++-- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index 6769f943c93..f58a91d39f7 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -172,6 +172,24 @@ func (p *Protocol) FromString(protocol string) { } } +// String converts the Protocol enum type string to string. +func (p Protocol) String() string { + switch p { + case ProtocolDefault: + return "default" + case ProtocolCanal: + return "canal" + case ProtocolAvro: + return "avro" + case ProtocolMaxwell: + return "maxwell" + case ProtocolCanalJSON: + return "canal-json" + default: + panic("unreachable") + } +} + // NewEventBatchEncoder returns a function of creating an EventBatchEncoder by protocol. func NewEventBatchEncoder(p Protocol) func() EventBatchEncoder { switch p { diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 9b9023f6a17..6edc098c829 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -15,6 +15,7 @@ package sink import ( "context" + "fmt" "net/url" "strings" "sync/atomic" @@ -107,9 +108,10 @@ func newMqSink( avroEncoder.SetTimeZone(util.TimezoneFromCtx(ctx)) return avroEncoder } - } else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON) && !config.EnableOldValue { - log.Error("Old value is not enabled when using Canal protocol. Please update changefeed config") - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled")) + } else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !config.EnableOldValue { + log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+ + "Please update changefeed config", protocol.String())) + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol.String()))) } // pre-flight verification of encoder parameters diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index ced0057cf11..83096636c26 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -43,6 +43,7 @@ import ( var forceEnableOldValueProtocols = []string{ "canal", + "canal-json", "maxwell", } @@ -321,9 +322,12 @@ func verifyChangefeedParameters( } protocol := sinkURIParsed.Query().Get("protocol") + if protocol != "" { + cfg.Sink.Protocol = protocol + } for _, fp := range forceEnableOldValueProtocols { - if protocol == fp { - log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol)) + if cfg.Sink.Protocol == fp { + log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol)) cfg.EnableOldValue = true break }