Skip to content

Commit

Permalink
cmd,sink(ticdc): validate changefeed params and revise error message (#…
Browse files Browse the repository at this point in the history
…4482)

close #1716, close #1718, close #1719, close #4472
  • Loading branch information
overvenus authored Jan 26, 2022
1 parent c56c1c8 commit f917d36
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 20 deletions.
1 change: 1 addition & 0 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func newMySQLSink(
}

// create test db used for parameter detection
// Refer https://github.com/go-sql-driver/mysql#parameters
if dsn.Params == nil {
dsn.Params = make(map[string]string, 1)
}
Expand Down
44 changes: 42 additions & 2 deletions cdc/sink/mysql_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const (
// expose these two variables for redo log applier
DefaultWorkerCount = 16
DefaultMaxTxnRow = 256
// The upper limit of max worker counts.
maxWorkerCount = 1024
// The upper limit of max txn rows.
maxMaxTxnRow = 2048

defaultDMLMaxRetryTime = 8
defaultDDLMaxRetryTime = 20
Expand Down Expand Up @@ -113,16 +117,32 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
if c > 0 {
params.workerCount = c
if c <= 0 {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig,
fmt.Errorf("invalid worker-count %d, which must be greater than 0", c))
}
if c > maxWorkerCount {
log.Warn("worker-count too large",
zap.Int("original", c), zap.Int("override", maxWorkerCount))
c = maxWorkerCount
}
params.workerCount = c
}
s = sinkURI.Query().Get("max-txn-row")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
if c <= 0 {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig,
fmt.Errorf("invalid max-txn-row %d, which must be greater than 0", c))
}
if c > maxMaxTxnRow {
log.Warn("max-txn-row too large",
zap.Int("original", c), zap.Int("override", maxMaxTxnRow))
c = maxMaxTxnRow
}
params.maxTxnRow = c
}
s = sinkURI.Query().Get("tidb-txn-mode")
Expand Down Expand Up @@ -182,6 +202,14 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string
if s == "" {
params.timezone = ""
} else {
value, err := url.QueryUnescape(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
_, err = time.LoadLocation(value)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
params.timezone = fmt.Sprintf(`"%s"`, s)
}
} else {
Expand All @@ -195,14 +223,26 @@ func parseSinkURIToParams(ctx context.Context, sinkURI *url.URL, opts map[string
// To keep the same style with other sink parameters, we use dash as word separator.
s = sinkURI.Query().Get("read-timeout")
if s != "" {
_, err := time.ParseDuration(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
params.readTimeout = s
}
s = sinkURI.Query().Get("write-timeout")
if s != "" {
_, err := time.ParseDuration(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
params.writeTimeout = s
}
s = sinkURI.Query().Get("timeout")
if s != "" {
_, err := time.ParseDuration(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
params.dialTimeout = s
}

Expand Down
48 changes: 47 additions & 1 deletion cdc/sink/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,63 @@ func TestParseSinkURITimezone(t *testing.T) {
}
}

func TestParseSinkURIOverride(t *testing.T) {
defer testleak.AfterTestT(t)()
cases := []struct {
uri string
checker func(*sinkParams)
}{{
uri: "mysql://127.0.0.1:3306/?worker-count=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.workerCount, maxWorkerCount)
},
}, {
uri: "mysql://127.0.0.1:3306/?max-txn-row=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.maxTxnRow, maxMaxTxnRow)
},
}, {
uri: "mysql://127.0.0.1:3306/?tidb-txn-mode=badmode",
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode)
},
}}
ctx := context.TODO()
opts := map[string]string{OptChangefeedID: "changefeed-01"}
var uri *url.URL
var err error
for _, cs := range cases {
if cs.uri != "" {
uri, err = url.Parse(cs.uri)
require.Nil(t, err)
} else {
uri = nil
}
p, err := parseSinkURIToParams(ctx, uri, opts)
require.Nil(t, err)
cs.checker(p)
}
}

func TestParseSinkURIBadQueryString(t *testing.T) {
defer testleak.AfterTestT(t)()
uris := []string{
"",
"postgre://127.0.0.1:3306",
"mysql://127.0.0.1:3306/?worker-count=not-number",
"mysql://127.0.0.1:3306/?worker-count=-1",
"mysql://127.0.0.1:3306/?worker-count=0",
"mysql://127.0.0.1:3306/?max-txn-row=not-number",
"mysql://127.0.0.1:3306/?max-txn-row=-1",
"mysql://127.0.0.1:3306/?max-txn-row=0",
"mysql://127.0.0.1:3306/?ssl-ca=only-ca-exists",
"mysql://127.0.0.1:3306/?batch-replace-enable=not-bool",
"mysql://127.0.0.1:3306/?batch-replace-enable=true&batch-replace-size=not-number",
"mysql://127.0.0.1:3306/?safe-mode=not-bool",
"mysql://127.0.0.1:3306/?time-zone=badtz",
"mysql://127.0.0.1:3306/?write-timeout=badduration",
"mysql://127.0.0.1:3306/?read-timeout=badduration",
"mysql://127.0.0.1:3306/?timeout=badduration",
}
ctx := context.TODO()
opts := map[string]string{OptChangefeedID: "changefeed-01"}
Expand All @@ -196,7 +242,7 @@ func TestParseSinkURIBadQueryString(t *testing.T) {
uri = nil
}
_, err = parseSinkURIToParams(ctx, uri, opts)
require.NotNil(t, err)
require.Error(t, err)
}
}

