Skip to content

Commit

Permalink
config(ticdc): force enable old value for canal json protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Dec 20, 2021
1 parent 5ecb465 commit 4aa46db
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
8 changes: 1 addition & 7 deletions pkg/cmd/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ import (
"go.uber.org/zap"
)

// forceEnableOldValueProtocols specifies which protocols need to be forced to enable old value.
var forceEnableOldValueProtocols = []string{
"canal",
"maxwell",
}

// changefeedCommonOptions defines common changefeed flags.
type changefeedCommonOptions struct {
noConfirm bool
Expand Down Expand Up @@ -208,7 +202,7 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co
if protocol != "" {
cfg.Sink.Protocol = protocol
}
for _, fp := range forceEnableOldValueProtocols {
for _, fp := range config.ForceEnableOldValueProtocols {
if cfg.Sink.Protocol == fp {
log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol))
cfg.EnableOldValue = true
Expand Down
21 changes: 14 additions & 7 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

// ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value.
var ForceEnableOldValueProtocols = []string{
ProtocolCanal.String(),
ProtocolCanalJSON.String(),
ProtocolMaxwell.String(),
}

// SinkConfig represents sink config for a changefeed
type SinkConfig struct {
DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers"`
Expand All @@ -40,14 +47,14 @@ type ColumnSelector struct {
}

func (s *SinkConfig) validate(enableOldValue bool) error {
protocol := s.Protocol
if !enableOldValue {
switch protocol {
case ProtocolCanal.String(), ProtocolCanalJSON.String(), ProtocolMaxwell.String():
log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+
"Please update changefeed config", protocol))
return cerror.WrapError(cerror.ErrKafkaInvalidConfig,
errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol)))
for _, protocolStr := range ForceEnableOldValueProtocols {
if protocolStr == s.Protocol {
log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+
"Please update changefeed config", s.Protocol))
return cerror.WrapError(cerror.ErrKafkaInvalidConfig,
errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", s.Protocol)))
}
}
}

Expand Down

0 comments on commit 4aa46db

Please sign in to comment.