From 0a9a231c5e05a3e29421eb11570e7102eb6c2156 Mon Sep 17 00:00:00 2001 From: zzm Date: Mon, 18 Nov 2024 11:24:59 +0800 Subject: [PATCH] lightning: sample once parquet file (#56205) close pingcap/tidb#56104 --- lightning/pkg/importer/import.go | 13 +-------- pkg/lightning/mydump/loader.go | 41 ++++++++++++++++++++++------- pkg/lightning/mydump/loader_test.go | 8 ++++-- 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/lightning/pkg/importer/import.go b/lightning/pkg/importer/import.go index 512aa277d6b34..ed91105bf1311 100644 --- a/lightning/pkg/importer/import.go +++ b/lightning/pkg/importer/import.go @@ -1488,19 +1488,8 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { allTasks = append(allTasks, task{tr: tr, cp: cp}) if len(cp.Engines) == 0 { - for i, fi := range tableMeta.DataFiles { + for _, fi := range tableMeta.DataFiles { totalDataSizeToRestore += fi.FileMeta.FileSize - if fi.FileMeta.Type == mydump.SourceTypeParquet { - numberRows, err := mydump.ReadParquetFileRowCountByFile(ctx, rc.store, fi.FileMeta) - if err != nil { - return errors.Trace(err) - } - if m, ok := metric.FromContext(ctx); ok { - m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(numberRows)) - } - fi.FileMeta.Rows = numberRows - tableMeta.DataFiles[i] = fi - } } } else { for _, eng := range cp.Engines { diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index d2fc407ddcdeb..af747045a93d4 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/lightning/config" "github.com/pingcap/tidb/pkg/lightning/log" + "github.com/pingcap/tidb/pkg/lightning/metric" regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router" filter "github.com/pingcap/tidb/pkg/util/table-filter" "go.uber.org/zap" @@ -244,6 +245,8 @@ type mdLoaderSetup struct { dbIndexMap map[string]int tableIndexMap map[filter.Table]int setupCfg *MDLoaderSetupConfig + + sampledParquetRowSizes map[string]float64 } // NewLoader constructs a MyDumper loader that scanns the data source and constructs a set of metadatas. @@ -320,6 +323,8 @@ func NewLoaderWithStore(ctx context.Context, cfg LoaderConfig, dbIndexMap: make(map[string]int), tableIndexMap: make(map[filter.Table]int), setupCfg: mdLoaderSetupCfg, + + sampledParquetRowSizes: make(map[string]float64), } if err := setup.setup(ctx); err != nil { @@ -532,12 +537,29 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size } s.tableDatas = append(s.tableDatas, info) case SourceTypeParquet: - parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore()) - if err2 != nil { - logger.Error("fail to sample parquet data size", zap.String("category", "loader"), - zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2)) - } else { - info.FileMeta.RealSize = parquestDataSize + tableName := info.TableName.String() + if s.sampledParquetRowSizes[tableName] == 0 { + s.sampledParquetRowSizes[tableName], err = SampleParquetRowSize(ctx, info.FileMeta, s.loader.GetStore()) + if err != nil { + logger.Error("fail to sample parquet row size", zap.String("category", "loader"), + zap.String("schema", res.Schema), zap.String("table", res.Name), + zap.Stringer("type", res.Type), zap.Error(err)) + return errors.Trace(err) + } + } + if s.sampledParquetRowSizes[tableName] != 0 { + totalRowCount, err := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta) + if err != nil { + logger.Error("fail to get file total row count", zap.String("category", "loader"), + zap.String("schema", res.Schema), zap.String("table", res.Name), + zap.Stringer("type", res.Type), zap.Error(err)) + return errors.Trace(err) + } + info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName]) + info.FileMeta.Rows = totalRowCount + if m, ok := metric.FromContext(ctx); ok { + m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(totalRowCount)) + } } s.tableDatas = append(s.tableDatas, info) } @@ -823,8 +845,8 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store return float64(tot) / float64(pos), nil } -// SampleParquetDataSize samples the data size of the parquet file. -func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) { +// SampleParquetRowSize samples row size of the parquet file. +func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) { totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta) if totalRowCount == 0 || err != nil { return 0, err @@ -863,6 +885,5 @@ func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store s break } } - size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize)) - return size, nil + return float64(rowSize) / float64(rowCount), nil } diff --git a/pkg/lightning/mydump/loader_test.go b/pkg/lightning/mydump/loader_test.go index 0734371b85a4e..928b842725309 100644 --- a/pkg/lightning/mydump/loader_test.go +++ b/pkg/lightning/mydump/loader_test.go @@ -1159,12 +1159,16 @@ func testSampleParquetDataSize(t *testing.T, count int) { err = store.WriteFile(ctx, fileName, bf.Bytes()) require.NoError(t, err) - size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{ + rowSize, err := md.SampleParquetRowSize(ctx, md.SourceFileMeta{ Path: fileName, }, store) require.NoError(t, err) + rowCount, err := md.ReadParquetFileRowCountByFile(ctx, store, md.SourceFileMeta{ + Path: fileName, + }) + require.NoError(t, err) // expected error within 10%, so delta = totalRowSize / 10 - require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10) + require.InDelta(t, totalRowSize, int64(rowSize*float64(rowCount)), float64(totalRowSize)/10) } func TestSampleParquetDataSize(t *testing.T) {