Skip to content

Commit

Permalink
sink/mysql: explicit_defaults_for_timestamp compatibility with mysql (#…
Browse files Browse the repository at this point in the history
…1638) #1658

Signed-off-by: ti-srebot <[email protected]>
Co-authored-by: amyangfei <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
3 people authored Apr 27, 2021
1 parent 75e0088 commit 8024268
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 7 deletions.
47 changes: 40 additions & 7 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ const (
// SyncpointTableName is the name of table where all syncpoint maps sit
const syncpointTableName string = "syncpoint_v1"

const tidbVersionString string = "TiDB"

var validSchemes = map[string]bool{
"mysql": true,
"mysql+ssl": true,
Expand Down Expand Up @@ -306,21 +308,37 @@ var defaultParams = &sinkParams{
safeMode: defaultSafeMode,
}

func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultValue string) (string, error) {
func checkIsTiDB(ctx context.Context, db *sql.DB) (bool, error) {
var value string
querySQL := "select version();"
err := db.QueryRowContext(ctx, querySQL).Scan(&value)
if err != nil && err != sql.ErrNoRows {
return false, errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), "failed to select version")
}
return strings.Contains(value, tidbVersionString), nil
}

func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultValue string) (
sinkURIParameter string,
err error,
) {
var name string
var value string
querySQL := fmt.Sprintf("show session variables like '%s';", variableName)
err := db.QueryRowContext(ctx, querySQL).Scan(&name, &value)
err = db.QueryRowContext(ctx, querySQL).Scan(&name, &value)
if err != nil && err != sql.ErrNoRows {
errMsg := "fail to query session variable " + variableName
return "", errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg)
err = errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg)
return
}
// session variable works, use given default value
if err == nil {
return defaultValue, nil
// session variable exists, use given default value
sinkURIParameter = defaultValue
} else {
// session variable does not exist, sinkURIParameter is "" and will be ignored
err = nil
}
// session variable not exists, return "" to ignore it
return "", nil
return
}

func configureSinkURI(
Expand Down Expand Up @@ -359,6 +377,21 @@ func configureSinkURI(
dsnCfg.Params["tidb_txn_mode"] = txnMode
}

isTiDB, err := checkIsTiDB(ctx, testDB)
if err != nil {
return "", err
}
// variable `explicit_defaults_for_timestamp` is readonly in TiDB, we don't
// need to set it. Yet Default value in MySQL 5.7 is `OFF`
// ref: https://docs.pingcap.com/tidb/stable/mysql-compatibility#default-differences
if !isTiDB {
explicitTs, err := checkTiDBVariable(ctx, testDB, "explicit_defaults_for_timestamp", "ON")
if err != nil {
return "", err
}
dsnCfg.Params["explicit_defaults_for_timestamp"] = explicitTs
}

dsnClone := dsnCfg.Clone()
dsnClone.Passwd = "******"
log.Info("sink uri is configured", zap.String("format dsn", dsnClone.FormatDSN()))
Expand Down
52 changes: 52 additions & 0 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,29 @@ func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) {
c.Assert(err, check.IsNil)
defer db.Close()

dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/")
c.Assert(err, check.IsNil)
params := defaultParams.Clone()
dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db)
c.Assert(err, check.IsNil)
expectedParams := []string{
"tidb_txn_mode=optimistic",
"readTimeout=2m",
"writeTimeout=2m",
"allow_auto_random_explicit_insert=1",
"explicit_defaults_for_timestamp=ON",
}
for _, param := range expectedParams {
c.Assert(strings.Contains(dsnStr, param), check.IsTrue)
}
c.Assert(strings.Contains(dsnStr, "time_zone"), check.IsFalse)
}

testDefaultParamsTiDB := func() {
db, err := mockTestDBTiDB()
c.Assert(err, check.IsNil)
defer db.Close()

dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/")
c.Assert(err, check.IsNil)
params := defaultParams.Clone()
Expand Down Expand Up @@ -759,6 +782,7 @@ func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) {
}

testDefaultParams()
testDefaultParamsTiDB()
testTimezoneParam()
testTimeoutParams()
}
Expand Down Expand Up @@ -877,6 +901,34 @@ func mockTestDB() (*sql.DB, error) {
mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"),
)
// Simulate the default value in MySQL5.7 is OFF
mock.ExpectQuery("select version\\(\\);").WillReturnRows(
sqlmock.NewRows([]string{"version"}).AddRow("5.7.32"),
)
// Simulate the default value in MySQL5.7 is OFF
mock.ExpectQuery("show session variables like 'explicit_defaults_for_timestamp';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("explicit_defaults_for_timestamp", "OFF"),
)
mock.ExpectClose()
return db, nil
}

func mockTestDBTiDB() (*sql.DB, error) {
// mock for test db, which is used querying TiDB session variable
db, mock, err := sqlmock.New()
if err != nil {
return nil, err
}
columns := []string{"Variable_name", "Value"}
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "1"),
)
mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "optimistic"),
)
mock.ExpectQuery("select version\\(\\);").WillReturnRows(
sqlmock.NewRows([]string{"version"}).AddRow("5.7.25-TiDB-v5.0.0"),
)
mock.ExpectClose()
return db, nil
}
Expand Down

0 comments on commit 8024268

Please sign in to comment.