From 63599b38dfb53ce6a2f850691de4f17c7dfd7df5 Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Sun, 22 Sep 2024 19:55:21 +0800 Subject: [PATCH 1/9] sample once Signed-off-by: zeminzhou --- pkg/lightning/mydump/loader.go | 25 +++++++++++++++++-------- pkg/lightning/mydump/loader_test.go | 9 +++++++-- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index d2fc407ddcdeb..56096a81127bc 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -244,6 +244,8 @@ type mdLoaderSetup struct { dbIndexMap map[string]int tableIndexMap map[filter.Table]int setupCfg *MDLoaderSetupConfig + + sampledParquetRowSize float64 } // NewLoader constructs a MyDumper loader that scanns the data source and constructs a set of metadatas. @@ -532,12 +534,20 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size } s.tableDatas = append(s.tableDatas, info) case SourceTypeParquet: - parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore()) + if s.sampledParquetRowSize == 0 { + s.sampledParquetRowSize, err = SampleParquetRowSize(ctx, info.FileMeta, s.loader.GetStore()) + if err != nil { + logger.Error("fail to sample parquet row size", zap.String("category", "loader"), + zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err)) + return err + } + } + totalRowCount, err2 := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta) if err2 != nil { - logger.Error("fail to sample parquet data size", zap.String("category", "loader"), + logger.Error("fail to get file total row count", zap.String("category", "loader"), zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2)) } else { - info.FileMeta.RealSize = parquestDataSize + info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSize) } s.tableDatas = append(s.tableDatas, info) } @@ -823,8 +833,8 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store return float64(tot) / float64(pos), nil } -// SampleParquetDataSize samples the data size of the parquet file. -func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) { +// SampleParquetRowSize samples one row size of the parquet file. +func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) { totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta) if totalRowCount == 0 || err != nil { return 0, err @@ -856,13 +866,12 @@ func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store s return 0, err } lastRow := parser.LastRow() - rowCount++ + rowCount += 1 rowSize += int64(lastRow.Length) parser.RecycleRow(lastRow) if rowSize > maxSampleParquetDataSize || rowCount > maxSampleParquetRowCount { break } } - size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize)) - return size, nil + return float64(rowSize) / float64(rowCount), nil } diff --git a/pkg/lightning/mydump/loader_test.go b/pkg/lightning/mydump/loader_test.go index 0734371b85a4e..dbe60b0cf8c4b 100644 --- a/pkg/lightning/mydump/loader_test.go +++ b/pkg/lightning/mydump/loader_test.go @@ -1159,12 +1159,17 @@ func testSampleParquetDataSize(t *testing.T, count int) { err = store.WriteFile(ctx, fileName, bf.Bytes()) require.NoError(t, err) - size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{ + rowSize, err := md.SampleParquetRowSize(ctx, md.SourceFileMeta{ Path: fileName, }, store) require.NoError(t, err) + rowCount, err := md.ReadParquetFileRowCountByFile(ctx, store, md.SourceFileMeta{ + Path: fileName, + }) + require.NoError(t, err) + require.NoError(t, err) // expected error within 10%, so delta = totalRowSize / 10 - require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10) + require.InDelta(t, totalRowSize, int64(rowSize*float64(rowCount)), float64(totalRowSize)/10) } func TestSampleParquetDataSize(t *testing.T) { From 4e777fe65e04f601ff20ecfb963ec27d88cba48f Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Tue, 24 Sep 2024 14:06:02 +0800 Subject: [PATCH 2/9] fix bazel_build Signed-off-by: zeminzhou --- pkg/lightning/mydump/loader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index 56096a81127bc..be3889af8f7e4 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -866,7 +866,7 @@ func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store st return 0, err } lastRow := parser.LastRow() - rowCount += 1 + rowCount++ rowSize += int64(lastRow.Length) parser.RecycleRow(lastRow) if rowSize > maxSampleParquetDataSize || rowCount > maxSampleParquetRowCount { From cd176c455b2fdf3ff02687949d648ffcd1b693b1 Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Tue, 24 Sep 2024 14:11:11 +0800 Subject: [PATCH 3/9] don't return error Signed-off-by: zeminzhou --- pkg/lightning/mydump/loader.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index be3889af8f7e4..dc7f37aa94057 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -538,16 +538,19 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size s.sampledParquetRowSize, err = SampleParquetRowSize(ctx, info.FileMeta, s.loader.GetStore()) if err != nil { logger.Error("fail to sample parquet row size", zap.String("category", "loader"), - zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err)) - return err + zap.String("schema", res.Schema), zap.String("table", res.Name), + zap.Stringer("type", res.Type), zap.Error(err)) } } - totalRowCount, err2 := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta) - if err2 != nil { - logger.Error("fail to get file total row count", zap.String("category", "loader"), - zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2)) - } else { - info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSize) + if s.sampledParquetRowSize != 0 { + totalRowCount, err2 := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta) + if err2 != nil { + logger.Error("fail to get file total row count", zap.String("category", "loader"), + zap.String("schema", res.Schema), zap.String("table", res.Name), + zap.Stringer("type", res.Type), zap.Error(err2)) + } else { + info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSize) + } } s.tableDatas = append(s.tableDatas, info) } From 8e153dad6c1b00fb4f0ac01742d52af75b6ab94c Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Tue, 24 Sep 2024 14:33:56 +0800 Subject: [PATCH 4/9] remove unused changes Signed-off-by: zeminzhou --- pkg/lightning/mydump/loader_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/lightning/mydump/loader_test.go b/pkg/lightning/mydump/loader_test.go index dbe60b0cf8c4b..928b842725309 100644 --- a/pkg/lightning/mydump/loader_test.go +++ b/pkg/lightning/mydump/loader_test.go @@ -1167,7 +1167,6 @@ func testSampleParquetDataSize(t *testing.T, count int) { Path: fileName, }) require.NoError(t, err) - require.NoError(t, err) // expected error within 10%, so delta = totalRowSize / 10 require.InDelta(t, totalRowSize, int64(rowSize*float64(rowCount)), float64(totalRowSize)/10) } From f3014014c51ce2e34df489e07afcc32bb842040e Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Wed, 25 Sep 2024 20:17:13 +0800 Subject: [PATCH 5/9] sample every table Signed-off-by: zeminzhou --- pkg/lightning/mydump/loader.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index dc7f37aa94057..61c7e00b7a089 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -245,7 +245,7 @@ type mdLoaderSetup struct { tableIndexMap map[filter.Table]int setupCfg *MDLoaderSetupConfig - sampledParquetRowSize float64 + sampledParquetRowSizes map[string]float64 } // NewLoader constructs a MyDumper loader that scanns the data source and constructs a set of metadatas. @@ -322,6 +322,8 @@ func NewLoaderWithStore(ctx context.Context, cfg LoaderConfig, dbIndexMap: make(map[string]int), tableIndexMap: make(map[filter.Table]int), setupCfg: mdLoaderSetupCfg, + + sampledParquetRowSizes: make(map[string]float64), } if err := setup.setup(ctx); err != nil { @@ -534,22 +536,23 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size } s.tableDatas = append(s.tableDatas, info) case SourceTypeParquet: - if s.sampledParquetRowSize == 0 { - s.sampledParquetRowSize, err = SampleParquetRowSize(ctx, info.FileMeta, s.loader.GetStore()) + tableName := info.TableName.String() + if s.sampledParquetRowSizes[tableName] == 0 { + s.sampledParquetRowSizes[tableName], err = SampleParquetRowSize(ctx, info.FileMeta, s.loader.GetStore()) if err != nil { logger.Error("fail to sample parquet row size", zap.String("category", "loader"), zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err)) } } - if s.sampledParquetRowSize != 0 { + if s.sampledParquetRowSizes[tableName] != 0 { totalRowCount, err2 := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta) if err2 != nil { logger.Error("fail to get file total row count", zap.String("category", "loader"), zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2)) } else { - info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSize) + info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName]) } } s.tableDatas = append(s.tableDatas, info) From 4d45ec62af3c1f7433d8fa833b414cf1298a0e29 Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Fri, 11 Oct 2024 18:50:19 +0800 Subject: [PATCH 6/9] fix comment Signed-off-by: zeminzhou --- pkg/lightning/mydump/loader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index 61c7e00b7a089..65a0fa33b13ab 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -839,7 +839,7 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store return float64(tot) / float64(pos), nil } -// SampleParquetRowSize samples one row size of the parquet file. +// SampleParquetRowSize samples row size of the parquet file. func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) { totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta) if totalRowCount == 0 || err != nil { From ee7d2eb83ad77bbe9c16805463d6c99a894a81a5 Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Mon, 21 Oct 2024 17:15:32 +0800 Subject: [PATCH 7/9] call ReadParquetFileRowCountByFile once Signed-off-by: zeminzhou --- lightning/pkg/importer/import.go | 13 +------------ pkg/lightning/mydump/loader.go | 9 ++++++--- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/lightning/pkg/importer/import.go b/lightning/pkg/importer/import.go index 512aa277d6b34..ed91105bf1311 100644 --- a/lightning/pkg/importer/import.go +++ b/lightning/pkg/importer/import.go @@ -1488,19 +1488,8 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { allTasks = append(allTasks, task{tr: tr, cp: cp}) if len(cp.Engines) == 0 { - for i, fi := range tableMeta.DataFiles { + for _, fi := range tableMeta.DataFiles { totalDataSizeToRestore += fi.FileMeta.FileSize - if fi.FileMeta.Type == mydump.SourceTypeParquet { - numberRows, err := mydump.ReadParquetFileRowCountByFile(ctx, rc.store, fi.FileMeta) - if err != nil { - return errors.Trace(err) - } - if m, ok := metric.FromContext(ctx); ok { - m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(numberRows)) - } - fi.FileMeta.Rows = numberRows - tableMeta.DataFiles[i] = fi - } } } else { for _, eng := range cp.Engines { diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index 65a0fa33b13ab..cf6dbae677372 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -543,16 +543,19 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size logger.Error("fail to sample parquet row size", zap.String("category", "loader"), zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err)) + return errors.Trace(err) } } if s.sampledParquetRowSizes[tableName] != 0 { - totalRowCount, err2 := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta) - if err2 != nil { + totalRowCount, err := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta) + if err != nil { logger.Error("fail to get file total row count", zap.String("category", "loader"), zap.String("schema", res.Schema), zap.String("table", res.Name), - zap.Stringer("type", res.Type), zap.Error(err2)) + zap.Stringer("type", res.Type), zap.Error(err)) + return errors.Trace(err) } else { info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName]) + info.FileMeta.Rows = totalRowCount } } s.tableDatas = append(s.tableDatas, info) From 4704f38b9eb353f8438f201fcbd484683ef6ecdf Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Wed, 13 Nov 2024 17:00:23 +0800 Subject: [PATCH 8/9] fix comment Signed-off-by: zeminzhou --- pkg/lightning/mydump/loader.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index cf6dbae677372..bad433a7d2b9e 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/lightning/config" "github.com/pingcap/tidb/pkg/lightning/log" + "github.com/pingcap/tidb/pkg/lightning/metric" regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router" filter "github.com/pingcap/tidb/pkg/util/table-filter" "go.uber.org/zap" @@ -556,6 +557,9 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size } else { info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName]) info.FileMeta.Rows = totalRowCount + if m, ok := metric.FromContext(ctx); ok { + m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(totalRowCount)) + } } } s.tableDatas = append(s.tableDatas, info) From d975516a349300e346de039c52b53c3755f2c83a Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Wed, 13 Nov 2024 17:10:37 +0800 Subject: [PATCH 9/9] fix ci Signed-off-by: zeminzhou --- pkg/lightning/mydump/loader.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index bad433a7d2b9e..af747045a93d4 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -554,12 +554,11 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err)) return errors.Trace(err) - } else { - info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName]) - info.FileMeta.Rows = totalRowCount - if m, ok := metric.FromContext(ctx); ok { - m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(totalRowCount)) - } + } + info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName]) + info.FileMeta.Rows = totalRowCount + if m, ok := metric.FromContext(ctx); ok { + m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(totalRowCount)) } } s.tableDatas = append(s.tableDatas, info)