Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cli: prevent invalid filter rules from being applied, which would lead to an irrecoverable changefeed failure (#2117) #2132

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func verifyChangefeedParameters(ctx context.Context, cmd *cobra.Command, isCreat
log.Warn("The TiCDC cluster is built from 4.0-release branch, the old-value and unified-sorter are disabled by default.")
}
if len(configFile) > 0 {
if err := strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil {
if err := verifyReplicaConfig(configFile, "TiCDC changefeed", cfg); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -519,7 +519,7 @@ func newUpdateChangefeedCommand() *cobra.Command {
info.SinkURI = sinkURI
case "config":
cfg := info.Config
if err = strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil {
if err = verifyReplicaConfig(configFile, "TiCDC changefeed", cfg); err != nil {
log.Error("decode config file error", zap.Error(err))
}
case "opts":
Expand Down Expand Up @@ -692,7 +692,7 @@ func newCreateChangefeedCyclicCommand() *cobra.Command {

cfg := config.GetDefaultReplicaConfig()
if len(configFile) > 0 {
if err := strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil {
if err := verifyReplicaConfig(configFile, "TiCDC changefeed", cfg); err != nil {
return err
}
}
Expand Down
28 changes: 28 additions & 0 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,34 @@ func (s *decodeFileSuite) TestShouldReturnErrForUnknownCfgs(c *check.C) {
c.Assert(err, check.ErrorMatches, ".*unknown config.*")
}

func (s *decodeFileSuite) TestVerifyReplicaConfig(c *check.C) {
defer testleak.AfterTest(c)()

dir := c.MkDir()
path := filepath.Join(dir, "config.toml")
content := `
[filter]
rules = ['*.*', '!test.*']`
err := ioutil.WriteFile(path, []byte(content), 0o644)
c.Assert(err, check.IsNil)

cfg := config.GetDefaultReplicaConfig()
err = verifyReplicaConfig(path, "cdc", cfg)
c.Assert(err, check.IsNil)

path = filepath.Join(dir, "config1.toml")
content = `
[filter]
rules = ['*.*', '!test.*','rtest1']`
err = ioutil.WriteFile(path, []byte(content), 0o644)
c.Assert(err, check.IsNil)

cfg = config.GetDefaultReplicaConfig()
err = verifyReplicaConfig(path, "cdc", cfg)
c.Assert(err, check.NotNil)
c.Assert(err, check.ErrorMatches, ".*CDC:ErrFilterRuleInvalid.*")
}

type mockPDClient struct {
pd.Client
ts uint64
Expand Down
10 changes: 10 additions & 0 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,16 @@ func verifySink(
return nil
}

// verifyReplicaConfig do strictDecodeFile check and only verify the rules for now
func verifyReplicaConfig(path, component string, cfg *config.ReplicaConfig) error {
err := strictDecodeFile(path, component, cfg)
if err != nil {
return err
}
_, err = filter.VerifyRules(cfg)
return err
}

// strictDecodeFile decodes the toml file strictly. If any item in confFile file is not mapped
// into the Config struct, issue an error and stop the server from starting.
func strictDecodeFile(path, component string, cfg interface{}) error {
Expand Down
15 changes: 13 additions & 2 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type Filter struct {
isCyclicEnabled bool
}

// NewFilter creates a filter
func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
// VerifyRules ...
func VerifyRules(cfg *config.ReplicaConfig) (filterV2.Filter, error) {
var f filterV2.Filter
var err error
if len(cfg.Filter.Rules) == 0 && cfg.Filter.MySQLReplicationRules != nil {
Expand All @@ -46,6 +46,17 @@ func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
if err != nil {
return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err)
}

return f, nil
}

// NewFilter creates a filter
func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
f, err := VerifyRules(cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err)
}

if !cfg.CaseSensitive {
f = filterV2.CaseInsensitive(f)
}
Expand Down