diff --git a/v4/export/config.go b/v4/export/config.go
index bfa9bdf56b893..dc1868375bd84 100644
--- a/v4/export/config.go
+++ b/v4/export/config.go
@@ -62,7 +62,9 @@ const (
 	flagOutputFilenameTemplate = "output-filename-template"
 	flagCompleteInsert         = "complete-insert"
 	flagParams                 = "params"
-	FlagHelp = "help"
+	flagReadTimeout            = "read-timeout"
+
+	FlagHelp = "help"
 )
 
 type Config struct {
@@ -104,6 +106,7 @@ type Config struct {
 	Sql          string
 	CsvSeparator string
 	CsvDelimiter string
+	ReadTimeout  time.Duration
 
 	TableFilter filter.Filter `json:"-"`
 	Rows        uint64
@@ -166,7 +169,10 @@ func (config *Config) String() string {
 
 // GetDSN generates DSN from Config
 func (conf *Config) GetDSN(db string) string {
-	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4", conf.User, conf.Password, conf.Host, conf.Port, db)
+	// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
+	// https://github.com/go-sql-driver/mysql#maxallowedpacket
+	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&readTimeout=%s&writeTimeout=30s&interpolateParams=true&maxAllowedPacket=0",
+		conf.User, conf.Password, conf.Host, conf.Port, db, conf.ReadTimeout)
 	if len(conf.Security.CAPath) > 0 {
 		dsn += "&tls=dumpling-tls-target"
 	}
@@ -222,6 +228,8 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
 	flags.Bool(flagCompleteInsert, false, "Use complete INSERT statements that include column names")
 	flags.StringToString(flagParams, nil, `Extra session variables used while dumping, accepted format: --params "character_set_client=latin1,character_set_connection=latin1"`)
 	flags.Bool(FlagHelp, false, "Print help message and quit")
+	flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.")
+	flags.MarkHidden(flagReadTimeout)
 }
 
 // GetDSN generates DSN from Config
@@ -355,6 +363,10 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	conf.ReadTimeout, err = flags.GetDuration(flagReadTimeout)
+	if err != nil {
+		return errors.Trace(err)
+	}
 
 	if conf.Threads <= 0 {
 		return errors.Errorf("--threads is set to %d. It should be greater than 0", conf.Threads)
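
Reviewer note: a quick way to sanity-check the DSN that `GetDSN` now builds is to feed it back through the driver's own parser. A minimal sketch, assuming placeholder credentials (`root`/`secret`, `127.0.0.1:4000`, and database `test` are made up, not values from this patch):

```go
package main

import (
	"fmt"
	"time"

	"github.com/go-sql-driver/mysql"
)

func main() {
	readTimeout := 15 * time.Minute // mirrors the flag's default above

	// Placeholder credentials standing in for conf.User, conf.Host, etc.
	dsn := fmt.Sprintf("root:secret@tcp(127.0.0.1:4000)/test?charset=utf8mb4&readTimeout=%s&writeTimeout=30s&interpolateParams=true&maxAllowedPacket=0",
		readTimeout)

	cfg, err := mysql.ParseDSN(dsn)
	if err != nil {
		panic(err)
	}
	// readTimeout/writeTimeout bound each network read/write, and
	// MaxAllowedPacket == 0 asks the driver to fetch max_allowed_packet
	// from the server on every new connection.
	fmt.Println(cfg.ReadTimeout, cfg.WriteTimeout, cfg.MaxAllowedPacket, cfg.InterpolateParams)
}
```

Running it prints `15m0s 30s 0 true`, confirming the driver accepts all four new parameters.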
diff --git a/v4/export/connectionsPool.go b/v4/export/connectionsPool.go
index 764d9fe702a97..c803533f245df 100644
--- a/v4/export/connectionsPool.go
+++ b/v4/export/connectionsPool.go
@@ -13,15 +13,17 @@ type connectionsPool struct {
 func newConnectionsPool(ctx context.Context, n int, pool *sql.DB) (*connectionsPool, error) {
 	connectPool := &connectionsPool{
 		conns:        make(chan *sql.Conn, n),
-		createdConns: make([]*sql.Conn, 0, n),
+		createdConns: make([]*sql.Conn, 0, n+1),
 	}
-	for i := 0; i < n; i++ {
+	for i := 0; i < n+1; i++ {
 		conn, err := createConnWithConsistency(ctx, pool)
 		if err != nil {
 			connectPool.Close()
 			return connectPool, err
 		}
-		connectPool.releaseConn(conn)
+		if i != n {
+			connectPool.releaseConn(conn)
+		}
 		connectPool.createdConns = append(connectPool.createdConns, conn)
 	}
 	return connectPool, nil
@@ -31,6 +33,10 @@ func (r *connectionsPool) getConn() *sql.Conn {
 	return <-r.conns
 }
 
+func (r *connectionsPool) extraConn() *sql.Conn {
+	return r.createdConns[len(r.createdConns)-1]
+}
+
 func (r *connectionsPool) Close() error {
 	var err error
 	for _, conn := range r.createdConns {
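
Reviewer note: the pool now creates n+1 connections but releases only n of them into the channel; the last one is reserved for serial work via `extraConn`, so metadata and `SHOW CREATE` queries can no longer starve, or deadlock against, the chunk-dumping workers. A dependency-free sketch of the same pattern; `pool`, `conn`, and the method names are illustrative stand-ins, not the real `connectionsPool`:

```go
package main

import (
	"fmt"
	"sync"
)

type conn struct{ id int }

type pool struct {
	conns   chan *conn // n shareable connections for concurrent chunk dumps
	created []*conn    // all n+1 connections, kept for Close; the last is reserved
}

func newPool(n int) *pool {
	p := &pool{conns: make(chan *conn, n), created: make([]*conn, 0, n+1)}
	for i := 0; i < n+1; i++ {
		c := &conn{id: i}
		if i != n {
			p.conns <- c // only the first n connections are shareable
		}
		p.created = append(p.created, c)
	}
	return p
}

func (p *pool) get() *conn      { return <-p.conns }
func (p *pool) release(c *conn) { p.conns <- c }

// extra returns the reserved connection; it never passes through the channel,
// so a serial caller holding it cannot block a worker's get().
func (p *pool) extra() *conn { return p.created[len(p.created)-1] }

func main() {
	p := newPool(2)
	fmt.Println("reserved conn:", p.extra().id) // serial metadata queries use this

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { // workers share the n pooled connections
			defer wg.Done()
			c := p.get()
			defer p.release(c)
			_ = c
		}()
	}
	wg.Wait()
}
```

Because every connection, reserved or not, is tracked in the `created` slice, `Close` can still tear all of them down.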
diff --git a/v4/export/dump.go b/v4/export/dump.go
index 1d9eb73ceff06..a5d18042ef410 100755
--- a/v4/export/dump.go
+++ b/v4/export/dump.go
@@ -131,7 +131,11 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
 
 	m := newGlobalMetadata(conf.ExternalStorage)
-	// write metadata even if dump failed
-	defer m.writeGlobalMetaData(ctx)
+	// only write metadata when the dump succeeds
+	defer func() {
+		if err == nil {
+			m.writeGlobalMetaData(ctx)
+		}
+	}()
 
 	// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
 	// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
@@ -184,22 +188,17 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
 	defer connectPool.Close()
 
 	if conf.PosAfterConnect {
-		conn := connectPool.getConn()
 		// record again, to provide a location to exit safe mode for DM
-		err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true, snapshot)
+		err = m.recordGlobalMetaData(connectPool.extraConn(), conf.ServerInfo.ServerType, true, snapshot)
 		if err != nil {
 			log.Info("get global metadata (after connection pool established) failed", zap.Error(err))
 		}
-		connectPool.releaseConn(conn)
 	}
 
 	if conf.Consistency != "lock" {
-		conn := connectPool.getConn()
-		if err = prepareTableListToDump(conf, conn); err != nil {
-			connectPool.releaseConn(conn)
+		if err = prepareTableListToDump(conf, connectPool.extraConn()); err != nil {
 			return err
 		}
-		connectPool.releaseConn(conn)
 	}
 
 	if err = conCtrl.TearDown(ctx); err != nil {
@@ -240,9 +239,7 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
 	allTables := conf.Tables
 	g, ctx := errgroup.WithContext(pCtx)
 	for dbName, tables := range allTables {
-		conn := connectPool.getConn()
-		createDatabaseSQL, err := ShowCreateDatabase(conn, dbName)
-		connectPool.releaseConn(conn)
+		createDatabaseSQL, err := ShowCreateDatabase(connectPool.extraConn(), dbName)
 		if err != nil {
 			return err
 		}
@@ -255,21 +252,19 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
 		}
 		for _, table := range tables {
 			table := table
-			conn := connectPool.getConn()
-			tableDataIRArray, err := dumpTable(ctx, conf, conn, dbName, table, writer)
-			connectPool.releaseConn(conn)
+			tableDataIRArray, err := dumpTable(ctx, conf, connectPool.extraConn(), dbName, table, writer)
 			if err != nil {
 				return err
 			}
 			for _, tableIR := range tableDataIRArray {
 				tableIR := tableIR
 				g.Go(func() error {
+					conn := connectPool.getConn()
+					defer connectPool.releaseConn(conn)
 					retryTime := 1
 					return utils.WithRetry(ctx, func() error {
 						log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
 							zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()))
-						conn := connectPool.getConn()
-						defer connectPool.releaseConn(conn)
 						retryTime += 1
 						err := tableIR.Start(ctx, conn)
 						if err != nil {
@@ -308,9 +303,7 @@ func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
 }
 
 func dumpSql(ctx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error {
-	conn := connectPool.getConn()
-	tableIR, err := SelectFromSql(conf, conn)
-	connectPool.releaseConn(conn)
+	tableIR, err := SelectFromSql(conf, connectPool.extraConn())
 	if err != nil {
 		return err
 	}
diff --git a/v4/export/dump_test.go b/v4/export/dump_test.go
index 8f9e542e8f647..f80bae8b90f6e 100644
--- a/v4/export/dump_test.go
+++ b/v4/export/dump_test.go
@@ -36,6 +36,7 @@ func newMockConnectPool(c *C, db *sql.DB) *connectionsPool {
 	c.Assert(err, IsNil)
 	connectPool := &connectionsPool{conns: make(chan *sql.Conn, 1)}
 	connectPool.releaseConn(conn)
+	connectPool.createdConns = []*sql.Conn{conn}
 	return connectPool
 }
 
diff --git a/v4/export/ir.go b/v4/export/ir.go
index b2d44d959858b..a740a2b8a7c80 100644
--- a/v4/export/ir.go
+++ b/v4/export/ir.go
@@ -22,6 +22,7 @@ type TableDataIR interface {
 
 	SpecialComments() StringIter
 	Rows() SQLRowIter
+	Close() error
 }
 
 // SQLRowIter is the iterator on a collection of sql.Row.
diff --git a/v4/export/ir_impl.go b/v4/export/ir_impl.go
index e9937875f4765..1eec628315708 100644
--- a/v4/export/ir_impl.go
+++ b/v4/export/ir_impl.go
@@ -85,10 +85,13 @@ type tableData struct {
 	selectedField   string
 	specCmts        []string
 	escapeBackslash bool
+	cancel          context.CancelFunc
 	SQLRowIter
 }
 
-func (td *tableData) Start(ctx context.Context, conn *sql.Conn) error {
+func (td *tableData) Start(pCtx context.Context, conn *sql.Conn) error {
+	var ctx context.Context
+	ctx, td.cancel = context.WithCancel(pCtx)
 	rows, err := conn.QueryContext(ctx, td.query)
 	if err != nil {
 		return err
@@ -137,6 +140,13 @@ func (td *tableData) Rows() SQLRowIter {
 	return td.SQLRowIter
 }
 
+func (td *tableData) Close() error {
+	if td.cancel != nil {
+		td.cancel()
+	}
+	return td.Rows().Close()
+}
+
 func (td *tableData) SelectedField() string {
 	if td.selectedField == "*" || td.selectedField == "" {
 		return td.selectedField
@@ -187,7 +197,7 @@ func splitTableDataIntoChunks(
 		return
 	}
 	if !smax.Valid || !smin.Valid {
-		// smax and smin are not valid, but there can also be data to dump, so just  skip split chunk logic.
+		// smax and smin are not valid, but there can also be data to dump, so just skip split chunk logic.
 		log.Debug("skip concurrent dump due to no valid smax or smin", zap.String("schema", dbName), zap.String("table", tableName))
 		linear <- struct{}{}
 		return
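
Reviewer note: `tableData.Close` can now abort an in-flight query: `Start` stores the `CancelFunc` of a derived context, and `Close` invokes it before closing the row iterator. A self-contained sketch of this cancel-on-close idiom, with `rowsStream` as a hypothetical stand-in for `tableData` (no database involved):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// rowsStream mimics tableData: Start captures the cancel func of a derived
// context so that Close can abort an in-flight query even while a reader
// is still blocked on the server.
type rowsStream struct {
	cancel context.CancelFunc
}

func (s *rowsStream) Start(pCtx context.Context) context.Context {
	ctx, cancel := context.WithCancel(pCtx)
	s.cancel = cancel
	return ctx // in tableData this ctx is handed to conn.QueryContext
}

func (s *rowsStream) Close() {
	if s.cancel != nil {
		s.cancel() // unblocks anything waiting on the query
	}
}

func main() {
	s := &rowsStream{}
	ctx := s.Start(context.Background())

	go func() {
		time.Sleep(50 * time.Millisecond)
		s.Close()
	}()

	<-ctx.Done() // stands in for a QueryContext call observing cancellation
	fmt.Println("query aborted:", ctx.Err())
}
```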
diff --git a/v4/export/retry.go b/v4/export/retry.go
index 894d70428f238..e5b7a877f9cd9 100644
--- a/v4/export/retry.go
+++ b/v4/export/retry.go
@@ -33,6 +33,10 @@ func (b *dumpChunkBackoffer) NextBackoff(err error) time.Duration {
 	if _, ok := err.(*mysql.MySQLError); ok && !dbutil.IsRetryableError(err) {
 		b.attempt = 0
 		return 0
+	} else if _, ok := err.(*writerError); ok {
+		// the uploader writer's retry logic is already handled in the AWS client, so there is no need to retry here
+		b.attempt = 0
+		return 0
 	}
 	b.delayTime = 2 * b.delayTime
 	b.attempt--
diff --git a/v4/export/test_util.go b/v4/export/test_util.go
index d7300aa4d1d3f..cc997283e7023 100644
--- a/v4/export/test_util.go
+++ b/v4/export/test_util.go
@@ -146,6 +146,10 @@ func (m *mockTableIR) Rows() SQLRowIter {
 	return m.SQLRowIter
 }
 
+func (m *mockTableIR) Close() error {
+	return nil
+}
+
 func (m *mockTableIR) EscapeBackSlash() bool {
 	return m.escapeBackSlash
 }
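
Reviewer note: `dumpChunkBackoffer.NextBackoff` now dispatches on the error's concrete type: anything wrapped in `writerError` zeroes the remaining attempts, because the S3 uploader already retries internally. A compact illustration of that type-based dispatch; `writerErr` and `backoffer` are simplified stand-ins, not the real types:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// writerErr plays the role of writerError: a marker type that tags failures
// coming from the storage writer rather than from the database.
type writerErr struct{ error }

type backoffer struct {
	attempt int
	delay   time.Duration
}

func (b *backoffer) nextBackoff(err error) time.Duration {
	if _, ok := err.(*writerErr); ok {
		b.attempt = 0 // the storage client already retried; give up at once
		return 0
	}
	b.delay *= 2 // transient errors still back off exponentially
	b.attempt--
	return b.delay
}

func main() {
	b := &backoffer{attempt: 3, delay: time.Second}
	fmt.Println(b.nextBackoff(errors.New("lost connection")))           // 2s: retryable
	fmt.Println(b.nextBackoff(&writerErr{errors.New("upload failed")})) // 0s: not retryable
}
```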
diff --git a/v4/export/writer.go b/v4/export/writer.go
index 4977cde2bfc33..267c0d1772885 100644
--- a/v4/export/writer.go
+++ b/v4/export/writer.go
@@ -63,29 +63,20 @@ func (f SimpleWriter) WriteViewMeta(ctx context.Context, db, view, createTableSQ
 
 type SQLWriter struct{ SimpleWriter }
 
-func (f SQLWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
+func (f SQLWriter) WriteTableData(ctx context.Context, ir TableDataIR) (err error) {
 	log.Debug("start dumping table...", zap.String("table", ir.TableName()))
 
-	// just let `database.table.sql` be `database.table.0.sql`
-	/*if fileName == "" {
-		// set initial file name
-		fileName = fmt.Sprintf("%s.%s.sql", ir.DatabaseName(), ir.TableName())
-		if f.cfg.FileSize != UnspecifiedSize {
-			fileName = fmt.Sprintf("%s.%s.%d.sql", ir.DatabaseName(), ir.TableName(), 0)
-		}
-	}*/
+	defer ir.Close()
 	namer := newOutputFileNamer(ir, f.cfg.Rows != UnspecifiedSize, f.cfg.FileSize != UnspecifiedSize)
 	fileType := strings.ToLower(f.cfg.FileType)
 	fileName, err := namer.NextName(f.cfg.OutputFileTemplate, fileType)
 	if err != nil {
 		return err
 	}
-	chunksIter := ir
-	defer chunksIter.Rows().Close()
 
 	for {
 		fileWriter, tearDown := buildInterceptFileWriter(f.cfg.ExternalStorage, fileName)
-		err = WriteInsert(ctx, chunksIter, fileWriter, f.cfg.FileSize, f.cfg.StatementSize)
+		err = WriteInsert(ctx, ir, fileWriter, f.cfg.FileSize, f.cfg.StatementSize)
 		tearDown(ctx)
 		if err != nil {
 			return err
@@ -175,17 +166,16 @@ func (namer *outputFileNamer) NextName(tmpl *template.Template, fileType string)
 	return res + "." + fileType, err
 }
 
-func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
+func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) (err error) {
 	log.Debug("start dumping table in csv format...", zap.String("table", ir.TableName()))
 
+	defer ir.Close()
 	namer := newOutputFileNamer(ir, f.cfg.Rows != UnspecifiedSize, f.cfg.FileSize != UnspecifiedSize)
 	fileType := strings.ToLower(f.cfg.FileType)
 	fileName, err := namer.NextName(f.cfg.OutputFileTemplate, fileType)
 	if err != nil {
 		return err
 	}
-	chunksIter := ir
-	defer chunksIter.Rows().Close()
 
 	opt := &csvOption{
 		nullValue: f.cfg.CsvNullValue,
@@ -195,7 +185,7 @@ func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
 
 	for {
 		fileWriter, tearDown := buildInterceptFileWriter(f.cfg.ExternalStorage, fileName)
-		err = WriteInsertInCsv(ctx, chunksIter, fileWriter, f.cfg.NoHeader, opt, f.cfg.FileSize)
+		err = WriteInsertInCsv(ctx, ir, fileWriter, f.cfg.NoHeader, opt, f.cfg.FileSize)
 		tearDown(ctx)
 		if err != nil {
 			return err
diff --git a/v4/export/writer_util.go b/v4/export/writer_util.go
index 348d5023d2fc3..4a24c6e15b100 100644
--- a/v4/export/writer_util.go
+++ b/v4/export/writer_util.go
@@ -389,13 +389,13 @@ func buildInterceptFileWriter(s storage.ExternalStorage, path string) (storage.W
 			log.Error("open file failed",
 				zap.String("path", fullPath),
 				zap.Error(err))
-			return err
+			return newWriterError(err)
 		}
 		w := storage.NewUploaderWriter(uploader, hardcodedS3ChunkSize)
 		writer = w
 		log.Debug("opened file", zap.String("path", fullPath))
 		fileWriter.Writer = writer
-		return err
+		return nil
 	}
 
 	fileWriter.initRoutine = initRoutine
@@ -427,6 +427,21 @@ func (l *LazyStringWriter) WriteString(str string) (int, error) {
 	return l.StringWriter.WriteString(str)
 }
 
+type writerError struct {
+	error
+}
+
+func (e *writerError) Error() string {
+	return e.error.Error()
+}
+
+func newWriterError(err error) error {
+	if err == nil {
+		return nil
+	}
+	return &writerError{error: err}
+}
+
 // InterceptFileWriter is an interceptor of os.File,
 // tracking whether a StringWriter has written something.
 type InterceptFileWriter struct {
@@ -446,7 +461,8 @@ func (w *InterceptFileWriter) Write(ctx context.Context, p []byte) (int, error)
 	if w.err != nil {
 		return 0, errors.Annotate(w.err, "open file error")
 	}
-	return w.Writer.Write(ctx, p)
+	n, err := w.Writer.Write(ctx, p)
+	return n, newWriterError(err)
 }
 
 func (w *InterceptFileWriter) Close(ctx context.Context) error {
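
Reviewer note: the `InterceptFileWriter` side of this change is easy to miss: the destination file is created lazily on the first `Write`, so empty tables leave no zero-byte files, and failures from both the lazy open and the write itself are tagged via `newWriterError` so the backoffer above won't retry them. A stripped-down, in-memory model of the lazy-open behavior (`lazyWriter` and its `init` field are hypothetical names, not the real type):

```go
package main

import (
	"fmt"
	"sync"
)

// lazyWriter mirrors the InterceptFileWriter idea: the destination is only
// opened on the first Write, so a table that yields no rows produces no file.
type lazyWriter struct {
	once sync.Once
	init func() error // stand-in for the patch's initRoutine
	err  error
	out  []byte
}

func (w *lazyWriter) Write(p []byte) (int, error) {
	w.once.Do(func() { w.err = w.init() })
	if w.err != nil {
		return 0, fmt.Errorf("open file error: %w", w.err)
	}
	w.out = append(w.out, p...)
	return len(p), nil
}

func main() {
	w := &lazyWriter{init: func() error {
		fmt.Println("creating file on first write")
		return nil
	}}
	w.Write([]byte("INSERT INTO t VALUES (1);\n"))
	w.Write([]byte("INSERT INTO t VALUES (2);\n"))
	fmt.Printf("%d bytes buffered\n", len(w.out))
}
```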