Skip to content

Commit

Permalink
cli: prevent invalid filter rules from being applied, which would lea…
Browse files Browse the repository at this point in the history
…d to an irrecoverable changefeed failure (pingcap#2117) (pingcap#2132)
  • Loading branch information
ti-chi-bot authored and liuzix committed Jul 6, 2021
1 parent 8d4e048 commit 94b4b60
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 5 deletions.
6 changes: 3 additions & 3 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func verifyChangefeedParameters(ctx context.Context, cmd *cobra.Command, isCreat
log.Warn("The TiCDC cluster is built from 4.0-release branch, the old-value and unified-sorter are disabled by default.")
}
if len(configFile) > 0 {
if err := strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil {
if err := verifyReplicaConfig(configFile, "TiCDC changefeed", cfg); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -519,7 +519,7 @@ func newUpdateChangefeedCommand() *cobra.Command {
info.SinkURI = sinkURI
case "config":
cfg := info.Config
if err = strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil {
if err = verifyReplicaConfig(configFile, "TiCDC changefeed", cfg); err != nil {
log.Error("decode config file error", zap.Error(err))
}
case "opts":
Expand Down Expand Up @@ -692,7 +692,7 @@ func newCreateChangefeedCyclicCommand() *cobra.Command {

cfg := config.GetDefaultReplicaConfig()
if len(configFile) > 0 {
if err := strictDecodeFile(configFile, "TiCDC changefeed", cfg); err != nil {
if err := verifyReplicaConfig(configFile, "TiCDC changefeed", cfg); err != nil {
return err
}
}
Expand Down
28 changes: 28 additions & 0 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,34 @@ func (s *decodeFileSuite) TestShouldReturnErrForUnknownCfgs(c *check.C) {
c.Assert(err, check.ErrorMatches, ".*unknown config.*")
}

func (s *decodeFileSuite) TestVerifyReplicaConfig(c *check.C) {
defer testleak.AfterTest(c)()

dir := c.MkDir()
path := filepath.Join(dir, "config.toml")
content := `
[filter]
rules = ['*.*', '!test.*']`
err := ioutil.WriteFile(path, []byte(content), 0o644)
c.Assert(err, check.IsNil)

cfg := config.GetDefaultReplicaConfig()
err = verifyReplicaConfig(path, "cdc", cfg)
c.Assert(err, check.IsNil)

path = filepath.Join(dir, "config1.toml")
content = `
[filter]
rules = ['*.*', '!test.*','rtest1']`
err = ioutil.WriteFile(path, []byte(content), 0o644)
c.Assert(err, check.IsNil)

cfg = config.GetDefaultReplicaConfig()
err = verifyReplicaConfig(path, "cdc", cfg)
c.Assert(err, check.NotNil)
c.Assert(err, check.ErrorMatches, ".*CDC:ErrFilterRuleInvalid.*")
}

type mockPDClient struct {
pd.Client
ts uint64
Expand Down
10 changes: 10 additions & 0 deletions cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,16 @@ func verifySink(
return nil
}

// verifyReplicaConfig do strictDecodeFile check and only verify the rules for now
func verifyReplicaConfig(path, component string, cfg *config.ReplicaConfig) error {
err := strictDecodeFile(path, component, cfg)
if err != nil {
return err
}
_, err = filter.VerifyRules(cfg)
return err
}

// strictDecodeFile decodes the toml file strictly. If any item in confFile file is not mapped
// into the Config struct, issue an error and stop the server from starting.
func strictDecodeFile(path, component string, cfg interface{}) error {
Expand Down
15 changes: 13 additions & 2 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type Filter struct {
isCyclicEnabled bool
}

// NewFilter creates a filter
func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
// VerifyRules ...
func VerifyRules(cfg *config.ReplicaConfig) (filterV2.Filter, error) {
var f filterV2.Filter
var err error
if len(cfg.Filter.Rules) == 0 && cfg.Filter.MySQLReplicationRules != nil {
Expand All @@ -46,6 +46,17 @@ func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
if err != nil {
return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err)
}

return f, nil
}

// NewFilter creates a filter
func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
f, err := VerifyRules(cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrFilterRuleInvalid, err)
}

if !cfg.CaseSensitive {
f = filterV2.CaseInsensitive(f)
}
Expand Down

0 comments on commit 94b4b60

Please sign in to comment.