From fe21f01ef612b308853a423ccb29c217cb6d6c4f Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 16 Nov 2020 20:30:55 -0600 Subject: [PATCH] rebuild mysql conn when retry failed chunks and support `transactional-consistency` parameter (#199) * support rebuild mysql connection to retry failed chunks * refine consistency variables * add --transactional-consistency and support rebuilding mysql conn to retry --- v4/export/config.go | 109 ++++++++++++++++++---------------- v4/export/consistency.go | 43 +++++++++++--- v4/export/consistency_test.go | 24 ++++---- v4/export/dump.go | 91 +++++++++++++++++++++------- v4/export/dump_test.go | 9 ++- v4/export/metadata.go | 40 ++++++++----- v4/export/metadata_test.go | 39 ++++++------ v4/export/retry.go | 7 ++- v4/export/writer_util.go | 1 + 9 files changed, 235 insertions(+), 128 deletions(-) diff --git a/v4/export/config.go b/v4/export/config.go index dc1868375bd84..3e845a4425741 100644 --- a/v4/export/config.go +++ b/v4/export/config.go @@ -23,46 +23,47 @@ import ( ) const ( - flagDatabase = "database" - flagTablesList = "tables-list" - flagHost = "host" - flagUser = "user" - flagPort = "port" - flagPassword = "password" - flagAllowCleartextPasswords = "allow-cleartext-passwords" - flagThreads = "threads" - flagFilesize = "filesize" - flagStatementSize = "statement-size" - flagOutput = "output" - flagLoglevel = "loglevel" - flagLogfile = "logfile" - flagLogfmt = "logfmt" - flagConsistency = "consistency" - flagSnapshot = "snapshot" - flagNoViews = "no-views" - flagStatusAddr = "status-addr" - flagRows = "rows" - flagWhere = "where" - flagEscapeBackslash = "escape-backslash" - flagFiletype = "filetype" - flagNoHeader = "no-header" - flagNoSchemas = "no-schemas" - flagNoData = "no-data" - flagCsvNullValue = "csv-null-value" - flagSql = "sql" - flagFilter = "filter" - flagCaseSensitive = "case-sensitive" - flagDumpEmptyDatabase = "dump-empty-database" - flagTidbMemQuotaQuery = "tidb-mem-quota-query" - flagCA = "ca" - flagCert = "cert" - flagKey = "key" - flagCsvSeparator = "csv-separator" - flagCsvDelimiter = "csv-delimiter" - flagOutputFilenameTemplate = "output-filename-template" - flagCompleteInsert = "complete-insert" - flagParams = "params" - flagReadTimeout = "read-timeout" + flagDatabase = "database" + flagTablesList = "tables-list" + flagHost = "host" + flagUser = "user" + flagPort = "port" + flagPassword = "password" + flagAllowCleartextPasswords = "allow-cleartext-passwords" + flagThreads = "threads" + flagFilesize = "filesize" + flagStatementSize = "statement-size" + flagOutput = "output" + flagLoglevel = "loglevel" + flagLogfile = "logfile" + flagLogfmt = "logfmt" + flagConsistency = "consistency" + flagSnapshot = "snapshot" + flagNoViews = "no-views" + flagStatusAddr = "status-addr" + flagRows = "rows" + flagWhere = "where" + flagEscapeBackslash = "escape-backslash" + flagFiletype = "filetype" + flagNoHeader = "no-header" + flagNoSchemas = "no-schemas" + flagNoData = "no-data" + flagCsvNullValue = "csv-null-value" + flagSql = "sql" + flagFilter = "filter" + flagCaseSensitive = "case-sensitive" + flagDumpEmptyDatabase = "dump-empty-database" + flagTidbMemQuotaQuery = "tidb-mem-quota-query" + flagCA = "ca" + flagCert = "cert" + flagKey = "key" + flagCsvSeparator = "csv-separator" + flagCsvDelimiter = "csv-delimiter" + flagOutputFilenameTemplate = "output-filename-template" + flagCompleteInsert = "complete-insert" + flagParams = "params" + flagReadTimeout = "read-timeout" + flagTransactionalConsistency = "transactional-consistency" FlagHelp = "help" ) @@ -108,15 +109,16 @@ type Config struct { CsvDelimiter string ReadTimeout time.Duration - TableFilter filter.Filter `json:"-"` - Rows uint64 - Where string - FileType string - CompleteInsert bool - EscapeBackslash bool - DumpEmptyDatabase bool - OutputFileTemplate *template.Template `json:"-"` - SessionParams map[string]interface{} + TableFilter filter.Filter `json:"-"` + Rows uint64 + Where string + FileType string + CompleteInsert bool + TransactionalConsistency bool + EscapeBackslash bool + DumpEmptyDatabase bool + OutputFileTemplate *template.Template `json:"-"` + SessionParams map[string]interface{} PosAfterConnect bool @@ -141,7 +143,7 @@ func DefaultConfig() *Config { SortByPk: true, Tables: nil, Snapshot: "", - Consistency: "auto", + Consistency: consistencyTypeAuto, NoViews: true, Rows: UnspecifiedSize, Where: "", @@ -202,7 +204,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) { flags.String(flagLoglevel, "info", "Log level: {debug|info|warn|error|dpanic|panic|fatal}") flags.StringP(flagLogfile, "L", "", "Log file `path`, leave empty to write to console") flags.String(flagLogfmt, "text", "Log `format`: {text|json}") - flags.String(flagConsistency, "auto", "Consistency level during dumping: {auto|none|flush|lock|snapshot}") + flags.String(flagConsistency, consistencyTypeAuto, "Consistency level during dumping: {auto|none|flush|lock|snapshot}") flags.String(flagSnapshot, "", "Snapshot position (uint64 from pd timestamp for TiDB). Valid only when consistency=snapshot") flags.BoolP(flagNoViews, "W", true, "Do not dump views") flags.String(flagStatusAddr, ":8281", "dumpling API server and pprof addr") @@ -230,6 +232,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) { flags.Bool(FlagHelp, false, "Print help message and quit") flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.") flags.MarkHidden(flagReadTimeout) + flags.Bool(flagTransactionalConsistency, true, "Only support transactional consistency") } // GetDSN generates DSN from Config @@ -367,6 +370,10 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + conf.TransactionalConsistency, err = flags.GetBool(flagTransactionalConsistency) + if err != nil { + return errors.Trace(err) + } if conf.Threads <= 0 { return errors.Errorf("--threads is set to %d. It should be greater than 0", conf.Threads) diff --git a/v4/export/consistency.go b/v4/export/consistency.go index 9f948c28c1d21..e9357b3ac809a 100644 --- a/v4/export/consistency.go +++ b/v4/export/consistency.go @@ -7,6 +7,14 @@ import ( "github.com/pingcap/errors" ) +const ( + consistencyTypeAuto = "auto" + consistencyTypeFlush = "flush" + consistencyTypeLock = "lock" + consistencyTypeSnapshot = "snapshot" + consistencyTypeNone = "none" +) + func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB) (ConsistencyController, error) { resolveAutoConsistency(conf) conn, err := session.Conn(ctx) @@ -14,22 +22,22 @@ func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB return nil, err } switch conf.Consistency { - case "flush": + case consistencyTypeFlush: return &ConsistencyFlushTableWithReadLock{ serverType: conf.ServerInfo.ServerType, conn: conn, }, nil - case "lock": + case consistencyTypeLock: return &ConsistencyLockDumpingTables{ conn: conn, allTables: conf.Tables, }, nil - case "snapshot": + case consistencyTypeSnapshot: if conf.ServerInfo.ServerType != ServerTypeTiDB { return nil, errors.New("snapshot consistency is not supported for this server") } return &ConsistencyNone{}, nil - case "none": + case consistencyTypeNone: return &ConsistencyNone{}, nil default: return nil, errors.Errorf("invalid consistency option %s", conf.Consistency) @@ -39,6 +47,7 @@ func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB type ConsistencyController interface { Setup(context.Context) error TearDown(context.Context) error + PingContext(context.Context) error } type ConsistencyNone struct{} @@ -51,6 +60,10 @@ func (c *ConsistencyNone) TearDown(_ context.Context) error { return nil } +func (c *ConsistencyNone) PingContext(_ context.Context) error { + return nil +} + type ConsistencyFlushTableWithReadLock struct { serverType ServerType conn *sql.Conn @@ -74,6 +87,13 @@ func (c *ConsistencyFlushTableWithReadLock) TearDown(ctx context.Context) error return UnlockTables(ctx, c.conn) } +func (c *ConsistencyFlushTableWithReadLock) PingContext(ctx context.Context) error { + if c.conn == nil { + return errors.New("consistency connection has already been closed!") + } + return c.conn.PingContext(ctx) +} + type ConsistencyLockDumpingTables struct { conn *sql.Conn allTables DatabaseTables @@ -102,18 +122,25 @@ func (c *ConsistencyLockDumpingTables) TearDown(ctx context.Context) error { return UnlockTables(ctx, c.conn) } +func (c *ConsistencyLockDumpingTables) PingContext(ctx context.Context) error { + if c.conn == nil { + return errors.New("consistency connection has already been closed!") + } + return c.conn.PingContext(ctx) +} + const snapshotFieldIndex = 1 func resolveAutoConsistency(conf *Config) { - if conf.Consistency != "auto" { + if conf.Consistency != consistencyTypeAuto { return } switch conf.ServerInfo.ServerType { case ServerTypeTiDB: - conf.Consistency = "snapshot" + conf.Consistency = consistencyTypeSnapshot case ServerTypeMySQL, ServerTypeMariaDB: - conf.Consistency = "flush" + conf.Consistency = consistencyTypeFlush default: - conf.Consistency = "none" + conf.Consistency = consistencyTypeNone } } diff --git a/v4/export/consistency_test.go b/v4/export/consistency_test.go index 71999e67ba838..63eff2e18d466 100644 --- a/v4/export/consistency_test.go +++ b/v4/export/consistency_test.go @@ -33,13 +33,13 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) { conf := DefaultConfig() resultOk := sqlmock.NewResult(0, 1) - conf.Consistency = "none" + conf.Consistency = consistencyTypeNone ctrl, _ := NewConsistencyController(ctx, conf, db) _, ok := ctrl.(*ConsistencyNone) c.Assert(ok, IsTrue) s.assertLifetimeErrNil(ctx, ctrl, c) - conf.Consistency = "flush" + conf.Consistency = consistencyTypeFlush mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk) mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk) ctrl, _ = NewConsistencyController(ctx, conf, db) @@ -50,14 +50,14 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) { c.Fatalf(err.Error()) } - conf.Consistency = "snapshot" + conf.Consistency = consistencyTypeSnapshot conf.ServerInfo.ServerType = ServerTypeTiDB ctrl, _ = NewConsistencyController(ctx, conf, db) _, ok = ctrl.(*ConsistencyNone) c.Assert(ok, IsTrue) s.assertLifetimeErrNil(ctx, ctrl, c) - conf.Consistency = "lock" + conf.Consistency = consistencyTypeLock conf.Tables = NewDatabaseTables(). AppendTables("db1", "t1", "t2", "t3"). AppendViews("db2", "t4") @@ -80,14 +80,14 @@ func (s *testConsistencySuite) TestResolveAutoConsistency(c *C) { serverTp ServerType resolvedConsistency string }{ - {ServerTypeTiDB, "snapshot"}, - {ServerTypeMySQL, "flush"}, - {ServerTypeMariaDB, "flush"}, - {ServerTypeUnknown, "none"}, + {ServerTypeTiDB, consistencyTypeSnapshot}, + {ServerTypeMySQL, consistencyTypeFlush}, + {ServerTypeMariaDB, consistencyTypeFlush}, + {ServerTypeUnknown, consistencyTypeNone}, } for _, x := range cases { - conf.Consistency = "auto" + conf.Consistency = consistencyTypeAuto conf.ServerInfo.ServerType = x.serverTp resolveAutoConsistency(conf) cmt := Commentf("server type %s", x.serverTp.String()) @@ -109,20 +109,20 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) { c.Assert(strings.Contains(err.Error(), "invalid consistency option"), IsTrue) // snapshot consistency is only available in TiDB - conf.Consistency = "snapshot" + conf.Consistency = consistencyTypeSnapshot conf.ServerInfo.ServerType = ServerTypeUnknown _, err = NewConsistencyController(ctx, conf, db) c.Assert(err, NotNil) // flush consistency is unavailable in TiDB - conf.Consistency = "flush" + conf.Consistency = consistencyTypeFlush conf.ServerInfo.ServerType = ServerTypeTiDB ctrl, _ := NewConsistencyController(ctx, conf, db) err = ctrl.Setup(ctx) c.Assert(err, NotNil) // lock table fail - conf.Consistency = "lock" + conf.Consistency = consistencyTypeLock conf.Tables = NewDatabaseTables().AppendTables("db", "t") mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New("")) ctrl, _ = NewConsistencyController(ctx, conf, db) diff --git a/v4/export/dump.go b/v4/export/dump.go index a5d18042ef410..3bd6f50c0ac41 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -76,7 +76,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { } snapshot := conf.Snapshot - if snapshot == "" && (doPdGC || conf.Consistency == "snapshot") { + if snapshot == "" && (doPdGC || conf.Consistency == consistencyTypeSnapshot) { conn, err := pool.Conn(ctx) if err != nil { conn.Close() @@ -97,7 +97,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { if err != nil { return err } - if conf.Consistency == "snapshot" { + if conf.Consistency == consistencyTypeSnapshot { hasTiKV, err := CheckTiDBWithTiKV(pool) if err != nil { return err @@ -129,7 +129,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { defer newPool.Close() } - m := newGlobalMetadata(conf.ExternalStorage) + m := newGlobalMetadata(conf.ExternalStorage, snapshot) // write metadata even if dump failed defer func() { if err == nil { @@ -139,13 +139,13 @@ func Dump(pCtx context.Context, conf *Config) (err error) { // for consistency lock, we should lock tables at first to get the tables we want to lock & dump // for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables - if conf.Consistency == "lock" { + if conf.Consistency == consistencyTypeLock { conn, err := createConnWithConsistency(ctx, pool) if err != nil { return errors.Trace(err) } m.recordStartTime(time.Now()) - err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, false, snapshot) + err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, false) if err != nil { log.Info("get global metadata failed", zap.Error(err)) } @@ -168,13 +168,13 @@ func Dump(pCtx context.Context, conf *Config) (err error) { // for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached // for other consistencies, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot. - if conf.Consistency != "lock" { + if conf.Consistency != consistencyTypeLock { conn, err := pool.Conn(ctx) if err != nil { return errors.Trace(err) } m.recordStartTime(time.Now()) - err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, false, snapshot) + err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, false) if err != nil { log.Info("get global metadata failed", zap.Error(err)) } @@ -189,20 +189,25 @@ func Dump(pCtx context.Context, conf *Config) (err error) { if conf.PosAfterConnect { // record again, to provide a location to exit safe mode for DM - err = m.recordGlobalMetaData(connectPool.extraConn(), conf.ServerInfo.ServerType, true, snapshot) + err = m.recordGlobalMetaData(connectPool.extraConn(), conf.ServerInfo.ServerType, true) if err != nil { log.Info("get global metadata (after connection pool established) failed", zap.Error(err)) } } - if conf.Consistency != "lock" { + if conf.Consistency != consistencyTypeLock { if err = prepareTableListToDump(conf, connectPool.extraConn()); err != nil { return err } } - if err = conCtrl.TearDown(ctx); err != nil { - return err + if conf.TransactionalConsistency { + if conf.Consistency == consistencyTypeFlush || conf.Consistency == consistencyTypeLock { + log.Info("All the dumping transactions have started. Start to unlock tables") + } + if err = conCtrl.TearDown(ctx); err != nil { + return err + } } failpoint.Inject("ConsistencyCheck", nil) @@ -222,7 +227,28 @@ func Dump(pCtx context.Context, conf *Config) (err error) { } if conf.Sql == "" { - if err = dumpDatabases(ctx, conf, connectPool, writer); err != nil { + if err = dumpDatabases(ctx, conf, connectPool, writer, func(conn *sql.Conn) (*sql.Conn, error) { + // make sure that the lock connection is still alive + err := conCtrl.PingContext(ctx) + if err != nil { + return conn, err + } + // give up the last broken connection + conn.Close() + newConn, err := createConnWithConsistency(ctx, pool) + if err != nil { + return conn, err + } + conn = newConn + // renew the master status after connection. dm can't close safe-mode until dm reaches current pos + if conf.PosAfterConnect { + err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true) + if err != nil { + return conn, err + } + } + return conn, nil + }); err != nil { return err } } else { @@ -235,7 +261,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { return nil } -func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error { +func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsPool, writer Writer, rebuildConnFunc func(*sql.Conn) (*sql.Conn, error)) error { allTables := conf.Tables g, ctx := errgroup.WithContext(pCtx) for dbName, tables := range allTables { @@ -260,18 +286,30 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP tableIR := tableIR g.Go(func() error { conn := connectPool.getConn() - defer connectPool.releaseConn(conn) - retryTime := 1 - return utils.WithRetry(ctx, func() error { - log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()), - zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex())) + defer func() { + connectPool.releaseConn(conn) + }() + retryTime := 0 + var lastErr error + return utils.WithRetry(ctx, func() (err error) { + defer func() { + lastErr = err + }() retryTime += 1 - err := tableIR.Start(ctx, conn) + log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()), + zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()), zap.NamedError("lastError", lastErr)) + if retryTime > 1 { + conn, err = rebuildConnFunc(conn) + if err != nil { + return + } + } + err = tableIR.Start(ctx, conn) if err != nil { - return err + return } return writer.WriteTableData(ctx, tableIR) - }, newDumpChunkBackoffer()) + }, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TransactionalConsistency))) }) } } @@ -413,3 +451,14 @@ func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, } } } + +func canRebuildConn(consistency string, trxConsistencyOnly bool) bool { + switch consistency { + case consistencyTypeLock, consistencyTypeFlush: + return !trxConsistencyOnly + case consistencyTypeSnapshot, consistencyTypeNone: + return true + default: + return false + } +} diff --git a/v4/export/dump_test.go b/v4/export/dump_test.go index f80bae8b90f6e..71e27c64606da 100644 --- a/v4/export/dump_test.go +++ b/v4/export/dump_test.go @@ -84,7 +84,9 @@ func (s *testDumpSuite) TestDumpDatabase(c *C) { mockWriter := newMockWriter() connectPool := newMockConnectPool(c, db) - err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter) + err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter, func(conn *sql.Conn) (*sql.Conn, error) { + return conn, nil + }) c.Assert(err, IsNil) c.Assert(len(mockWriter.databaseMeta), Equals, 1) @@ -203,6 +205,7 @@ func (s *testDumpSuite) TestDumpDatabaseWithRetry(c *C) { mockConfig.SortByPk = false mockConfig.Databases = []string{"test"} mockConfig.Tables = NewDatabaseTables().AppendTables("test", "t") + mockConfig.Consistency = consistencyTypeNone db, mock, err := sqlmock.New() c.Assert(err, IsNil) @@ -222,7 +225,9 @@ func (s *testDumpSuite) TestDumpDatabaseWithRetry(c *C) { mockWriter := newMockWriter() connectPool := newMockConnectPool(c, db) - err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter) + err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter, func(conn *sql.Conn) (*sql.Conn, error) { + return conn, nil + }) c.Assert(err, IsNil) c.Assert(len(mockWriter.databaseMeta), Equals, 1) diff --git a/v4/export/metadata.go b/v4/export/metadata.go index d21fdb8bd0930..d8ce9769f508d 100644 --- a/v4/export/metadata.go +++ b/v4/export/metadata.go @@ -15,7 +15,9 @@ import ( ) type globalMetadata struct { - buffer bytes.Buffer + buffer bytes.Buffer + afterConnBuffer bytes.Buffer + snapshot string storage storage.ExternalStorage } @@ -31,10 +33,11 @@ const ( mariadbShowMasterStatusFieldNum = 4 ) -func newGlobalMetadata(s storage.ExternalStorage) *globalMetadata { +func newGlobalMetadata(s storage.ExternalStorage, snapshot string) *globalMetadata { return &globalMetadata{ - storage: s, - buffer: bytes.Buffer{}, + storage: s, + buffer: bytes.Buffer{}, + snapshot: snapshot, } } @@ -47,16 +50,25 @@ func (m *globalMetadata) recordStartTime(t time.Time) { } func (m *globalMetadata) recordFinishTime(t time.Time) { + m.buffer.Write(m.afterConnBuffer.Bytes()) m.buffer.WriteString("Finished dump at: " + t.Format(metadataTimeLayout) + "\n") } -func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerType, afterConn bool, snapshot string) error { +func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerType, afterConn bool) error { + if afterConn { + m.afterConnBuffer.Reset() + return recordGlobalMetaData(db, &m.afterConnBuffer, serverType, afterConn, m.snapshot) + } + return recordGlobalMetaData(db, &m.buffer, serverType, afterConn, m.snapshot) +} + +func recordGlobalMetaData(db *sql.Conn, buffer *bytes.Buffer, serverType ServerType, afterConn bool, snapshot string) error { // get master status info - m.buffer.WriteString("SHOW MASTER STATUS:") + buffer.WriteString("SHOW MASTER STATUS:") if afterConn { - m.buffer.WriteString(" /* AFTER CONNECTION POOL ESTABLISHED */") + buffer.WriteString(" /* AFTER CONNECTION POOL ESTABLISHED */") } - m.buffer.WriteString("\n") + buffer.WriteString("\n") switch serverType { // For MySQL: // mysql 5.6+ @@ -92,7 +104,7 @@ func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerTyp gtidSet := getValidStr(str, gtidSetFieldIndex) if logFile != "" { - fmt.Fprintf(&m.buffer, "\tLog: %s\n\tPos: %s\n\tGTID:%s\n", logFile, pos, gtidSet) + fmt.Fprintf(buffer, "\tLog: %s\n\tPos: %s\n\tGTID:%s\n", logFile, pos, gtidSet) } // For MariaDB: // SHOW MASTER STATUS; @@ -122,12 +134,12 @@ func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerTyp } if logFile != "" { - fmt.Fprintf(&m.buffer, "\tLog: %s\n\tPos: %s\n\tGTID:%s\n", logFile, pos, gtidSet) + fmt.Fprintf(buffer, "\tLog: %s\n\tPos: %s\n\tGTID:%s\n", logFile, pos, gtidSet) } default: return errors.New("unsupported serverType" + serverType.String() + "for recordGlobalMetaData") } - m.buffer.WriteString("\n") + buffer.WriteString("\n") if serverType == ServerTypeTiDB { return nil } @@ -184,11 +196,11 @@ func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerTyp } } if len(host) > 0 { - m.buffer.WriteString("SHOW SLAVE STATUS:\n") + buffer.WriteString("SHOW SLAVE STATUS:\n") if isms { - m.buffer.WriteString("\tConnection name: " + connName + "\n") + buffer.WriteString("\tConnection name: " + connName + "\n") } - fmt.Fprintf(&m.buffer, "\tHost: %s\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n", host, logFile, pos, gtidSet) + fmt.Fprintf(buffer, "\tHost: %s\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n", host, logFile, pos, gtidSet) } return nil }) diff --git a/v4/export/metadata_test.go b/v4/export/metadata_test.go index 564a3448e0303..0345f9f38d3bd 100644 --- a/v4/export/metadata_test.go +++ b/v4/export/metadata_test.go @@ -30,8 +30,8 @@ func (s *testMetaDataSuite) TestMysqlMetaData(c *C) { mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows( sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"})) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -68,9 +68,10 @@ func (s *testMetaDataSuite) TestMetaDataAfterConn(c *C) { sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"})) mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows2) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, true, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, true), IsNil) + m.buffer.Write(m.afterConnBuffer.Bytes()) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -101,8 +102,8 @@ func (s *testMetaDataSuite) TestMysqlWithFollowersMetaData(c *C) { mock.ExpectQuery("SELECT @@default_master_connection").WillReturnError(fmt.Errorf("mock error")) mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(followerRows) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -132,8 +133,8 @@ func (s *testMetaDataSuite) TestMysqlWithNullFollowersMetaData(c *C) { mock.ExpectQuery("SELECT @@default_master_connection").WillReturnError(fmt.Errorf("mock error")) mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(sqlmock.NewRows([]string{"SQL_Remaining_Delay"}).AddRow(nil)) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -159,8 +160,8 @@ func (s *testMetaDataSuite) TestMariaDBMetaData(c *C) { AddRow(gtidSet) mock.ExpectQuery("SELECT @@global.gtid_binlog_pos").WillReturnRows(rows) mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(rows) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMariaDB, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMariaDB, false), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) } @@ -186,8 +187,8 @@ func (s *testMetaDataSuite) TestMariaDBWithFollowersMetaData(c *C) { AddRow("connection_1")) mock.ExpectQuery("SHOW ALL SLAVES STATUS").WillReturnRows(followerRows) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -224,8 +225,8 @@ func (s *testMetaDataSuite) TestEarlierMysqlMetaData(c *C) { mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows( sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"})) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: mysql-bin.000001\n"+ @@ -247,8 +248,8 @@ func (s *testMetaDataSuite) TestTiDBSnapshotMetaData(c *C) { AddRow(logFile, pos, "", "") mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeTiDB, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeTiDB, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: tidb-binlog\n"+ "\tPos: 420633329401856001\n"+ @@ -258,8 +259,8 @@ func (s *testMetaDataSuite) TestTiDBSnapshotMetaData(c *C) { rows = sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB"}). AddRow(logFile, pos, "", "") mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows) - m = newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeTiDB, false, snapshot), IsNil) + m = newGlobalMetadata(s.createStorage(c), snapshot) + c.Assert(m.recordGlobalMetaData(conn, ServerTypeTiDB, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: tidb-binlog\n"+ "\tPos: 420633273211289601\n"+ diff --git a/v4/export/retry.go b/v4/export/retry.go index e5b7a877f9cd9..1ea990fb57cf7 100644 --- a/v4/export/retry.go +++ b/v4/export/retry.go @@ -14,7 +14,12 @@ const ( dumpChunkMaxWaitInterval = 200 * time.Millisecond ) -func newDumpChunkBackoffer() *dumpChunkBackoffer { +func newDumpChunkBackoffer(canRetry bool) *dumpChunkBackoffer { + if !canRetry { + return &dumpChunkBackoffer{ + attempt: 1, + } + } return &dumpChunkBackoffer{ attempt: dumpChunkRetryTime, delayTime: dumpChunkWaitInterval, diff --git a/v4/export/writer_util.go b/v4/export/writer_util.go index 4a24c6e15b100..ecf3db64210e3 100644 --- a/v4/export/writer_util.go +++ b/v4/export/writer_util.go @@ -313,6 +313,7 @@ func WriteInsertInCsv(pCtx context.Context, tblIR TableDataIR, w storage.Writer, log.Debug("dumping table", zap.String("table", tblIR.TableName()), + zap.Int("chunkIndex", tblIR.ChunkIndex()), zap.Int("record counts", counter)) if bf.Len() > 0 { wp.input <- bf