diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index bb245ee6932..6cf22b25a60 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -62,6 +62,7 @@ const ( defaultBatchReplaceSize = 20 defaultReadTimeout = "2m" defaultWriteTimeout = "2m" + defaultDialTimeout = "2m" defaultSafeMode = true ) @@ -286,6 +287,7 @@ type sinkParams struct { batchReplaceSize int readTimeout string writeTimeout string + dialTimeout string enableOldValue bool safeMode bool timezone string @@ -305,6 +307,7 @@ var defaultParams = &sinkParams{ batchReplaceSize: defaultBatchReplaceSize, readTimeout: defaultReadTimeout, writeTimeout: defaultWriteTimeout, + dialTimeout: defaultDialTimeout, safeMode: defaultSafeMode, } @@ -343,6 +346,7 @@ func configureSinkURI( } dsnCfg.Params["readTimeout"] = params.readTimeout dsnCfg.Params["writeTimeout"] = params.writeTimeout + dsnCfg.Params["timeout"] = params.dialTimeout autoRandom, err := checkTiDBVariable(ctx, testDB, "allow_auto_random_explicit_insert", "1") if err != nil { @@ -467,6 +471,23 @@ func parseSinkURI(ctx context.Context, sinkURI *url.URL, opts map[string]string) params.timezone = fmt.Sprintf(`"%s"`, tz.String()) } + // read, write, and dial timeout for each individual connection, equals to + // readTimeout, writeTimeout, timeout in go mysql driver respectively. + // ref: https://github.com/go-sql-driver/mysql#connection-pool-and-timeouts + // To keep the same style with other sink parameters, we use dash as word separator. + s = sinkURI.Query().Get("read-timeout") + if s != "" { + params.readTimeout = s + } + s = sinkURI.Query().Get("write-timeout") + if s != "" { + params.writeTimeout = s + } + s = sinkURI.Query().Get("timeout") + if s != "" { + params.dialTimeout = s + } + return params, nil } @@ -480,6 +501,10 @@ func getDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) { } err = db.PingContext(ctx) if err != nil { + // close db to recycle resources + if closeErr := db.Close(); closeErr != nil { + log.Warn("close db failed", zap.Error(err)) + } return nil, errors.Annotate( cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection") } @@ -528,6 +553,9 @@ func newMySQLSink( if params.timezone != "" { dsn.Params["time_zone"] = params.timezone } + dsn.Params["readTimeout"] = params.readTimeout + dsn.Params["writeTimeout"] = params.writeTimeout + dsn.Params["timeout"] = params.dialTimeout testDB, err := getDBConnImpl(ctx, dsn.FormatDSN()) if err != nil { return nil, err @@ -815,12 +843,18 @@ func (s *mysqlSink) execDMLWithMaxRetries( args := dmls.values[i] log.Debug("exec row", zap.String("sql", query), zap.Any("args", args)) if _, err := tx.ExecContext(ctx, query, args...); err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + log.Warn("failed to rollback txn", zap.Error(err)) + } return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) } } if len(dmls.markSQL) != 0 { log.Debug("exec row", zap.String("sql", dmls.markSQL)) if _, err := tx.ExecContext(ctx, dmls.markSQL); err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + log.Warn("failed to rollback txn", zap.Error(err)) + } return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) } } diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index e8f821d8b12..f7c10831609 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -16,10 +16,13 @@ package sink import ( "context" "database/sql" + "database/sql/driver" "fmt" + "net" "net/url" "sort" "strings" + "sync" "testing" "time" @@ -811,6 +814,7 @@ func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) { batchReplaceSize: defaultBatchReplaceSize, readTimeout: defaultReadTimeout, writeTimeout: defaultWriteTimeout, + dialTimeout: defaultDialTimeout, safeMode: defaultSafeMode, }) c.Assert(param2, check.DeepEquals, &sinkParams{ @@ -822,49 +826,76 @@ func (s MySQLSinkSuite) TestSinkParamsClone(c *check.C) { batchReplaceSize: defaultBatchReplaceSize, readTimeout: defaultReadTimeout, writeTimeout: defaultWriteTimeout, + dialTimeout: defaultDialTimeout, safeMode: defaultSafeMode, }) } func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { defer testleak.AfterTest(c)() - db, mock, err := sqlmock.New() - c.Assert(err, check.IsNil) - defer db.Close() //nolint:errcheck - columns := []string{"Variable_name", "Value"} - mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), - ) - mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), - ) - mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "0"), - ) - mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), - ) - dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") - c.Assert(err, check.IsNil) - params := defaultParams.Clone() - dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db) - c.Assert(err, check.IsNil) - expectedParams := []string{ - "tidb_txn_mode=optimistic", - "readTimeout=2m", - "writeTimeout=2m", - "allow_auto_random_explicit_insert=1", + testDefaultParams := func() { + db, err := mockTestDB() + c.Assert(err, check.IsNil) + defer db.Close() + + dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") + c.Assert(err, check.IsNil) + params := defaultParams.Clone() + dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db) + c.Assert(err, check.IsNil) + expectedParams := []string{ + "tidb_txn_mode=optimistic", + "readTimeout=2m", + "writeTimeout=2m", + "allow_auto_random_explicit_insert=1", + } + for _, param := range expectedParams { + c.Assert(strings.Contains(dsnStr, param), check.IsTrue) + } + c.Assert(strings.Contains(dsnStr, "time_zone"), check.IsFalse) } - for _, param := range expectedParams { - c.Assert(strings.Contains(dsnStr, param), check.IsTrue) + + testTimezoneParam := func() { + db, err := mockTestDB() + c.Assert(err, check.IsNil) + defer db.Close() + + dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") + c.Assert(err, check.IsNil) + params := defaultParams.Clone() + params.timezone = `"UTC"` + dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db) + c.Assert(err, check.IsNil) + c.Assert(strings.Contains(dsnStr, "time_zone=%22UTC%22"), check.IsTrue) } - c.Assert(strings.Contains(dsnStr, "time_zone"), check.IsFalse) - params.timezone = `"UTC"` - dsnStr, err = configureSinkURI(context.TODO(), dsn, params, db) - c.Assert(err, check.IsNil) - c.Assert(strings.Contains(dsnStr, "time_zone=%22UTC%22"), check.IsTrue) + testTimeoutParams := func() { + db, err := mockTestDB() + c.Assert(err, check.IsNil) + defer db.Close() + + dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") + c.Assert(err, check.IsNil) + uri, err := url.Parse("mysql://127.0.0.1:3306/?read-timeout=4m&write-timeout=5m&timeout=3m") + c.Assert(err, check.IsNil) + params, err := parseSinkURI(context.TODO(), uri, map[string]string{}) + c.Assert(err, check.IsNil) + dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db) + c.Assert(err, check.IsNil) + expectedParams := []string{ + "readTimeout=4m", + "writeTimeout=5m", + "timeout=3m", + } + for _, param := range expectedParams { + c.Assert(strings.Contains(dsnStr, param), check.IsTrue) + } + } + + testDefaultParams() + testTimezoneParam() + testTimeoutParams() } func (s MySQLSinkSuite) TestParseSinkURI(c *check.C) { @@ -1042,6 +1073,72 @@ func (s MySQLSinkSuite) TestAdjustSQLMode(c *check.C) { c.Assert(err, check.IsNil) } +type mockUnavailableMySQL struct { + listener net.Listener + quit chan interface{} + wg sync.WaitGroup +} + +func newMockUnavailableMySQL(addr string, c *check.C) *mockUnavailableMySQL { + s := &mockUnavailableMySQL{ + quit: make(chan interface{}), + } + l, err := net.Listen("tcp", addr) + c.Assert(err, check.IsNil) + s.listener = l + s.wg.Add(1) + go s.serve(c) + return s +} + +func (s *mockUnavailableMySQL) serve(c *check.C) { + defer s.wg.Done() + + for { + _, err := s.listener.Accept() + if err != nil { + select { + case <-s.quit: + return + default: + c.Error(err) + } + } else { + s.wg.Add(1) + go func() { + // don't read from TCP connection, to simulate database service unavailable + <-s.quit + s.wg.Done() + }() + } + } +} + +func (s *mockUnavailableMySQL) Stop() { + close(s.quit) + s.listener.Close() + s.wg.Wait() +} + +func (s MySQLSinkSuite) TestNewMySQLTimeout(c *check.C) { + defer testleak.AfterTest(c)() + + addr := "127.0.0.1:33333" + mockMySQL := newMockUnavailableMySQL(addr, c) + defer mockMySQL.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + changefeed := "test-changefeed" + sinkURI, err := url.Parse(fmt.Sprintf("mysql://%s/?read-timeout=2s&timeout=2s", addr)) + c.Assert(err, check.IsNil) + rc := config.GetDefaultReplicaConfig() + f, err := filter.NewFilter(rc) + c.Assert(err, check.IsNil) + _, err = newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) + c.Assert(errors.Cause(err), check.Equals, driver.ErrBadConn) +} + func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { defer testleak.AfterTest(c)() @@ -1164,6 +1261,75 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { c.Assert(err, check.IsNil) } +func (s MySQLSinkSuite) TestExecDMLRollback(c *check.C) { + defer testleak.AfterTest(c)() + + rows := []*model.RowChangedEvent{ + { + Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, + Columns: []*model.Column{ + {Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 1}, + }, + }, + { + Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, + Columns: []*model.Column{ + {Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 2}, + }, + }, + } + + errDatabaseNotExists := &dmysql.MySQLError{ + Number: uint16(infoschema.ErrDatabaseNotExists.Code()), + } + dbIndex := 0 + mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + dbIndex++ + }() + if dbIndex == 0 { + // test db + db, err := mockTestDB() + c.Assert(err, check.IsNil) + return db, nil + } + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + c.Assert(err, check.IsNil) + for i := 0; i < defaultDMLMaxRetryTime+1; i++ { + mock.ExpectBegin() + mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). + WithArgs(1, 2). + WillReturnError(errDatabaseNotExists) + mock.ExpectRollback() + } + mock.ExpectClose() + return db, nil + } + backupGetDBConn := getDBConnImpl + getDBConnImpl = mockGetDBConn + defer func() { + getDBConnImpl = backupGetDBConn + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + changefeed := "test-changefeed" + sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") + c.Assert(err, check.IsNil) + rc := config.GetDefaultReplicaConfig() + f, err := filter.NewFilter(rc) + c.Assert(err, check.IsNil) + sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) + c.Assert(err, check.IsNil) + + err = sink.(*mysqlSink).execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) + c.Assert(errors.Cause(err), check.Equals, errDatabaseNotExists) + + err = sink.Close() + c.Assert(err, check.IsNil) +} + func (s MySQLSinkSuite) TestNewMySQLSinkExecDDL(c *check.C) { defer testleak.AfterTest(c)() @@ -1247,301 +1413,3 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDDL(c *check.C) { err = sink.Close() c.Assert(err, check.IsNil) } - -/* - import ( - "context" - "sort" - "testing" - - "github.com/DATA-DOG/go-sqlmock" - dmysql "github.com/go-sql-driver/mysql" - "github.com/pingcap/check" - timodel "github.com/pingcap/parser/model" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/parser/types" - "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/schema" - "github.com/pingcap/tidb/infoschema" - dbtypes "github.com/pingcap/tidb/types" - ) - - type EmitSuite struct{} - - func Test(t *testing.T) { check.TestingT(t) } - - var _ = check.Suite(&EmitSuite{}) - - func (s EmitSuite) TestShouldExecDDL(c *check.C) { - // Set up - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) - defer db.Close() - - sink := mysqlSink{ - db: db, - } - - t := model.SingleTableTxn{ - DDL: &model.DDL{ - Database: "test", - Table: "user", - Job: &timodel.Job{ - Query: "CREATE TABLE user (id INT PRIMARY KEY);", - }, - }, - } - - mock.ExpectBegin() - mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec(t.DDL.Job.Query).WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectCommit() - - // Execute - err = sink.EmitDDL(context.Background(), t) - - // Validate - c.Assert(err, check.IsNil) - c.Assert(mock.ExpectationsWereMet(), check.IsNil) - } - - func (s EmitSuite) TestShouldIgnoreCertainDDLError(c *check.C) { - // Set up - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) - defer db.Close() - - sink := mysqlSink{ - db: db, - } - - t := model.SingleTableTxn{ - DDL: &model.DDL{ - Database: "test", - Table: "user", - Job: &timodel.Job{ - Query: "CREATE TABLE user (id INT PRIMARY KEY);", - }, - }, - } - - mock.ExpectBegin() - mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) - ignorable := dmysql.MySQLError{ - Number: uint16(infoschema.ErrTableExists.Code()), - } - mock.ExpectExec(t.DDL.Job.Query).WillReturnError(&ignorable) - - // Execute - err = sink.EmitDDL(context.Background(), t) - - // Validate - c.Assert(err, check.IsNil) - c.Assert(mock.ExpectationsWereMet(), check.IsNil) - } - - type tableHelper struct { - } - - func (h *tableHelper) TableByID(id int64) (info *schema.TableInfo, ok bool) { - return schema.WrapTableInfo(&timodel.TableInfo{ - Columns: []*timodel.ColumnInfo{ - { - Name: timodel.CIStr{O: "id"}, - State: timodel.StatePublic, - FieldType: types.FieldType{ - Tp: mysql.TypeLong, - Flen: types.UnspecifiedLength, - Decimal: types.UnspecifiedLength, - }, - }, - { - Name: timodel.CIStr{O: "name"}, - State: timodel.StatePublic, - FieldType: types.FieldType{ - Tp: mysql.TypeString, - Flen: types.UnspecifiedLength, - Decimal: types.UnspecifiedLength, - }, - }, - }, - }), true - } - - func (h *tableHelper) GetTableByName(schema, table string) (*schema.TableInfo, bool) { - return h.TableByID(42) - } - - func (h *tableHelper) GetTableIDByName(schema, table string) (int64, bool) { - return 42, true - } - - func (s EmitSuite) TestShouldExecReplaceInto(c *check.C) { - // Set up - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) - defer db.Close() - - helper := tableHelper{} - sink := mysqlSink{ - db: db, - infoGetter: &helper, - } - - t := model.SingleTableTxn{ - DMLs: []*model.DML{ - { - Database: "test", - Table: "user", - Tp: model.InsertDMLType, - Values: map[string]dbtypes.Datum{ - "id": dbtypes.NewDatum(42), - "name": dbtypes.NewDatum("tester1"), - }, - }, - }, - } - - mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `test`.`user`(`id`,`name`) VALUES (?,?);"). - WithArgs(42, "tester1"). - WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectCommit() - - // Execute - err = sink.EmitDMLs(context.Background(), t) - - // Validate - c.Assert(err, check.IsNil) - c.Assert(mock.ExpectationsWereMet(), check.IsNil) - } - - func (s EmitSuite) TestShouldExecDelete(c *check.C) { - // Set up - db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) - c.Assert(err, check.IsNil) - defer db.Close() - - helper := tableHelper{} - sink := mysqlSink{ - db: db, - infoGetter: &helper, - } - - t := model.SingleTableTxn{ - DMLs: []*model.DML{ - { - Database: "test", - Table: "user", - Tp: model.DeleteDMLType, - Values: map[string]dbtypes.Datum{ - "id": dbtypes.NewDatum(123), - "name": dbtypes.NewDatum("tester1"), - }, - }, - }, - } - - mock.ExpectBegin() - mock.ExpectExec("DELETE FROM `test`.`user` WHERE `id` = ? AND `name` = ? LIMIT 1;"). - WithArgs(123, "tester1"). - WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectCommit() - - // Execute - err = sink.EmitDMLs(context.Background(), t) - - // Validate - c.Assert(err, check.IsNil) - c.Assert(mock.ExpectationsWereMet(), check.IsNil) - } - - type splitSuite struct{} - - var _ = check.Suite(&splitSuite{}) - - func (s *splitSuite) TestCanHandleEmptyInput(c *check.C) { - c.Assert(splitIndependentGroups(nil), check.HasLen, 0) - } - - func (s *splitSuite) TestShouldSplitByTable(c *check.C) { - var dmls []*model.DML - addDMLs := func(n int, db, tbl string) { - for i := 0; i < n; i++ { - dml := model.DML{ - Database: db, - Table: tbl, - } - dmls = append(dmls, &dml) - } - } - addDMLs(3, "db", "tbl1") - addDMLs(2, "db", "tbl2") - addDMLs(2, "db", "tbl1") - addDMLs(2, "db2", "tbl2") - - groups := splitIndependentGroups(dmls) - - assertAllAreFromTbl := func(dmls []*model.DML, db, tbl string) { - for _, dml := range dmls { - c.Assert(dml.Database, check.Equals, db) - c.Assert(dml.Table, check.Equals, tbl) - } - } - c.Assert(groups, check.HasLen, 3) - sort.Slice(groups, func(i, j int) bool { - tblI := groups[i][0] - tblJ := groups[j][0] - if tblI.Database != tblJ.Database { - return tblI.Database < tblJ.Database - } - return tblI.Table < tblJ.Table - }) - assertAllAreFromTbl(groups[0], "db", "tbl1") - assertAllAreFromTbl(groups[1], "db", "tbl2") - assertAllAreFromTbl(groups[2], "db2", "tbl2") - } - - type mysqlSinkSuite struct{} - - var _ = check.Suite(&mysqlSinkSuite{}) - - func (s *mysqlSinkSuite) TestBuildDBAndParams(c *check.C) { - tests := []struct { - sinkURI string - opts map[string]string - params params - }{ - { - sinkURI: "mysql://root:123@localhost:4000?worker-count=20", - opts: map[string]string{dryRunOpt: ""}, - params: params{ - workerCount: 20, - dryRun: true, - }, - }, - { - sinkURI: "tidb://root:123@localhost:4000?worker-count=20", - opts: map[string]string{dryRunOpt: ""}, - params: params{ - workerCount: 20, - dryRun: true, - }, - }, - { - sinkURI: "root@tcp(127.0.0.1:3306)/", // dsn not uri - opts: nil, - params: defaultParams, - }, - } - - for _, t := range tests { - c.Log("case sink: ", t.sinkURI) - db, params, err := buildDBAndParams(t.sinkURI, t.opts) - c.Assert(err, check.IsNil) - c.Assert(params, check.Equals, t.params) - c.Assert(db, check.NotNil) - } - } - -*/