From e9dc11be1990006c4f961b4a46a4cc112feecf01 Mon Sep 17 00:00:00 2001
From: Chunzhu Li
Date: Tue, 8 Sep 2020 13:28:28 +0800
Subject: [PATCH] Unify `--filesize` and `--statement-size` definition with
 mydumper's (#142)

* reduce the number of BindAddress calls to save time
* tmp
* unify file size
* fix ut
* fix a bug where escapeBackslash was not applied to row arguments
* tmp
* move the filesize and statementsize arguments into writerPipe
* fix format
* address comments
---
 dumpling/tests/file_size/run.sh            |   4 +-
 dumpling/tests/no_table_and_db_name/run.sh |   9 +-
 dumpling/v4/export/ir.go                   |   3 -
 dumpling/v4/export/ir_impl.go              | 105 +-------------------
 dumpling/v4/export/ir_impl_test.go         |  37 ++++---
 dumpling/v4/export/sql_type.go             |  42 ++++----
 dumpling/v4/export/test_util.go            |  40 ++++----
 dumpling/v4/export/writer.go               |   8 +-
 dumpling/v4/export/writer_test.go          |  20 ++--
 dumpling/v4/export/writer_util.go          | 106 ++++++++++++++-------
 dumpling/v4/export/writer_util_test.go     |  17 ++--
 11 files changed, 168 insertions(+), 223 deletions(-)

diff --git a/dumpling/tests/file_size/run.sh b/dumpling/tests/file_size/run.sh
index 0eb91f63..0506d097 100644
--- a/dumpling/tests/file_size/run.sh
+++ b/dumpling/tests/file_size/run.sh
@@ -12,8 +12,8 @@ chars_20="1111_0000_1111_0000_"
 # insert 100 records, each occupies 20 bytes
 run_sql "insert into t values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('$chars_20')/g");"
 
-# dumping with file size = 200 bytes
-run_dumpling -F 200B
+# dumping with file size = 311 bytes, which holds exactly 10 rows per file
+run_dumpling -F 311B
 
 # the dumping result is expected to be:
 # 10 files for insertion (each contains 10 records / 200 bytes)
diff --git a/dumpling/tests/no_table_and_db_name/run.sh b/dumpling/tests/no_table_and_db_name/run.sh
index 0f5c4bfe..ba13d38d 100644
--- a/dumpling/tests/no_table_and_db_name/run.sh
+++ b/dumpling/tests/no_table_and_db_name/run.sh
@@ -20,17 +20,16 @@ chars_20="1111_0000_1111_0000_"
 # insert 100 records, each occupies 20 bytes
 run_sql "insert into t values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('$chars_20')/g");"
 
-# dumping with file size = 200 bytes
-run_dumpling -F 200B --filetype csv --sql "select * from $TEST_NAME.t"
+# dumping with file size = 233 bytes, which holds exactly 10 rows per file
+run_dumpling -F 233B --filetype csv --sql "select * from $TEST_NAME.t"
 
 assert [ $( ls -lh $DUMPLING_OUTPUT_DIR | grep -e ".csv$" | wc -l ) -eq 10 ]
 
 # 10 files with header.
 assert [ $( cat $DUMPLING_OUTPUT_DIR/*.csv | wc -l ) -eq $(( 100 + 10 )) ]
 
-
-# dumping with file size = 200 bytes
-run_dumpling -F 200B --filetype sql --sql "select * from $TEST_NAME.t"
+# dumping with file size = 311 bytes, which holds exactly 10 rows per file
+run_dumpling -F 311B --filetype sql --sql "select * from $TEST_NAME.t"
 
 assert [ $( ls -lh $DUMPLING_OUTPUT_DIR | grep -e ".sql$" | wc -l ) -eq 10 ]
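The `-F` thresholds above come from the bytes dumpling now counts against a file, which include the header comments, the statement prefix, and the row terminators, not just raw column data. A back-of-envelope check as a minimal Go sketch; the `/*!40101 SET NAMES binary*/;` header line and the INSERT prefix for table `t` are assumed defaults, not something this patch states:

```go
package main

import "fmt"

// Rough per-file byte budget for the file_size test under the new accounting.
// The header and prefix strings are assumptions about dumpling's defaults.
func main() {
	header := len("/*!40101 SET NAMES binary*/;\n")     // counted once per file
	prefix := len("INSERT INTO `t` VALUES\n")           // counted once per statement
	row := len("('1111_0000_1111_0000_')") + len(",\n") // 24-byte tuple + 2-byte terminator

	fmt.Println(header + prefix + 10*row) // 312: the 10th row crosses -F 311B,
	// so each .sql file holds exactly 10 rows before dumpling switches files
}
```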
diff --git a/dumpling/v4/export/ir.go b/dumpling/v4/export/ir.go
index d7dc33e7..78c1810f 100644
--- a/dumpling/v4/export/ir.go
+++ b/dumpling/v4/export/ir.go
@@ -28,8 +28,6 @@ type SQLRowIter interface {
 	Next()
 	Error() error
 	HasNext() bool
-	HasNextSQLRowIter() bool
-	NextSQLRowIter() SQLRowIter
 	// release SQLRowIter
 	Close() error
 }
@@ -46,7 +44,6 @@ type Stringer interface {
 
 type RowReceiver interface {
 	BindAddress([]interface{})
-	ReportSize() uint64
 }
 
 func decodeFromRows(rows *sql.Rows, args []interface{}, row RowReceiver) error {
diff --git a/dumpling/v4/export/ir_impl.go b/dumpling/v4/export/ir_impl.go
index f95f9a3e..377edfa7 100644
--- a/dumpling/v4/export/ir_impl.go
+++ b/dumpling/v4/export/ir_impl.go
@@ -50,74 +50,6 @@ func (iter *rowIter) HasNext() bool {
 	return iter.hasNext
 }
 
-func (iter *rowIter) HasNextSQLRowIter() bool {
-	return iter.hasNext
-}
-
-func (iter *rowIter) NextSQLRowIter() SQLRowIter {
-	return iter
-}
-
-type fileRowIter struct {
-	rowIter            SQLRowIter
-	fileSizeLimit      uint64
-	statementSizeLimit uint64
-
-	currentStatementSize uint64
-	currentFileSize      uint64
-}
-
-func (c *fileRowIter) Close() error {
-	return c.rowIter.Close()
-}
-
-func (c *fileRowIter) Decode(row RowReceiver) error {
-	err := c.rowIter.Decode(row)
-	if err != nil {
-		return err
-	}
-	size := row.ReportSize()
-	c.currentFileSize += size
-	c.currentStatementSize += size
-	return nil
-}
-
-func (c *fileRowIter) Error() error {
-	return c.rowIter.Error()
-}
-
-func (c *fileRowIter) Next() {
-	c.rowIter.Next()
-}
-
-func (c *fileRowIter) HasNext() bool {
-	if c.fileSizeLimit != UnspecifiedSize && c.currentFileSize >= c.fileSizeLimit {
-		return false
-	}
-
-	if c.statementSizeLimit != UnspecifiedSize && c.currentStatementSize >= c.statementSizeLimit {
-		return false
-	}
-	return c.rowIter.HasNext()
-}
-
-func (c *fileRowIter) HasNextSQLRowIter() bool {
-	if c.fileSizeLimit != UnspecifiedSize && c.currentFileSize >= c.fileSizeLimit {
-		return false
-	}
-	return c.rowIter.HasNext()
-}
-
-func (c *fileRowIter) NextSQLRowIter() SQLRowIter {
-	return &fileRowIter{
-		rowIter:              c.rowIter,
-		fileSizeLimit:        c.fileSizeLimit,
-		statementSizeLimit:   c.statementSizeLimit,
-		currentFileSize:      c.currentFileSize,
-		currentStatementSize: 0,
-	}
-}
-
 type stringIter struct {
 	idx int
 	ss  []string
@@ -153,6 +85,7 @@ type tableData struct {
 	selectedField   string
 	specCmts        []string
 	escapeBackslash bool
+	SQLRowIter
 }
 
 func (td *tableData) Start(ctx context.Context, conn *sql.Conn) error {
@@ -197,7 +130,10 @@ func (td *tableData) ColumnCount() uint {
 }
 
 func (td *tableData) Rows() SQLRowIter {
-	return newRowIter(td.rows, len(td.colTypes))
+	if td.SQLRowIter == nil {
+		td.SQLRowIter = newRowIter(td.rows, len(td.colTypes))
+	}
+	return td.SQLRowIter
 }
 
 func (td *tableData) SelectedField() string {
@@ -215,37 +151,6 @@ func (td *tableData) EscapeBackSlash() bool {
 	return td.escapeBackslash
 }
 
-type tableDataChunks struct {
-	TableDataIR
-	rows               SQLRowIter
-	chunkSizeLimit     uint64
-	statementSizeLimit uint64
-}
-
-func (t *tableDataChunks) Rows() SQLRowIter {
-	if t.rows == nil {
-		t.rows = t.TableDataIR.Rows()
-	}
-
-	return &fileRowIter{
-		rowIter:            t.rows,
-		statementSizeLimit: t.statementSizeLimit,
-		fileSizeLimit:      t.chunkSizeLimit,
-	}
-}
-
-func (t *tableDataChunks) EscapeBackSlash() bool {
-	return t.TableDataIR.EscapeBackSlash()
-}
-
-func buildChunksIter(td TableDataIR, chunkSize uint64, statementSize uint64) *tableDataChunks {
-	return &tableDataChunks{
-		TableDataIR:        td,
-		chunkSizeLimit:     chunkSize,
-		statementSizeLimit: statementSize,
-	}
-}
-
 func splitTableDataIntoChunks(
 	ctx context.Context,
 	tableDataIRCh chan TableDataIR,
diff --git a/dumpling/v4/export/ir_impl_test.go b/dumpling/v4/export/ir_impl_test.go
index 4a981b86..4e0ad464 100644
--- a/dumpling/v4/export/ir_impl_test.go
+++ b/dumpling/v4/export/ir_impl_test.go
@@ -25,14 +25,6 @@ func (s *simpleRowReceiver) BindAddress(args []interface{}) {
 	}
 }
 
-func (s *simpleRowReceiver) ReportSize() uint64 {
-	var sum uint64
-	for _, datum := range s.data {
-		sum += uint64(len(datum))
-	}
-	return sum
-}
-
 func (s *testIRImplSuite) TestRowIter(c *C) {
 	db, mock, err := sqlmock.New()
 	c.Assert(err, IsNil)
@@ -93,30 +85,33 @@ func (s *testIRImplSuite) TestChunkRowIter(c *C) {
 		}
 	)
 
-	sqlRowIter := SQLRowIter(&fileRowIter{
-		rowIter:            newRowIter(rows, 2),
-		fileSizeLimit:      testFileSize,
-		statementSizeLimit: testStatementSize,
-	})
+	sqlRowIter := newRowIter(rows, 2)
 
 	res := newSimpleRowReceiver(2)
+	wp := newWriterPipe(nil, testFileSize, testStatementSize)
 
 	var resSize [][]uint64
-	for sqlRowIter.HasNextSQLRowIter() {
-		sqlRowIter = sqlRowIter.NextSQLRowIter()
-		fileRowIter, ok := sqlRowIter.(*fileRowIter)
-		c.Assert(ok, IsTrue)
-
+	for sqlRowIter.HasNext() {
+		wp.currentStatementSize = 0
 		for sqlRowIter.HasNext() {
 			c.Assert(sqlRowIter.Decode(res), IsNil)
+			sz := uint64(len(res.data[0]) + len(res.data[1]))
+			wp.AddFileSize(sz)
 			sqlRowIter.Next()
-			resSize = append(resSize, []uint64{fileRowIter.currentFileSize, fileRowIter.currentStatementSize})
+			resSize = append(resSize, []uint64{wp.currentFileSize, wp.currentStatementSize})
+			if wp.ShouldSwitchStatement() {
+				break
+			}
+		}
+		if wp.ShouldSwitchFile() {
+			break
 		}
 	}
 
 	c.Assert(resSize, DeepEquals, expectedSize)
-	c.Assert(sqlRowIter.HasNextSQLRowIter(), IsFalse)
-	c.Assert(sqlRowIter.HasNext(), IsFalse)
+	c.Assert(sqlRowIter.HasNext(), IsTrue)
+	c.Assert(wp.ShouldSwitchFile(), IsTrue)
+	c.Assert(wp.ShouldSwitchStatement(), IsTrue)
 	rows.Close()
 	c.Assert(sqlRowIter.Decode(res), NotNil)
 	sqlRowIter.Next()
diff --git a/dumpling/v4/export/sql_type.go b/dumpling/v4/export/sql_type.go
index 7db4f787..900c5551 100644
--- a/dumpling/v4/export/sql_type.go
+++ b/dumpling/v4/export/sql_type.go
@@ -116,7 +116,7 @@ func SQLTypeNumberMaker() RowReceiverStringer {
 }
 
 func MakeRowReceiver(colTypes []string) RowReceiverStringer {
-	rowReceiverArr := make(RowReceiverArr, len(colTypes))
+	rowReceiverArr := make([]RowReceiverStringer, len(colTypes))
 	for i, colTp := range colTypes {
 		recMaker, ok := colTypeRowReceiverMap[colTp]
 		if !ok {
@@ -124,29 +124,32 @@ func MakeRowReceiver(colTypes []string) RowReceiverStringer {
 		}
 		rowReceiverArr[i] = recMaker()
 	}
-	return rowReceiverArr
+	return RowReceiverArr{
+		bound:     false,
+		receivers: rowReceiverArr,
+	}
 }
 
-type RowReceiverArr []RowReceiverStringer
+type RowReceiverArr struct {
+	bound     bool
+	receivers []RowReceiverStringer
+}
 
 func (r RowReceiverArr) BindAddress(args []interface{}) {
-	for i := range args {
-		r[i].BindAddress(args[i : i+1])
+	if r.bound {
+		return
 	}
-}
-
-func (r RowReceiverArr) ReportSize() uint64 {
-	var sum uint64
-	for _, receiver := range r {
-		sum += receiver.ReportSize()
+	r.bound = true
+	for i := range args {
+		r.receivers[i].BindAddress(args[i : i+1])
 	}
-	return sum
 }
 
 func (r RowReceiverArr) WriteToBuffer(bf *bytes.Buffer, escapeBackslash bool) {
 	bf.WriteByte('(')
-	for i, receiver := range r {
+	for i, receiver := range r.receivers {
 		receiver.WriteToBuffer(bf, escapeBackslash)
-		if i != len(r)-1 {
+		if i != len(r.receivers)-1 {
 			bf.WriteByte(',')
 		}
 	}
@@ -154,9 +157,9 @@ func (r RowReceiverArr) WriteToBuffer(bf *bytes.Buffer, escapeBackslash bool) {
 }
 
 func (r RowReceiverArr) WriteToBufferInCsv(bf *bytes.Buffer, escapeBackslash bool, opt *csvOption) {
-	for i, receiver := range r {
+	for i, receiver := range r.receivers {
 		receiver.WriteToBufferInCsv(bf, escapeBackslash, opt)
-		if i != len(r)-1 {
+		if i != len(r.receivers)-1 {
 			bf.Write(opt.separator)
 		}
 	}
@@ -189,12 +192,6 @@ type SQLTypeString struct {
 func (s *SQLTypeString) BindAddress(arg []interface{}) {
 	arg[0] = &s.RawBytes
 }
-func (s *SQLTypeString) ReportSize() uint64 {
-	if s.RawBytes != nil {
-		return uint64(len(s.RawBytes))
-	}
-	return uint64(len(nullValue))
-}
 
 func (s *SQLTypeString) WriteToBuffer(bf *bytes.Buffer, escapeBackslash bool) {
 	if s.RawBytes != nil {
@@ -223,9 +220,6 @@ type SQLTypeBytes struct {
 func (s *SQLTypeBytes) BindAddress(arg []interface{}) {
 	arg[0] = &s.RawBytes
 }
-func (s *SQLTypeBytes) ReportSize() uint64 {
-	return uint64(len(s.RawBytes))
-}
 
 func (s *SQLTypeBytes) WriteToBuffer(bf *bytes.Buffer, _ bool) {
 	if s.RawBytes != nil {
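The `bound` flag added above is meant to make `BindAddress` a one-shot operation (the "reduce the number of BindAddress calls" commit in the message). One caveat worth noting: with the value receiver used in the patch, `r.bound = true` mutates a copy, so the guard never actually engages and every row still re-binds (harmless, since binding the same addresses is idempotent). A minimal sketch, using a hypothetical `recv` stand-in type, of how a pointer receiver would make the flag stick; `MakeRowReceiver` would then have to return `*RowReceiverArr`:

```go
package main

import "fmt"

// recv is a hypothetical stand-in for dumpling's SQLType* receivers.
type recv struct{ v interface{} }

func (s *recv) BindAddress(arg []interface{}) { arg[0] = &s.v }

type RowReceiverArr struct {
	bound     bool
	receivers []*recv
}

// Pointer receiver: the bound flag written here survives across calls,
// so the second and later BindAddress invocations become no-ops.
func (r *RowReceiverArr) BindAddress(args []interface{}) {
	if r.bound {
		return
	}
	r.bound = true
	for i := range args {
		r.receivers[i].BindAddress(args[i : i+1])
	}
}

func main() {
	r := &RowReceiverArr{receivers: []*recv{{}, {}}}
	args := make([]interface{}, 2)
	r.BindAddress(args)
	r.BindAddress(args) // skipped: the flag persisted
	fmt.Println(r.bound) // true
}
```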
diff --git a/dumpling/v4/export/test_util.go b/dumpling/v4/export/test_util.go
index f4fee268..81791f83 100644
--- a/dumpling/v4/export/test_util.go
+++ b/dumpling/v4/export/test_util.go
@@ -86,6 +86,7 @@ type mockTableIR struct {
 	colNames        []string
 	escapeBackSlash bool
 	rowErr          error
+	SQLRowIter
 }
 
 func (m *mockTableIR) Start(ctx context.Context, conn *sql.Conn) error {
@@ -128,25 +129,27 @@ func (m *mockTableIR) SpecialComments() StringIter {
 }
 
 func (m *mockTableIR) Rows() SQLRowIter {
-	mockRows := sqlmock.NewRows(m.colTypes)
-	for _, datum := range m.data {
-		mockRows.AddRow(datum...)
+	if m.SQLRowIter == nil {
+		mockRows := sqlmock.NewRows(m.colTypes)
+		for _, datum := range m.data {
+			mockRows.AddRow(datum...)
+		}
+		db, mock, err := sqlmock.New()
+		if err != nil {
+			panic(fmt.Sprintf("sqlmock.New returned error: %v", err))
+		}
+		defer db.Close()
+		mock.ExpectQuery("select 1").WillReturnRows(mockRows)
+		if m.rowErr != nil {
+			mockRows.RowError(len(m.data)-1, m.rowErr)
+		}
+		rows, err := db.Query("select 1")
+		if err != nil {
+			panic(fmt.Sprintf("db.Query returned error: %v", err))
+		}
+		m.SQLRowIter = newRowIter(rows, len(m.colTypes))
 	}
-	db, mock, err := sqlmock.New()
-	if err != nil {
-		panic(fmt.Sprintf("sqlmock.New return error: %v", err))
-	}
-	defer db.Close()
-	mock.ExpectQuery("select 1").WillReturnRows(mockRows)
-	if m.rowErr != nil {
-		mockRows.RowError(len(m.data)-1, m.rowErr)
-	}
-	rows, err := db.Query("select 1")
-	if err != nil {
-		panic(fmt.Sprintf("sqlmock.New return error: %v", err))
-	}
-
-	return newRowIter(rows, len(m.colTypes))
+	return m.SQLRowIter
 }
 
 func (m *mockTableIR) EscapeBackSlash() bool {
@@ -161,5 +164,6 @@ func newMockTableIR(databaseName, tableName string, data [][]driver.Value, speci
 		specCmt:       specialComments,
 		selectedField: "*",
 		colTypes:      colTypes,
+		SQLRowIter:    nil,
 	}
 }
diff --git a/dumpling/v4/export/writer.go b/dumpling/v4/export/writer.go
index b4f2dd4c..9bcaee49 100644
--- a/dumpling/v4/export/writer.go
+++ b/dumpling/v4/export/writer.go
@@ -64,13 +64,13 @@ func (f SQLWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
 		return err
 	}
 	fileName += ".sql"
-	chunksIter := buildChunksIter(ir, f.cfg.FileSize, f.cfg.StatementSize)
+	chunksIter := ir
 	defer chunksIter.Rows().Close()
 
 	for {
 		filePath := path.Join(f.cfg.OutputDirPath, fileName)
 		fileWriter, tearDown := buildInterceptFileWriter(filePath)
-		err := WriteInsert(ctx, chunksIter, fileWriter)
+		err := WriteInsert(ctx, chunksIter, fileWriter, f.cfg.FileSize, f.cfg.StatementSize)
 		tearDown()
 		if err != nil {
 			return err
@@ -152,7 +152,7 @@ func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
 		return err
 	}
 	fileName += ".csv"
-	chunksIter := buildChunksIter(ir, f.cfg.FileSize, f.cfg.StatementSize)
+	chunksIter := ir
 	defer chunksIter.Rows().Close()
 
 	opt := &csvOption{
@@ -164,7 +164,7 @@ func (f CSVWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
 	for {
 		filePath := path.Join(f.cfg.OutputDirPath, fileName)
 		fileWriter, tearDown := buildInterceptFileWriter(filePath)
-		err := WriteInsertInCsv(ctx, chunksIter, fileWriter, f.cfg.NoHeader, opt)
+		err := WriteInsertInCsv(ctx, chunksIter, fileWriter, f.cfg.NoHeader, opt, f.cfg.FileSize)
 		tearDown()
 		if err != nil {
 			return err
diff --git a/dumpling/v4/export/writer_test.go b/dumpling/v4/export/writer_test.go
index 9f918057..03ffd81b 100644
--- a/dumpling/v4/export/writer_test.go
+++ b/dumpling/v4/export/writer_test.go
@@ -110,6 +110,13 @@ func (s *testDumpSuite) TestWriteTableDataWithFileSize(c *C) {
 	config.OutputDirPath = dir
 	config.FileSize = 50
 	ctx := context.Background()
+	specCmts := []string{
+		"/*!40101 SET NAMES binary*/;",
+		"/*!40014 SET FOREIGN_KEY_CHECKS=0*/;",
+	}
+	config.FileSize += uint64(len(specCmts[0]) + 1)
+	config.FileSize += uint64(len(specCmts[1]) + 1)
+	config.FileSize += uint64(len("INSERT INTO `employees` VALUES\n"))
 
 	simpleWriter, err := NewSimpleWriter(config)
 	c.Assert(err, IsNil)
@@ -122,10 +129,6 @@ func (s *testDumpSuite) TestWriteTableDataWithFileSize(c *C) {
 		{"4", "female", "sarah@mail.com", "020-1235", "healthy"},
 	}
 	colTypes := []string{"INT", "SET", "VARCHAR", "VARCHAR", "TEXT"}
-	specCmts := []string{
-		"/*!40101 SET NAMES binary*/;",
-		"/*!40014 SET FOREIGN_KEY_CHECKS=0*/;",
-	}
 	tableIR := newMockTableIR("test", "employee", data, specCmts, colTypes)
 	err = writer.WriteTableData(ctx, tableIR)
 	c.Assert(err, IsNil)
@@ -160,6 +163,7 @@ func (s *testDumpSuite) TestWriteTableDataWithStatementSize(c *C) {
 	config := DefaultConfig()
 	config.OutputDirPath = dir
 	config.StatementSize = 50
+	config.StatementSize += uint64(len("INSERT INTO `employee` VALUES\n"))
 	config.OutputFileTemplate, err = ParseOutputFileTemplate("specified-name")
 	c.Assert(err, IsNil)
 	ctx := context.Background()
@@ -206,8 +210,11 @@ func (s *testDumpSuite) TestWriteTableDataWithStatementSize(c *C) {
 	}
 
 	// with file size and statement size
-	config.FileSize = 90
-	config.StatementSize = 30
+	config.FileSize = 204
+	config.StatementSize = 95
+	config.FileSize += uint64(len(specCmts[0]) + 1)
+	config.FileSize += uint64(len(specCmts[1]) + 1)
+	config.StatementSize += uint64(len("INSERT INTO `employee` VALUES\n"))
 	// test specifying filename format
 	config.OutputFileTemplate, err = ParseOutputFileTemplate("{{.Index}}-{{.Table}}-{{fn .DB}}")
 	c.Assert(err, IsNil)
@@ -230,6 +237,7 @@ func (s *testDumpSuite) TestWriteTableDataWithStatementSize(c *C) {
 		"(4,'female','sarah@mail.com','020-1235','healthy');\n",
 	}
 
+	tableIR = newMockTableIR("te%/st", "employee", data, specCmts, colTypes)
 	c.Assert(writer.WriteTableData(ctx, tableIR), IsNil)
 	c.Assert(err, IsNil)
 	for p, expected := range cases {
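The writer_util.go diff below moves all of the size bookkeeping into `writerPipe`: `AddFileSize` grows the file and statement counters together, `ShouldSwitchStatement` ends the current INSERT, and `ShouldSwitchFile` ends the current file. A self-contained toy of that control flow (fixed 26-byte rows and arbitrary limits; the `UnspecifiedSize` checks are omitted for brevity):

```go
package main

import "fmt"

// pipe mirrors the counters writerPipe gains in the diff below.
type pipe struct {
	currentFileSize, currentStatementSize uint64
	fileSizeLimit, statementSizeLimit     uint64
}

func (p *pipe) AddFileSize(n uint64) {
	p.currentFileSize += n
	p.currentStatementSize += n
}

func (p *pipe) ShouldSwitchFile() bool { return p.currentFileSize >= p.fileSizeLimit }

func (p *pipe) ShouldSwitchStatement() bool {
	return p.currentFileSize >= p.fileSizeLimit ||
		p.currentStatementSize >= p.statementSizeLimit
}

func main() {
	p := &pipe{fileSizeLimit: 150, statementSizeLimit: 60}
	statements, rows := 0, 0
	for !p.ShouldSwitchFile() {
		p.currentStatementSize = 0 // a new INSERT statement starts
		statements++
		for {
			p.AddFileSize(26) // one 24-byte tuple plus ",\n"
			rows++
			if p.ShouldSwitchStatement() {
				break // WriteInsert terminates the statement with ";\n"
			}
		}
	}
	fmt.Println(statements, rows) // 2 6: two 3-row statements fit before 156 >= 150
}
```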
diff --git a/dumpling/v4/export/writer_util.go b/dumpling/v4/export/writer_util.go
index 020c6c37..a27f4355 100644
--- a/dumpling/v4/export/writer_util.go
+++ b/dumpling/v4/export/writer_util.go
@@ -25,15 +25,26 @@ type writerPipe struct {
 	closed chan struct{}
 	errCh  chan error
 
+	currentFileSize      uint64
+	currentStatementSize uint64
+
+	fileSizeLimit      uint64
+	statementSizeLimit uint64
+
 	w io.Writer
 }
 
-func newWriterPipe(w io.Writer) *writerPipe {
+func newWriterPipe(w io.Writer, fileSizeLimit, statementSizeLimit uint64) *writerPipe {
 	return &writerPipe{
 		input:  make(chan *bytes.Buffer, 8),
 		closed: make(chan struct{}),
 		errCh:  make(chan error, 1),
 		w:      w,
+
+		currentFileSize:      0,
+		currentStatementSize: 0,
+		fileSizeLimit:        fileSizeLimit,
+		statementSizeLimit:   statementSizeLimit,
 	}
 }
 
@@ -62,6 +73,11 @@ func (b *writerPipe) Run(ctx context.Context) {
 	}
 }
 
+func (b *writerPipe) AddFileSize(fileSize uint64) {
+	b.currentFileSize += fileSize
+	b.currentStatementSize += fileSize
+}
+
 func (b *writerPipe) Error() error {
 	select {
 	case err := <-b.errCh:
@@ -71,6 +87,15 @@ func (b *writerPipe) Error() error {
 	}
 }
 
+func (b *writerPipe) ShouldSwitchFile() bool {
+	return b.fileSizeLimit != UnspecifiedSize && b.currentFileSize >= b.fileSizeLimit
+}
+
+func (b *writerPipe) ShouldSwitchStatement() bool {
+	return (b.fileSizeLimit != UnspecifiedSize && b.currentFileSize >= b.fileSizeLimit) ||
+		(b.statementSizeLimit != UnspecifiedSize && b.currentStatementSize >= b.statementSizeLimit)
+}
+
 func WriteMeta(meta MetaIR, w io.StringWriter) error {
 	log.Debug("start dumping meta data", zap.String("target", meta.TargetName()))
 
@@ -89,7 +114,7 @@ func WriteMeta(meta MetaIR, w io.StringWriter) error {
 	return nil
 }
 
-func WriteInsert(pCtx context.Context, tblIR TableDataIR, w io.Writer) error {
+func WriteInsert(pCtx context.Context, tblIR TableDataIR, w io.Writer, fileSizeLimit, statementSizeLimit uint64) error {
 	fileRowIter := tblIR.Rows()
 	if !fileRowIter.HasNext() {
 		return nil
@@ -100,7 +125,7 @@ func WriteInsert(pCtx context.Context, tblIR TableDataIR, w io.Writer) error {
 		bf.Grow(lengthLimit - bfCap)
 	}
 
-	wp := newWriterPipe(w)
+	wp := newWriterPipe(w, fileSizeLimit, statementSizeLimit)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	var wg sync.WaitGroup
@@ -119,6 +144,7 @@ func WriteInsert(pCtx context.Context, tblIR TableDataIR, w io.Writer) error {
 		bf.WriteString(specCmtIter.Next())
 		bf.WriteByte('\n')
 	}
+	wp.currentFileSize += uint64(bf.Len())
 
 	var (
 		insertStatementPrefix string
@@ -137,22 +163,27 @@ func WriteInsert(pCtx context.Context, tblIR TableDataIR, w io.Writer) error {
 		insertStatementPrefix = fmt.Sprintf("INSERT INTO %s VALUES\n", wrapBackTicks(escapeString(tblIR.TableName())))
 	}
+	insertStatementPrefixLen := uint64(len(insertStatementPrefix))
 
-	for fileRowIter.HasNextSQLRowIter() {
+	for fileRowIter.HasNext() {
+		wp.currentStatementSize = 0
 		bf.WriteString(insertStatementPrefix)
+		wp.AddFileSize(insertStatementPrefixLen)
 
-		fileRowIter = fileRowIter.NextSQLRowIter()
 		for fileRowIter.HasNext() {
 			if err = fileRowIter.Decode(row); err != nil {
 				log.Error("scanning from sql.Row failed", zap.Error(err))
 				return err
 			}
 
+			lastBfSize := bf.Len()
 			row.WriteToBuffer(bf, escapeBackSlash)
 			counter += 1
+			wp.AddFileSize(uint64(bf.Len()-lastBfSize) + 2) // 2 is for ",\n" and ";\n"
 
 			fileRowIter.Next()
-			if fileRowIter.HasNext() {
+			shouldSwitch := wp.ShouldSwitchStatement()
+			if fileRowIter.HasNext() && !shouldSwitch {
 				bf.WriteString(",\n")
 			} else {
 				bf.WriteString(";\n")
@@ -172,6 +203,13 @@ func WriteInsert(pCtx context.Context, tblIR TableDataIR, w io.Writer) error {
 				return err
 			default:
 			}
+
+			if shouldSwitch {
+				break
+			}
+		}
+		if wp.ShouldSwitchFile() {
+			break
 		}
 	}
 	log.Debug("dumping table",
@@ -188,7 +226,7 @@ func WriteInsert(pCtx context.Context, tblIR TableDataIR, w io.Writer) error {
 	return wp.Error()
 }
 
-func WriteInsertInCsv(pCtx context.Context, tblIR TableDataIR, w io.Writer, noHeader bool, opt *csvOption) error {
+func WriteInsertInCsv(pCtx context.Context, tblIR TableDataIR, w io.Writer, noHeader bool, opt *csvOption, fileSizeLimit uint64) error {
 	fileRowIter := tblIR.Rows()
 	if !fileRowIter.HasNext() {
 		return nil
@@ -199,7 +237,7 @@ func WriteInsertInCsv(pCtx context.Context, tblIR TableDataIR, w io.Writer, noHe
 		bf.Grow(lengthLimit - bfCap)
 	}
 
-	wp := newWriterPipe(w)
+	wp := newWriterPipe(w, fileSizeLimit, UnspecifiedSize)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	var wg sync.WaitGroup
@@ -231,36 +269,38 @@ func WriteInsertInCsv(pCtx context.Context, tblIR TableDataIR, w io.Writer, noHe
 		}
 		bf.WriteByte('\n')
 	}
+	wp.currentFileSize += uint64(bf.Len())
 
-	for fileRowIter.HasNextSQLRowIter() {
-		fileRowIter = fileRowIter.NextSQLRowIter()
-		for fileRowIter.HasNext() {
-			if err = fileRowIter.Decode(row); err != nil {
-				log.Error("scanning from sql.Row failed", zap.Error(err))
-				return err
-			}
+	for fileRowIter.HasNext() {
+		if err = fileRowIter.Decode(row); err != nil {
+			log.Error("scanning from sql.Row failed", zap.Error(err))
+			return err
+		}
 
-			row.WriteToBufferInCsv(bf, escapeBackSlash, opt)
-			counter += 1
+		lastBfSize := bf.Len()
+		row.WriteToBufferInCsv(bf, escapeBackSlash, opt)
+		counter += 1
+		wp.currentFileSize += uint64(bf.Len()-lastBfSize) + 1 // 1 is for "\n"
 
-			if bf.Len() >= lengthLimit {
-				wp.input <- bf
-				bf = pool.Get().(*bytes.Buffer)
-				if bfCap := bf.Cap(); bfCap < lengthLimit {
-					bf.Grow(lengthLimit - bfCap)
-				}
+		bf.WriteByte('\n')
+		if bf.Len() >= lengthLimit {
+			wp.input <- bf
+			bf = pool.Get().(*bytes.Buffer)
+			if bfCap := bf.Cap(); bfCap < lengthLimit {
+				bf.Grow(lengthLimit - bfCap)
 			}
+		}
 
-			fileRowIter.Next()
-			bf.WriteByte('\n')
-
-			select {
-			case <-pCtx.Done():
-				return pCtx.Err()
-			case err := <-wp.errCh:
-				return err
-			default:
-			}
+		fileRowIter.Next()
+		select {
+		case <-pCtx.Done():
+			return pCtx.Err()
+		case err := <-wp.errCh:
+			return err
+		default:
+		}
+		if wp.ShouldSwitchFile() {
+			break
 		}
 	}
 
diff --git a/dumpling/v4/export/writer_util_test.go b/dumpling/v4/export/writer_util_test.go
index 0b357d13..927819fe 100644
--- a/dumpling/v4/export/writer_util_test.go
+++ b/dumpling/v4/export/writer_util_test.go
@@ -61,7 +61,7 @@ func (s *testUtilSuite) TestWriteInsert(c *C) {
 	tableIR := newMockTableIR("test", "employee", data, specCmts, colTypes)
 	bf := &bytes.Buffer{}
 
-	err := WriteInsert(context.Background(), tableIR, bf)
+	err := WriteInsert(context.Background(), tableIR, bf, UnspecifiedSize, UnspecifiedSize)
 	c.Assert(err, IsNil)
 	expected := "/*!40101 SET NAMES binary*/;\n" +
 		"/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n" +
@@ -91,7 +91,7 @@ func (s *testUtilSuite) TestWriteInsertReturnsError(c *C) {
 	tableIR.rowErr = rowErr
 	bf := &bytes.Buffer{}
 
-	err := WriteInsert(context.Background(), tableIR, bf)
+	err := WriteInsert(context.Background(), tableIR, bf, UnspecifiedSize, UnspecifiedSize)
 	c.Assert(err, Equals, rowErr)
 	expected := "/*!40101 SET NAMES binary*/;\n" +
 		"/*!40014 SET FOREIGN_KEY_CHECKS=0*/;\n" +
@@ -115,7 +115,7 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 
 	// test nullValue
 	opt := &csvOption{separator: []byte(","), delimiter: doubleQuotationMark, nullValue: "\\N"}
-	err := WriteInsertInCsv(context.Background(), tableIR, bf, true, opt)
+	err := WriteInsertInCsv(context.Background(), tableIR, bf, true, opt, UnspecifiedSize)
 	c.Assert(err, IsNil)
 	expected := "1,\"male\",\"bob@mail.com\",\"020-1234\",\\N\n" +
 		"2,\"female\",\"sarah@mail.com\",\"020-1253\",\"healthy\"\n" +
@@ -126,7 +126,8 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 	// test delimiter
 	bf.Reset()
 	opt.delimiter = quotationMark
-	err = WriteInsertInCsv(context.Background(), tableIR, bf, true, opt)
+	tableIR = newMockTableIR("test", "employee", data, nil, colTypes)
+	err = WriteInsertInCsv(context.Background(), tableIR, bf, true, opt, UnspecifiedSize)
 	c.Assert(err, IsNil)
 	expected = "1,'male','bob@mail.com','020-1234',\\N\n" +
 		"2,'female','sarah@mail.com','020-1253','healthy'\n" +
@@ -137,7 +138,8 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 	// test separator
 	bf.Reset()
 	opt.separator = []byte(";")
-	err = WriteInsertInCsv(context.Background(), tableIR, bf, true, opt)
+	tableIR = newMockTableIR("test", "employee", data, nil, colTypes)
+	err = WriteInsertInCsv(context.Background(), tableIR, bf, true, opt, UnspecifiedSize)
 	c.Assert(err, IsNil)
 	expected = "1;'male';'bob@mail.com';'020-1234';\\N\n" +
 		"2;'female';'sarah@mail.com';'020-1253';'healthy'\n" +
@@ -149,8 +151,9 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 	bf.Reset()
 	opt.separator = []byte("&;,?")
 	opt.delimiter = []byte("ma")
+	tableIR = newMockTableIR("test", "employee", data, nil, colTypes)
 	tableIR.colNames = []string{"id", "gender", "email", "phone_number", "status"}
-	err = WriteInsertInCsv(context.Background(), tableIR, bf, false, opt)
+	err = WriteInsertInCsv(context.Background(), tableIR, bf, false, opt, UnspecifiedSize)
 	c.Assert(err, IsNil)
 	expected = "maidma&;,?magenderma&;,?maemamailma&;,?maphone_numberma&;,?mastatusma\n" +
 		"1&;,?mamamalema&;,?mabob@mamail.comma&;,?ma020-1234ma&;,?\\N\n" +
@@ -175,7 +178,7 @@ func (s *testUtilSuite) TestSQLDataTypes(c *C) {
 	tableIR := newMockTableIR("test", "t", tableData, nil, colType)
 	bf := &bytes.Buffer{}
 
-	err := WriteInsert(context.Background(), tableIR, bf)
+	err := WriteInsert(context.Background(), tableIR, bf, UnspecifiedSize, UnspecifiedSize)
 	c.Assert(err, IsNil)
 	lines := strings.Split(bf.String(), "\n")
 	c.Assert(len(lines), Equals, 3)
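A final note on the writer_util_test.go changes above: `Rows()` is now memoized on both `tableData` and `mockTableIR`, so an iterator drained by one write call stays drained, which is why each sub-test now builds a fresh `mockTableIR` instead of reusing the previous one. A toy model of that caching contract:

```go
package main

import "fmt"

// iter is a minimal row iterator; pos survives because it is shared by pointer.
type iter struct{ pos, n int }

func (it *iter) HasNext() bool { return it.pos < it.n }
func (it *iter) Next()         { it.pos++ }

// table caches its iterator the way tableData.Rows() and mockTableIR.Rows()
// now do: the first call creates it, later calls return the same instance.
type table struct{ cached *iter }

func (t *table) Rows() *iter {
	if t.cached == nil {
		t.cached = &iter{n: 3}
	}
	return t.cached
}

func main() {
	t := &table{}
	for it := t.Rows(); it.HasNext(); it.Next() {
		// the first "write" call drains every row
	}
	// A second call sees the same, already-exhausted iterator — reusing the
	// old tableIR would make the next WriteInsertInCsv emit nothing.
	fmt.Println(t.Rows().HasNext()) // false
}
```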