Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: sample once parquet file (#56205) #57446

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions lightning/pkg/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1488,19 +1488,8 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
allTasks = append(allTasks, task{tr: tr, cp: cp})

if len(cp.Engines) == 0 {
for i, fi := range tableMeta.DataFiles {
for _, fi := range tableMeta.DataFiles {
totalDataSizeToRestore += fi.FileMeta.FileSize
if fi.FileMeta.Type == mydump.SourceTypeParquet {
numberRows, err := mydump.ReadParquetFileRowCountByFile(ctx, rc.store, fi.FileMeta)
if err != nil {
return errors.Trace(err)
}
if m, ok := metric.FromContext(ctx); ok {
m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(numberRows))
}
fi.FileMeta.Rows = numberRows
tableMeta.DataFiles[i] = fi
}
}
} else {
for _, eng := range cp.Engines {
Expand Down
41 changes: 31 additions & 10 deletions pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/metric"
regexprrouter "github.com/pingcap/tidb/pkg/util/regexpr-router"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"go.uber.org/zap"
Expand Down Expand Up @@ -244,6 +245,8 @@ type mdLoaderSetup struct {
dbIndexMap map[string]int
tableIndexMap map[filter.Table]int
setupCfg *MDLoaderSetupConfig

sampledParquetRowSizes map[string]float64
}

// NewLoader constructs a MyDumper loader that scanns the data source and constructs a set of metadatas.
Expand Down Expand Up @@ -320,6 +323,8 @@ func NewLoaderWithStore(ctx context.Context, cfg LoaderConfig,
dbIndexMap: make(map[string]int),
tableIndexMap: make(map[filter.Table]int),
setupCfg: mdLoaderSetupCfg,

sampledParquetRowSizes: make(map[string]float64),
}

if err := setup.setup(ctx); err != nil {
Expand Down Expand Up @@ -532,12 +537,29 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
}
s.tableDatas = append(s.tableDatas, info)
case SourceTypeParquet:
parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore())
if err2 != nil {
logger.Error("fail to sample parquet data size", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2))
} else {
info.FileMeta.RealSize = parquestDataSize
tableName := info.TableName.String()
if s.sampledParquetRowSizes[tableName] == 0 {
s.sampledParquetRowSizes[tableName], err = SampleParquetRowSize(ctx, info.FileMeta, s.loader.GetStore())
if err != nil {
logger.Error("fail to sample parquet row size", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name),
zap.Stringer("type", res.Type), zap.Error(err))
return errors.Trace(err)
}
}
if s.sampledParquetRowSizes[tableName] != 0 {
totalRowCount, err := ReadParquetFileRowCountByFile(ctx, s.loader.GetStore(), info.FileMeta)
if err != nil {
logger.Error("fail to get file total row count", zap.String("category", "loader"),
zap.String("schema", res.Schema), zap.String("table", res.Name),
zap.Stringer("type", res.Type), zap.Error(err))
return errors.Trace(err)
}
info.FileMeta.RealSize = int64(float64(totalRowCount) * s.sampledParquetRowSizes[tableName])
info.FileMeta.Rows = totalRowCount
if m, ok := metric.FromContext(ctx); ok {
m.RowsCounter.WithLabelValues(metric.StateTotalRestore, tableName).Add(float64(totalRowCount))
}
}
s.tableDatas = append(s.tableDatas, info)
}
Expand Down Expand Up @@ -823,8 +845,8 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store
return float64(tot) / float64(pos), nil
}

// SampleParquetDataSize samples the data size of the parquet file.
func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) {
// SampleParquetRowSize samples row size of the parquet file.
func SampleParquetRowSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta)
if totalRowCount == 0 || err != nil {
return 0, err
Expand Down Expand Up @@ -863,6 +885,5 @@ func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store s
break
}
}
size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize))
return size, nil
return float64(rowSize) / float64(rowCount), nil
}
8 changes: 6 additions & 2 deletions pkg/lightning/mydump/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,12 +1159,16 @@ func testSampleParquetDataSize(t *testing.T, count int) {
err = store.WriteFile(ctx, fileName, bf.Bytes())
require.NoError(t, err)

size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{
rowSize, err := md.SampleParquetRowSize(ctx, md.SourceFileMeta{
Path: fileName,
}, store)
require.NoError(t, err)
rowCount, err := md.ReadParquetFileRowCountByFile(ctx, store, md.SourceFileMeta{
Path: fileName,
})
require.NoError(t, err)
// expected error within 10%, so delta = totalRowSize / 10
require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10)
require.InDelta(t, totalRowSize, int64(rowSize*float64(rowCount)), float64(totalRowSize)/10)
}

func TestSampleParquetDataSize(t *testing.T) {
Expand Down