Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config(ticdc): Add alias for "dispatcher" in dispatch rules #5441

Merged
merged 12 commits into from
May 17, 2022
4 changes: 2 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ func TestFillV1(t *testing.T) {
},
Sink: &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{Matcher: []string{"test.tbl3"}, PartitionRule: "ts"},
{Matcher: []string{"test.tbl4"}, PartitionRule: "rowid"},
{Matcher: []string{"test.tbl3"}, DispatcherRule: "ts"},
{Matcher: []string{"test.tbl4"}, DispatcherRule: "rowid"},
},
},
Cyclic: &config.CyclicConfig{
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/mysql/txn_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func (c *unresolvedTxnCache) Resolved(

func splitResolvedTxn(
resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (checkpointTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
) (checkpointTsMap map[model.TableID]uint64,
resolvedRowsMap map[model.TableID][]*model.SingleTableTxn,
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
) {
var (
ok bool
txnsLength int
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/util/changefeed.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ worker-num = 16
# For MQ Sinks, you can configure event distribution rules through dispatchers
# Dispatchers support default, ts, rowid and table
dispatchers = [
{ matcher = ['test1.*', 'test2.*'], dispatcher = "ts", topic = "hello_{schema}" },
{ matcher = ['test1.*', 'test2.*'], partition = "ts", topic = "hello_{schema}" },
{ matcher = ['test3.*', 'test4.*'], dispatcher = "rowid", topic = "{schema}_world" },
]
# 对于 MQ 类的 Sink,可以通过 column-selectors 配置 column 选择器
Expand Down
149 changes: 79 additions & 70 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,17 @@ package util
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
)

func TestSuite(t *testing.T) { check.TestingT(t) }

type utilsSuite struct{}

var _ = check.Suite(&utilsSuite{})

func (s *utilsSuite) TestProxyFields(c *check.C) {
defer testleak.AfterTest(c)()
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
func TestProxyFields(t *testing.T) {
revIndex := map[string]int{
"http_proxy": 0,
"https_proxy": 1,
Expand All @@ -46,65 +39,74 @@ func (s *utilsSuite) TestProxyFields(c *check.C) {
// Each bit of the mask decided whether this index of `envs` would be set.
for mask := 0; mask <= 0b111; mask++ {
for _, env := range envs {
c.Assert(os.Unsetenv(env), check.IsNil)
require.Nil(t, os.Unsetenv(env))
}

for i := 0; i < 3; i++ {
if (1<<i)&mask != 0 {
c.Assert(os.Setenv(envs[i], envPreset[i]), check.IsNil)
require.Nil(t, os.Setenv(envs[i], envPreset[i]))
}
}

for _, field := range findProxyFields() {
idx, ok := revIndex[field.Key]
c.Assert(ok, check.IsTrue)
c.Assert((1<<idx)&mask, check.Not(check.Equals), 0)
c.Assert(field.String, check.Equals, envPreset[idx])
require.True(t, ok)
require.NotEqual(t, (1<<idx)&mask, 0)
require.Equal(t, field.String, envPreset[idx])
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func (s *utilsSuite) TestVerifyPdEndpoint(c *check.C) {
defer testleak.AfterTest(c)()
func TestVerifyPdEndpoint(t *testing.T) {
// empty URL.
url := ""
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*")
require.Regexp(t, ".*PD endpoint should be a valid http or https URL.*",
VerifyPdEndpoint(url, false))

// invalid URL.
url = "\n hi"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*invalid control character in URL.*")
require.Regexp(t, ".*invalid control character in URL.*",
VerifyPdEndpoint(url, false))

// http URL without host.
url = "http://"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*")
require.Regexp(t, ".*PD endpoint should be a valid http or https URL.*",
VerifyPdEndpoint(url, false))

// https URL without host.
url = "https://"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*")
require.Regexp(t, ".*PD endpoint should be a valid http or https URL.*",
VerifyPdEndpoint(url, false))

// postgres scheme.
url = "postgres://postgres@localhost/cargo_registry"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*")
require.Regexp(t, ".*PD endpoint should be a valid http or https URL.*",
VerifyPdEndpoint(url, false))

// https scheme without TLS.
url = "https://aa"
c.Assert(VerifyPdEndpoint(url, false), check.ErrorMatches, ".*PD endpoint scheme is https, please provide certificate.*")
require.Regexp(t, ".*PD endpoint scheme is https, please provide certificate.*",
VerifyPdEndpoint(url, false))

// http scheme with TLS.
url = "http://aa"
c.Assert(VerifyPdEndpoint(url, true), check.ErrorMatches, ".*PD endpoint scheme should be https.*")
require.Regexp(t, ".*PD endpoint scheme should be https.*", VerifyPdEndpoint(url, true))

// valid http URL.
c.Assert(VerifyPdEndpoint("http://aa", false), check.IsNil)
require.Nil(t, VerifyPdEndpoint("http://aa", false))

// valid https URL with TLS.
c.Assert(VerifyPdEndpoint("https://aa", true), check.IsNil)
require.Nil(t, VerifyPdEndpoint("https://aa", true))
}

func (s *utilsSuite) TestStrictDecodeValidFile(c *check.C) {
defer testleak.AfterTest(c)()
dataDir := c.MkDir()
tmpDir := c.MkDir()
func TestStrictDecodeValidFile(t *testing.T) {
dataDir, err := ioutil.TempDir("", "data")
require.NoError(t, err)
tmpDir, err := ioutil.TempDir("", "tmp")
require.NoError(t, err)
defer os.RemoveAll(dataDir)
defer os.RemoveAll(tmpDir)

configPath := filepath.Join(tmpDir, "ticdc.toml")
configContent := fmt.Sprintf(`
addr = "128.0.0.1:1234"
Expand Down Expand Up @@ -140,18 +142,22 @@ cert-path = "bb"
key-path = "cc"
cert-allowed-cn = ["dd","ee"]
`, dataDir)
err := os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
require.Nil(t, err)

conf := config.GetDefaultServerConfig()
err = StrictDecodeFile(configPath, "test", conf)
c.Assert(err, check.IsNil)
require.Nil(t, err)
}

func (s *utilsSuite) TestStrictDecodeInvalidFile(c *check.C) {
defer testleak.AfterTest(c)()
dataDir := c.MkDir()
tmpDir := c.MkDir()
func TestStrictDecodeInvalidFile(t *testing.T) {
dataDir, err := ioutil.TempDir("", "data")
require.NoError(t, err)
tmpDir, err := ioutil.TempDir("", "tmp")
require.NoError(t, err)
defer os.RemoveAll(dataDir)
defer os.RemoveAll(tmpDir)

configPath := filepath.Join(tmpDir, "ticdc.toml")
configContent := fmt.Sprintf(`
unknown = "128.0.0.1:1234"
Expand All @@ -162,29 +168,30 @@ max-size = 200
max-days = 1
max-backups = 1
`, dataDir)
err := os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
require.Nil(t, err)

conf := config.GetDefaultServerConfig()
err = StrictDecodeFile(configPath, "test", conf)
c.Assert(err, check.ErrorMatches, ".*contained unknown configuration options.*")
require.Contains(t, err.Error(), "contained unknown configuration options")
}

func (s *utilsSuite) TestAndWriteExampleReplicaTOML(c *check.C) {
defer testleak.AfterTest(c)()
func TestAndWriteExampleReplicaTOML(t *testing.T) {
cfg := config.GetDefaultReplicaConfig()
err := StrictDecodeFile("changefeed.toml", "cdc", &cfg)
c.Assert(err, check.IsNil)
require.Nil(t, err)

c.Assert(cfg.CaseSensitive, check.IsTrue)
c.Assert(cfg.Filter, check.DeepEquals, &config.FilterConfig{
require.True(t, cfg.CaseSensitive)
require.Equal(t, &config.FilterConfig{
IgnoreTxnStartTs: []uint64{1, 2},
Rules: []string{"*.*", "!test.*"},
})
c.Assert(cfg.Mounter, check.DeepEquals, &config.MounterConfig{
}, cfg.Filter)
require.Equal(t, cfg.Mounter, &config.MounterConfig{
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
WorkerNum: 16,
})
c.Assert(cfg.Sink, check.DeepEquals, &config.SinkConfig{
err = cfg.Validate()
require.Nil(t, err)
require.Equal(t, &config.SinkConfig{
DispatchRules: []*config.DispatchRule{
{PartitionRule: "ts", TopicRule: "hello_{schema}", Matcher: []string{"test1.*", "test2.*"}},
{PartitionRule: "rowid", TopicRule: "{schema}_world", Matcher: []string{"test3.*", "test4.*"}},
Expand All @@ -194,28 +201,26 @@ func (s *utilsSuite) TestAndWriteExampleReplicaTOML(c *check.C) {
{Matcher: []string{"test3.*", "test4.*"}, Columns: []string{"!a", "column3"}},
},
Protocol: "open-protocol",
})
c.Assert(cfg.Cyclic, check.DeepEquals, &config.CyclicConfig{
}, cfg.Sink)
require.Equal(t, &config.CyclicConfig{
Enable: false,
ReplicaID: 1,
FilterReplicaID: []uint64{2, 3},
SyncDDL: true,
})
}, cfg.Cyclic)
}

func (s *utilsSuite) TestAndWriteExampleServerTOML(c *check.C) {
defer testleak.AfterTest(c)()
func TestAndWriteExampleServerTOML(t *testing.T) {
cfg := config.GetDefaultServerConfig()
err := StrictDecodeFile("ticdc.toml", "cdc", &cfg)
c.Assert(err, check.IsNil)
require.Nil(t, err)
defcfg := config.GetDefaultServerConfig()
defcfg.AdvertiseAddr = "127.0.0.1:8300"
defcfg.LogFile = "/tmp/ticdc/ticdc.log"
c.Assert(cfg, check.DeepEquals, defcfg)
require.Equal(t, defcfg, cfg)
}

func (s *utilsSuite) TestJSONPrint(c *check.C) {
defer testleak.AfterTest(c)()
func TestJSONPrint(t *testing.T) {
cmd := new(cobra.Command)
type testStruct struct {
A string `json:"a"`
Expand All @@ -229,19 +234,23 @@ func (s *utilsSuite) TestJSONPrint(c *check.C) {
cmd.SetOut(&b)

err := JSONPrint(cmd, &data)
c.Assert(err, check.IsNil)
require.Nil(t, err)

output := `{
"a": "string"
}
`
c.Assert(b.String(), check.Equals, output)
require.Equal(t, output, b.String())
}

func (s *utilsSuite) TestIgnoreStrictCheckItem(c *check.C) {
defer testleak.AfterTest(c)()
dataDir := c.MkDir()
tmpDir := c.MkDir()
func TestIgnoreStrictCheckItem(t *testing.T) {
dataDir, err := ioutil.TempDir("", "data")
require.NoError(t, err)
tmpDir, err := ioutil.TempDir("", "tmp")
require.NoError(t, err)
defer os.RemoveAll(dataDir)
defer os.RemoveAll(tmpDir)

configPath := filepath.Join(tmpDir, "ticdc.toml")
configContent := fmt.Sprintf(`
data-dir = "%+v"
Expand All @@ -250,12 +259,12 @@ max-size = 200
max-days = 1
max-backups = 1
`, dataDir)
err := os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
require.Nil(t, err)

conf := config.GetDefaultServerConfig()
err = StrictDecodeFile(configPath, "test", conf, "unknown")
c.Assert(err, check.IsNil)
require.Nil(t, err)

configContent = fmt.Sprintf(`
data-dir = "%+v"
Expand All @@ -269,19 +278,19 @@ max-days = 1
max-backups = 1
`, dataDir)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
require.Nil(t, err)

err = StrictDecodeFile(configPath, "test", conf, "unknown")
c.Assert(err, check.ErrorMatches, ".*contained unknown configuration options: unknown2.*")
require.Contains(t, err.Error(), "contained unknown configuration options: unknown2")

configContent = fmt.Sprintf(`
data-dir = "%+v"
[debug]
unknown = 1
`, dataDir)
err = os.WriteFile(configPath, []byte(configContent), 0o644)
c.Assert(err, check.IsNil)
require.Nil(t, err)

err = StrictDecodeFile(configPath, "test", conf, "debug")
c.Assert(err, check.IsNil)
require.Nil(t, err)
}
4 changes: 2 additions & 2 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) {
}
for _, dispatch := range v1.Sink.DispatchRules {
c.Sink.DispatchRules = append(c.Sink.DispatchRules, &DispatchRule{
Matcher: []string{fmt.Sprintf("%s.%s", dispatch.Schema, dispatch.Name)},
PartitionRule: dispatch.Rule,
Matcher: []string{fmt.Sprintf("%s.%s", dispatch.Schema, dispatch.Name)},
DispatcherRule: dispatch.Rule,
})
}
}
Expand Down
30 changes: 26 additions & 4 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func TestReplicaConfigOutDated(t *testing.T) {
conf.Mounter.WorkerNum = 3
conf.Sink.Protocol = "open-protocol"
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, PartitionRule: "r1"},
{Matcher: []string{"a.c"}, PartitionRule: "r2"},
{Matcher: []string{"a.d"}, PartitionRule: "r2"},
{Matcher: []string{"a.b"}, DispatcherRule: "r1"},
{Matcher: []string{"a.c"}, DispatcherRule: "r2"},
{Matcher: []string{"a.d"}, DispatcherRule: "r2"},
}
require.Equal(t, conf, conf2)
}
Expand All @@ -93,5 +93,27 @@ func TestReplicaConfigValidate(t *testing.T) {
conf = GetDefaultReplicaConfig()
conf.Sink.Protocol = "canal"
conf.EnableOldValue = false
require.Regexp(t, ".*canal protocol requires old value to be enabled.*", conf.Validate())
require.Regexp(t, ".*canal protocol requires old value to be enabled.*",
conf.Validate())

conf = GetDefaultReplicaConfig()
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"},
}
require.Regexp(t, ".*dispatcher and partition cannot be configured both.*",
conf.Validate())

// Correct sink configuration.
conf = GetDefaultReplicaConfig()
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, DispatcherRule: "d1"},
{Matcher: []string{"a.c"}, PartitionRule: "p1"},
{Matcher: []string{"a.d"}},
}
err := conf.Validate()
require.Nil(t, err)
rules := conf.Sink.DispatchRules
require.Equal(t, "d1", rules[0].PartitionRule)
require.Equal(t, "p1", rules[1].PartitionRule)
require.Equal(t, "", rules[2].PartitionRule)
}
Loading