Skip to content

Commit

Permalink
sink(ticdc): adjust sql mode compatibility for mysql sink (#3938) (#4197
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ti-chi-bot authored Jan 4, 2022
1 parent 1ba7870 commit 1812a71
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 56 deletions.
64 changes: 31 additions & 33 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/common"
dmutils "github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
Expand Down Expand Up @@ -130,6 +131,30 @@ func newMySQLSink(
}
defer testDB.Close()

// Adjust sql_mode for compatibility.
dsn.Params["sql_mode"], err = querySQLMode(ctx, testDB)
if err != nil {
return nil, errors.Trace(err)
}
dsn.Params["sql_mode"], err = dmutils.AdjustSQLModeCompatible(dsn.Params["sql_mode"])
if err != nil {
return nil, errors.Trace(err)
}

// Adjust sql_mode for cyclic replication.
var sinkCyclic *cyclic.Cyclic = nil
if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
sinkCyclic = cyclic.NewCyclic(cfg)
dsn.Params["sql_mode"] = cyclic.RelaxSQLMode(dsn.Params["sql_mode"])
}
// NOTE: quote the string is necessary to avoid ambiguities.
dsn.Params["sql_mode"] = strconv.Quote(dsn.Params["sql_mode"])

dsnStr, err = generateDSNByParams(ctx, dsn, params, testDB)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -157,6 +182,7 @@ func newMySQLSink(
db: db,
params: params,
filter: filter,
cyclic: sinkCyclic,
txnCache: common.NewUnresolvedTxnCache(),
statistics: NewStatistics(ctx, "mysql", opts),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
Expand All @@ -166,25 +192,10 @@ func newMySQLSink(
cancel: cancel,
}

if val, ok := opts[mark.OptCyclicConfig]; ok {
cfg := new(config.CyclicConfig)
err := cfg.Unmarshal([]byte(val))
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
sink.cyclic = cyclic.NewCyclic(cfg)

err = sink.adjustSQLMode(ctx)
if err != nil {
return nil, errors.Trace(err)
}
}

sink.execWaitNotifier = new(notify.Notifier)
sink.resolvedNotifier = new(notify.Notifier)

err = sink.createSinkWorkers(ctx)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -348,26 +359,13 @@ func needSwitchDB(ddl *model.DDLEvent) bool {
return true
}

// adjustSQLMode adjust sql mode according to sink config.
func (s *mysqlSink) adjustSQLMode(ctx context.Context) error {
// Must relax sql mode to support cyclic replication, as downstream may have
// extra columns (not null and no default value).
if s.cyclic == nil || !s.cyclic.Enabled() {
return nil
}
var oldMode, newMode string
row := s.db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
err := row.Scan(&oldMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
}

newMode = cyclic.RelaxSQLMode(oldMode)
_, err = s.db.ExecContext(ctx, fmt.Sprintf("SET sql_mode = '%s';", newMode))
func querySQLMode(ctx context.Context, db *sql.DB) (sqlMode string, err error) {
row := db.QueryRowContext(ctx, "SELECT @@SESSION.sql_mode;")
err = row.Scan(&sqlMode)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLQueryError, err)
err = cerror.WrapError(cerror.ErrMySQLQueryError, err)
}
return nil
return
}

func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestGenerateDSNByParams(t *testing.T) {
defer testleak.AfterTestT(t)()

testDefaultParams := func() {
db, err := mockTestDB()
db, err := mockTestDB(false)
require.Nil(t, err)
defer db.Close()

Expand All @@ -84,7 +84,7 @@ func TestGenerateDSNByParams(t *testing.T) {
}

testTimezoneParam := func() {
db, err := mockTestDB()
db, err := mockTestDB(false)
require.Nil(t, err)
defer db.Close()

Expand All @@ -98,7 +98,7 @@ func TestGenerateDSNByParams(t *testing.T) {
}

testTimeoutParams := func() {
db, err := mockTestDB()
db, err := mockTestDB(false)
require.Nil(t, err)
defer db.Close()

Expand Down
30 changes: 15 additions & 15 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,17 @@ func TestReduceReplace(t *testing.T) {
}
}

func mockTestDB() (*sql.DB, error) {
func mockTestDB(adjustSQLMode bool) (*sql.DB, error) {
// mock for test db, which is used querying TiDB session variable
db, mock, err := sqlmock.New()
if err != nil {
return nil, err
}
if adjustSQLMode {
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
}
columns := []string{"Variable_name", "Value"}
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
Expand All @@ -455,18 +460,13 @@ func TestAdjustSQLMode(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
mock.ExpectExec("SET sql_mode = 'ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,NO_ZERO_DATE';").
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectClose()
return db, nil
}
Expand Down Expand Up @@ -572,7 +572,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -716,7 +716,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -782,7 +782,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -848,7 +848,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -897,7 +897,7 @@ func TestNewMySQLSinkExecDDL(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -1027,7 +1027,7 @@ func TestNewMySQLSink(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -1066,7 +1066,7 @@ func TestMySQLSinkClose(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down Expand Up @@ -1106,7 +1106,7 @@ func TestMySQLSinkFlushResovledTs(t *testing.T) {
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB()
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func TestApplyDMLs(t *testing.T) {
if err != nil {
return nil, err
}
mock.ExpectQuery("SELECT @@SESSION.sql_mode;").
WillReturnRows(sqlmock.NewRows([]string{"@@SESSION.sql_mode"}).
AddRow("ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE"))
columns := []string{"Variable_name", "Value"}
mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows(
sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"),
Expand Down
11 changes: 6 additions & 5 deletions tests/mq_protocol_tests/framework/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package framework
import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"

_ "github.com/go-sql-driver/mysql" // imported for side effects
Expand Down Expand Up @@ -82,20 +84,19 @@ func (c *TaskContext) SQLHelper() *SQLHelper {
func (p *CDCProfile) String() string {
builder := strings.Builder{}
builder.WriteString("cli changefeed create ")

if p.PDUri == "" {
p.PDUri = "http://127.0.0.1:2379"
}

builder.WriteString("--pd=" + p.PDUri + " ")
builder.WriteString(fmt.Sprintf("--pd=%s ", strconv.Quote(p.PDUri)))

if p.SinkURI == "" {
log.Fatal("SinkURI cannot be empty!")
}

builder.WriteString("--sink-uri=\"" + p.SinkURI + "\" ")
builder.WriteString(fmt.Sprintf("--sink-uri=%s ", strconv.Quote(p.SinkURI)))

if p.ConfigFile != "" {
builder.WriteString("--config=" + p.ConfigFile + " ")
builder.WriteString(fmt.Sprintf("--config=%s ", strconv.Quote(p.ConfigFile)))
}

if p.Opts == nil || len(p.Opts) == 0 {
Expand Down

0 comments on commit 1812a71

Please sign in to comment.