Expand Down
23 changes: 17 additions & 6 deletions pkg/cmd/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,18 @@ func (o *createChangefeedOptions) complete(ctx context.Context, f factory.Factor
}
o.startTs = oracle.ComposeTS(ts, logical)
}

return o.completeCfg(ctx, cmd)
}

// completeCfg complete the replica config from file and cmd flags.
func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Command) error {
_, captureInfos, err := o.etcdClient.GetCaptures(ctx)
if err != nil {
return err
}

return o.completeCfg(cmd, captureInfos)
}

// completeCfg complete the replica config from file and cmd flags.
func (o *createChangefeedOptions) completeCfg(
cmd *cobra.Command, captureInfos []*model.CaptureInfo,
) error {
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -227,6 +228,16 @@ func (o *createChangefeedOptions) completeCfg(ctx context.Context, cmd *cobra.Co
}
}

switch o.commonChangefeedOptions.sortEngine {
case model.SortInMemory:
case model.SortInFile:
case model.SortUnified:
default:
log.Warn("invalid sort-engine, use Unified Sorter by default",
zap.String("invalidSortEngine", o.commonChangefeedOptions.sortEngine))
o.commonChangefeedOptions.sortEngine = model.SortUnified
}

if o.commonChangefeedOptions.sortEngine == model.SortUnified && !cdcClusterVer.ShouldEnableUnifiedSorterByDefault() {
o.commonChangefeedOptions.sortEngine = model.SortInMemory
log.Warn("The TiCDC cluster is built from an older version, disabling Unified Sorter by default",
Expand Down
34 changes: 34 additions & 0 deletions pkg/cmd/cli/cli_changefeed_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"testing"

"github.com/pingcap/check"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/pingcap/tiflow/pkg/version"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -65,3 +67,35 @@ func (s *changefeedSuite) TestStrictDecodeConfig(c *check.C) {
c.Assert(err, check.NotNil)
c.Assert(err, check.ErrorMatches, ".*CDC:ErrFilterRuleInvalid.*")
}

func (s *changefeedSuite) TestInvalidSortEngine(c *check.C) {
defer testleak.AfterTest(c)()

cases := []struct {
input string
expect model.SortEngine
}{{
input: "invalid",
expect: model.SortUnified,
}, {
input: "memory",
expect: model.SortInMemory,
}, {
input: "file",
expect: model.SortInFile,
}, {
input: "unified",
expect: model.SortUnified,
}}
for _, cs := range cases {
cmd := new(cobra.Command)
o := newChangefeedCommonOptions()
o.addFlags(cmd)
c.Assert(cmd.ParseFlags([]string{"--sort-engine=" + cs.input}), check.IsNil)
opt := newCreateChangefeedOptions(o)
err := opt.completeCfg(cmd,
[]*model.CaptureInfo{{Version: version.MinTiCDCVersion.String()}})
c.Assert(err, check.IsNil)
c.Assert(opt.commonChangefeedOptions.sortEngine, check.Equals, cs.expect)
}
}
9 changes: 6 additions & 3 deletions pkg/cmd/factory/factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) {
}
logConfig.Level = logLevel

