From 94b4b60354655839ba09917544cad5ac87d02fc0 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 24 Jun 2021 11:49:22 +0800 Subject: [PATCH] cli: prevent invalid filter rules from being applied, which would lead to an irrecoverable changefeed failure (#2117) (#2132) --- cmd/client_changefeed.go | 6 +++--- cmd/cmd_test.go | 28 ++++++++++++++++++++++++++++ cmd/util.go | 10 ++++++++++ pkg/filter/filter.go | 15 +++++++++++++-- 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index ebfe3326740..7f4552e3efc 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -279,7 +279,7 @@ func verifyChangefeedParameters(ctx context.Context, cmd *cobra.Command, isCreat log.Warn("The TiCDC cluster is built from 4.0-release branch, the old-value and unified-sorter are disabled by default.") } if len(configFile) > 0 { - if err := strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil { + if err := verifyReplicaConfig(configFile, "TiCDC changefeed", cfg); err != nil { return nil, err } } @@ -519,7 +519,7 @@ func newUpdateChangefeedCommand() *cobra.Command { info.SinkURI = sinkURI case "config": cfg := info.Config - if err = strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil { + if err = verifyReplicaConfig(configFile, "TiCDC changefeed", cfg); err != nil { log.Error("decode config file error", zap.Error(err)) } case "opts": @@ -692,7 +692,7 @@ func newCreateChangefeedCyclicCommand() *cobra.Command { cfg := config.GetDefaultReplicaConfig() if len(configFile) > 0 { - if err := strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil { + if err := verifyReplicaConfig(configFile, "TiCDC changefeed", cfg); err != nil { return err } } diff --git a/cmd/cmd_test.go b/cmd/cmd_test.go index fab5dfa0eb2..aa48fe392e7 100644 --- a/cmd/cmd_test.go +++ b/cmd/cmd_test.go @@ -158,6 +158,34 @@ func (s *decodeFileSuite) TestShouldReturnErrForUnknownCfgs(c *check.C) { c.Assert(err, check.ErrorMatches, ".*unknown config.*") } +func (s *decodeFileSuite) TestVerifyReplicaConfig(c *check.C) { + defer testleak.AfterTest(c)() + + dir := c.MkDir() + path := filepath.Join(dir, "config.toml") + content := ` + [filter] + rules = ['*.*', '!test.*']` + err := ioutil.WriteFile(path, []byte(content), 0o644) + c.Assert(err, check.IsNil) + + cfg := config.GetDefaultReplicaConfig() + err = verifyReplicaConfig(path, "cdc", cfg) + c.Assert(err, check.IsNil) + + path = filepath.Join(dir, "config1.toml") + content = ` + [filter] + rules = ['*.*', '!test.*','rtest1']` + err = ioutil.WriteFile(path, []byte(content), 0o644) + c.Assert(err, check.IsNil) + + cfg = config.GetDefaultReplicaConfig() + err = verifyReplicaConfig(path, "cdc", cfg) + c.Assert(err, check.NotNil) + c.Assert(err, check.ErrorMatches, ".*CDC:ErrFilterRuleInvalid.*") +} + type mockPDClient struct { pd.Client ts uint64 diff --git a/cmd/util.go b/cmd/util.go index 63d7581d462..f8b1e952d3e 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -294,6 +294,16 @@ func verifySink( return nil } +// verifyReplicaConfig do strictDecodeFile check and only verify the rules for now +func verifyReplicaConfig(path, component string, cfg *config.ReplicaConfig) error { + err := strictDecodeFile(path, component, cfg) + if err != nil { + return err + } + _, err = filter.VerifyRules(cfg) + return err +} + // strictDecodeFile decodes the toml file strictly. If any item in confFile file is not mapped // into the Config struct, issue an error and stop the server from starting. func strictDecodeFile(path, component string, cfg interface{}) error { diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 8aa63973189..15cbc933f9b 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -30,8 +30,8 @@ type Filter struct { isCyclicEnabled bool } -// NewFilter creates a filter -func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) { +// VerifyRules ... +func VerifyRules(cfg *config.ReplicaConfig) (filterV2.Filter, error) { var f filterV2.Filter var err error if len(cfg.Filter.Rules) == 0 && cfg.Filter.MySQLReplicationRules != nil { @@ -46,6 +46,17 @@ func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) { if err != nil { return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err) } + + return f, nil +} + +// NewFilter creates a filter +func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) { + f, err := VerifyRules(cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err) + } + if !cfg.CaseSensitive { f = filterV2.CaseInsensitive(f) }