diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 10008810de7..5e542d0983c 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -69,6 +69,8 @@ const ( // SyncpointTableName is the name of table where all syncpoint maps sit const syncpointTableName string = "syncpoint_v1" +const tidbVersionString string = "TiDB" + var validSchemes = map[string]bool{ "mysql": true, "mysql+ssl": true, @@ -306,21 +308,37 @@ var defaultParams = &sinkParams{ safeMode: defaultSafeMode, } -func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultValue string) (string, error) { +func checkIsTiDB(ctx context.Context, db *sql.DB) (bool, error) { + var value string + querySQL := "select version();" + err := db.QueryRowContext(ctx, querySQL).Scan(&value) + if err != nil && err != sql.ErrNoRows { + return false, errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), "failed to select version") + } + return strings.Contains(value, tidbVersionString), nil +} + +func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultValue string) ( + sinkURIParameter string, + err error, +) { var name string var value string querySQL := fmt.Sprintf("show session variables like '%s';", variableName) - err := db.QueryRowContext(ctx, querySQL).Scan(&name, &value) + err = db.QueryRowContext(ctx, querySQL).Scan(&name, &value) if err != nil && err != sql.ErrNoRows { errMsg := "fail to query session variable " + variableName - return "", errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg) + err = errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg) + return } - // session variable works, use given default value if err == nil { - return defaultValue, nil + // session variable exists, use given default value + sinkURIParameter = defaultValue + } else { + // session variable does not exist, sinkURIParameter is "" and will be ignored + err = nil } - // session variable not exists, return "" to ignore it - return "", nil + return } func configureSinkURI( @@ -359,6 +377,21 @@ func configureSinkURI( dsnCfg.Params["tidb_txn_mode"] = txnMode } + isTiDB, err := checkIsTiDB(ctx, testDB) + if err != nil { + return "", err + } + // variable `explicit_defaults_for_timestamp` is readonly in TiDB, we don't + // need to set it. Yet Default value in MySQL 5.7 is `OFF` + // ref: https://docs.pingcap.com/tidb/stable/mysql-compatibility#default-differences + if !isTiDB { + explicitTs, err := checkTiDBVariable(ctx, testDB, "explicit_defaults_for_timestamp", "ON") + if err != nil { + return "", err + } + dsnCfg.Params["explicit_defaults_for_timestamp"] = explicitTs + } + dsnClone := dsnCfg.Clone() dsnClone.Passwd = "******" log.Info("sink uri is configured", zap.String("format dsn", dsnClone.FormatDSN())) diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 37f58b0e69c..cb226b03b3a 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -704,6 +704,29 @@ func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { c.Assert(err, check.IsNil) defer db.Close() + dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") + c.Assert(err, check.IsNil) + params := defaultParams.Clone() + dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db) + c.Assert(err, check.IsNil) + expectedParams := []string{ + "tidb_txn_mode=optimistic", + "readTimeout=2m", + "writeTimeout=2m", + "allow_auto_random_explicit_insert=1", + "explicit_defaults_for_timestamp=ON", + } + for _, param := range expectedParams { + c.Assert(strings.Contains(dsnStr, param), check.IsTrue) + } + c.Assert(strings.Contains(dsnStr, "time_zone"), check.IsFalse) + } + + testDefaultParamsTiDB := func() { + db, err := mockTestDBTiDB() + c.Assert(err, check.IsNil) + defer db.Close() + dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") c.Assert(err, check.IsNil) params := defaultParams.Clone() @@ -759,6 +782,7 @@ func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { } testDefaultParams() + testDefaultParamsTiDB() testTimezoneParam() testTimeoutParams() } @@ -877,6 +901,34 @@ func mockTestDB() (*sql.DB, error) { mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), ) + // Simulate the default value in MySQL5.7 is OFF + mock.ExpectQuery("select version\\(\\);").WillReturnRows( + sqlmock.NewRows([]string{"version"}).AddRow("5.7.32"), + ) + // Simulate the default value in MySQL5.7 is OFF + mock.ExpectQuery("show session variables like 'explicit_defaults_for_timestamp';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("explicit_defaults_for_timestamp", "OFF"), + ) + mock.ExpectClose() + return db, nil +} + +func mockTestDBTiDB() (*sql.DB, error) { + // mock for test db, which is used querying TiDB session variable + db, mock, err := sqlmock.New() + if err != nil { + return nil, err + } + columns := []string{"Variable_name", "Value"} + mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "1"), + ) + mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( + sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "optimistic"), + ) + mock.ExpectQuery("select version\\(\\);").WillReturnRows( + sqlmock.NewRows([]string{"version"}).AddRow("5.7.25-TiDB-v5.0.0"), + ) mock.ExpectClose() return db, nil }