pdEndpoints := strings.Split(f.GetPdAddr(), ",")
pdAddr := f.GetPdAddr()
pdEndpoints := strings.Split(pdAddr, ",")

etcdClient, err := clientv3.New(clientv3.Config{
Context: ctx,
Expand All @@ -118,7 +119,8 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) {
},
})
if err != nil {
return nil, err
return nil, errors.Annotatef(err,
"fail to open PD client, please check pd address \"%s\"", pdAddr)
}

client := etcd.NewCDCEtcdClient(ctx, etcdClient)
Expand Down Expand Up @@ -156,7 +158,8 @@ func (f factoryImpl) PdClient() (pd.Client, error) {
}),
))
if err != nil {
return nil, errors.Annotatef(err, "fail to open PD client, pd=\"%s\"", pdAddr)
return nil, errors.Annotatef(err,
"fail to open PD client, please check pd address \"%s\"", pdAddr)
}

err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true)
Expand Down
11 changes: 4 additions & 7 deletions pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,21 @@ import (

var (
// minPDVersion is the version of the minimal compatible PD.
// TODO bump 5.2.0-alpha once PD releases.
minPDVersion *semver.Version = semver.New("5.1.0-alpha")
// maxPDVersion is the version of the maximum compatible PD.
// Compatible versions are in [minPDVersion, maxPDVersion)
// 9999.0.0 disables the check effectively in the master branch.
maxPDVersion *semver.Version = semver.New("9999.0.0")

// MinTiKVVersion is the version of the minimal compatible TiKV.
// TODO bump 5.2.0-alpha once TiKV releases.
MinTiKVVersion *semver.Version = semver.New("5.1.0-alpha")
// maxTiKVVersion is the version of the maximum compatible TiKV.
// Compatible versions are in [MinTiKVVersion, maxTiKVVersion)
// 9999.0.0 disables the check effectively in the master branch.
maxTiKVVersion *semver.Version = semver.New("9999.0.0")

// minTiCDCVersion is the version of the minimal compatible TiCDC.
// TODO bump 5.2.0-alpha once TiCDC releases.
minTiCDCVersion *semver.Version = semver.New("5.1.0-alpha")
// MinTiCDCVersion is the version of the minimal compatible TiCDC.
MinTiCDCVersion *semver.Version = semver.New("5.1.0-alpha")
// Compatible versions are in [MinTiCDCVersion, MaxTiCDCVersion)
// 9999.0.0 disables the check effectively in the master branch.
maxTiCDCVersion *semver.Version = semver.New("9999.0.0")
Expand Down Expand Up @@ -266,11 +263,11 @@ func CheckTiCDCClusterVersion(cdcClusterVer TiCDCClusterVersion) (unknown bool,
return true, nil
}
ver := cdcClusterVer.Version
minOrd := ver.Compare(*minTiCDCVersion)
minOrd := ver.Compare(*MinTiCDCVersion)
if minOrd < 0 {
arg := fmt.Sprintf("TiCDC %s is not supported, the minimal compatible version is %s"+
"try tiup ctl:%s cdc [COMMAND]",
ver, minTiCDCVersion, ver)
ver, MinTiCDCVersion, ver)
return false, cerror.ErrVersionIncompatible.GenWithStackByArgs(arg)
}
maxOrd := ver.Compare(*maxTiCDCVersion)
Expand Down
2 changes: 1 addition & 1 deletion pkg/version/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func TestCheckTiCDCClusterVersion(t *testing.T) {
expectedUnknown: true,
},
{
cdcClusterVersion: TiCDCClusterVersion{Version: minTiCDCVersion},
cdcClusterVersion: TiCDCClusterVersion{Version: MinTiCDCVersion},
expectedErr: "",
expectedUnknown: false,
},
Expand Down

0 comments on commit f917d36

Please sign in to comment.