From 0801cbd489241e03fcef8a38d23b37dab8a7de2e Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Thu, 14 Sep 2023 17:25:46 +0800 Subject: [PATCH 1/9] a way to estimate parquet file size Signed-off-by: zeminzhou --- br/pkg/lightning/mydump/loader.go | 61 ++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go index 8c4232d828f58..fc6b9be0761c7 100644 --- a/br/pkg/lightning/mydump/loader.go +++ b/br/pkg/lightning/mydump/loader.go @@ -33,7 +33,11 @@ import ( ) // sampleCompressedFileSize represents how many bytes need to be sampled for compressed files -const sampleCompressedFileSize = 4 * 1024 +const ( + sampleCompressedFileSize = 4 * 1024 + maxSampleParquetDataSize = 8 * 1024 + maxSampleParquetRowCount = 500 +) // MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader. type MDDatabaseMeta struct { @@ -473,7 +477,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size s.tableSchemas = append(s.tableSchemas, info) case SourceTypeViewSchema: s.viewSchemas = append(s.viewSchemas, info) - case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet: + case SourceTypeSQL, SourceTypeCSV: if info.FileMeta.Compression != CompressionNone { compressRatio, err2 := SampleFileCompressRatio(ctx, info.FileMeta, s.loader.GetStore()) if err2 != nil { @@ -484,6 +488,15 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size } } s.tableDatas = append(s.tableDatas, info) + case SourceTypeParquet: + parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore()) + if err2 != nil { + logger.Error("[loader] fail to sample parquet data size", + zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2)) + } else { + info.FileMeta.RealSize = parquestDataSize + } + s.tableDatas = append(s.tableDatas, info) } logger.Debug("file route result", zap.String("schema", res.Schema), @@ -765,3 +778,47 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store } return float64(tot) / float64(pos), nil } + +// SampleParquetDataSize samples the data size of the parquet file. 
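+// It decodes rows from the beginning of the file until either maxSampleParquetRowCount
+// rows or maxSampleParquetDataSize bytes of row data have been read, then extrapolates
+// the total data size as totalRowCount * (sampledBytes / sampledRows).
+//
+// Illustrative example (numbers are hypothetical, not from this patch): if 500 sampled
+// rows decode to 8,000 bytes (16 bytes per row on average) and the file contains
+// 1,000,000 rows in total, the estimated data size is 1,000,000 / 500 * 8,000 =
+// 16,000,000 bytes.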
+func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) { + totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta) + if err != nil { + return 0, err + } + + reader, err := store.Open(ctx, fileMeta.Path) + if err != nil { + return 0, err + } + defer reader.Close() + parser, err := NewParquetParser(ctx, store, reader, fileMeta.Path) + if err != nil { + return 0, err + } + //nolint: errcheck + defer parser.Close() + + var ( + rowSize int64 + rowCount int64 + ) + for { + err = parser.ReadRow() + if err != nil { + if errors.Cause(err) == io.EOF { + break + } + return 0, err + } + lastRow := parser.LastRow() + rowCount += 1 + rowSize += int64(lastRow.Length) + parser.RecycleRow(lastRow) + if rowSize > maxSampleParquetDataSize || rowCount > maxSampleParquetRowCount { + break + + } + } + size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize)) + return size, nil +} From 30bbd7b2204c5d53b79f68090d94a1e6536d38ac Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Thu, 14 Sep 2023 18:38:08 +0800 Subject: [PATCH 2/9] add ut Signed-off-by: zeminzhou --- br/pkg/lightning/mydump/loader_test.go | 48 ++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/br/pkg/lightning/mydump/loader_test.go b/br/pkg/lightning/mydump/loader_test.go index 30928ac06e1d1..53beaf8b12e67 100644 --- a/br/pkg/lightning/mydump/loader_test.go +++ b/br/pkg/lightning/mydump/loader_test.go @@ -32,6 +32,8 @@ import ( router "github.com/pingcap/tidb/util/table-router" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/xitongsys/parquet-go/parquet" + "github.com/xitongsys/parquet-go/writer" ) type testMydumpLoaderSuite struct { @@ -1103,3 +1105,49 @@ func TestSampleFileCompressRatio(t *testing.T) { require.NoError(t, err) require.InDelta(t, ratio, 5000.0/float64(bf.Len()), 1e-5) } + +func TestSampleParquetDataSize(t *testing.T) { + s := newTestMydumpLoaderSuite(t) + store, err := storage.NewLocalStorage(s.sourceDir) + require.NoError(t, err) + + type row struct { + ID int64 `parquet:"name=id, type=INT64"` + Name string `parquet:"name=name, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + byteArray := make([]byte, 0, 40*1024) + bf := bytes.NewBuffer(byteArray) + pwriter, err := writer.NewParquetWriterFromWriter(bf, new(row), 4) + require.NoError(t, err) + pwriter.RowGroupSize = 128 * 1024 * 1024 //128M + pwriter.PageSize = 8 * 1024 //8K + pwriter.CompressionType = parquet.CompressionCodec_SNAPPY + for i := 0; i < 2000; i++ { + row := row{ + ID: int64(i), + Name: fmt.Sprintf("row_name_%04d", i), + } + err = pwriter.Write(row) + require.NoError(t, err) + } + err = pwriter.WriteStop() + require.NoError(t, err) + + fileName := "test_1.t1.parquet" + err = store.WriteFile(ctx, fileName, bf.Bytes()) + require.NoError(t, err) + + size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{ + Path: fileName, + }, store) + require.NoError(t, err) + // + // 42000 = 2000 rows * 21 bytes per row + // expected error within 10%, so delta = 42000 * 0.1 = 4200 + // + require.InDelta(t, 42000, size, 4200) +} From 207a4e3583cab2ff193239e06e49031112aed4e3 Mon Sep 17 00:00:00 2001 From: zzm Date: Thu, 14 Sep 2023 21:27:23 +0800 Subject: [PATCH 3/9] Update br/pkg/lightning/mydump/loader.go Co-authored-by: D3Hunter --- br/pkg/lightning/mydump/loader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go index fc6b9be0761c7..f4dd1dcdcdf6c 100644 --- a/br/pkg/lightning/mydump/loader.go +++ b/br/pkg/lightning/mydump/loader.go @@ -491,7 +491,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size case SourceTypeParquet: parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore()) if err2 != nil { - logger.Error("[loader] fail to sample parquet data size", + logger.Error("fail to sample parquet data size", zap.String("category", "loader"), zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2)) } else { info.FileMeta.RealSize = parquestDataSize From 0f39d065dfa0b7c3c88ece7b17c85387820cd742 Mon Sep 17 00:00:00 2001 From: zzm Date: Thu, 14 Sep 2023 21:27:55 +0800 Subject: [PATCH 4/9] Update br/pkg/lightning/mydump/loader.go Co-authored-by: D3Hunter --- br/pkg/lightning/mydump/loader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go index f4dd1dcdcdf6c..e8dc8ec30f24a 100644 --- a/br/pkg/lightning/mydump/loader.go +++ b/br/pkg/lightning/mydump/loader.go @@ -816,7 +816,6 @@ func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store s parser.RecycleRow(lastRow) if rowSize > maxSampleParquetDataSize || rowCount > maxSampleParquetRowCount { break - } } size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize)) From 9e3c683649bd46c7054aaa8efefb288121b54224 Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Thu, 14 Sep 2023 21:36:18 +0800 Subject: [PATCH 5/9] close reader by parquet parse Signed-off-by: zeminzhou --- br/pkg/lightning/mydump/loader.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go index e8dc8ec30f24a..92c2160ac72ac 100644 --- a/br/pkg/lightning/mydump/loader.go +++ b/br/pkg/lightning/mydump/loader.go @@ -790,9 +790,10 @@ func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store s if err != nil { return 0, err } - defer reader.Close() parser, err := NewParquetParser(ctx, store, reader, fileMeta.Path) if err != nil { + //nolint: errcheck + reader.Close() return 0, err } //nolint: errcheck From 2ffe45efbd4888fc4c0301e4cfbf6f5f97c9e1ee Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Tue, 19 Sep 2023 10:08:27 +0800 Subject: [PATCH 6/9] make check Signed-off-by: zeminzhou --- br/pkg/lightning/mydump/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/lightning/mydump/BUILD.bazel b/br/pkg/lightning/mydump/BUILD.bazel index 0e37e8c7525ea..ad868651261bb 100644 --- a/br/pkg/lightning/mydump/BUILD.bazel +++ b/br/pkg/lightning/mydump/BUILD.bazel @@ -85,6 +85,7 @@ go_test( "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@com_github_xitongsys_parquet_go//parquet", "@com_github_xitongsys_parquet_go//writer", "@com_github_xitongsys_parquet_go_source//local", "@org_uber_go_goleak//:goleak", From 0b9b154db8b513fd7c2d56ae196b39928220638b Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Wed, 20 Sep 2023 16:35:20 +0800 Subject: [PATCH 7/9] gen test data random Signed-off-by: zeminzhou --- br/pkg/lightning/mydump/loader_test.go | 34 ++++++++++++++++++-------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/br/pkg/lightning/mydump/loader_test.go b/br/pkg/lightning/mydump/loader_test.go index 
53beaf8b12e67..1c32d310f47c2 100644 --- a/br/pkg/lightning/mydump/loader_test.go +++ b/br/pkg/lightning/mydump/loader_test.go @@ -19,9 +19,11 @@ import ( "compress/gzip" "context" "fmt" + "math/rand" "os" "path/filepath" "testing" + "time" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" @@ -1112,8 +1114,9 @@ func TestSampleParquetDataSize(t *testing.T) { require.NoError(t, err) type row struct { - ID int64 `parquet:"name=id, type=INT64"` - Name string `parquet:"name=name, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` + ID int64 `parquet:"name=id, type=INT64"` + Key string `parquet:"name=key, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` + Value string `parquet:"name=value, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` } ctx, cancel := context.WithCancel(context.Background()) @@ -1126,10 +1129,24 @@ func TestSampleParquetDataSize(t *testing.T) { pwriter.RowGroupSize = 128 * 1024 * 1024 //128M pwriter.PageSize = 8 * 1024 //8K pwriter.CompressionType = parquet.CompressionCodec_SNAPPY - for i := 0; i < 2000; i++ { + seed := time.Now().Unix() + rand.Seed(seed) + totalRowSize := 0 + for i := 0; i < 1000; i++ { + kl := rand.Intn(20) + 1 + key := make([]byte, kl) + kl, err = rand.Read(key) + require.NoError(t, err) + vl := rand.Intn(20) + 1 + value := make([]byte, vl) + vl, err = rand.Read(value) + require.NoError(t, err) + + totalRowSize += kl + vl + 8 row := row{ - ID: int64(i), - Name: fmt.Sprintf("row_name_%04d", i), + ID: int64(i), + Key: string(key[:kl]), + Value: string(value[:vl]), } err = pwriter.Write(row) require.NoError(t, err) @@ -1145,9 +1162,6 @@ func TestSampleParquetDataSize(t *testing.T) { Path: fileName, }, store) require.NoError(t, err) - // - // 42000 = 2000 rows * 21 bytes per row - // expected error within 10%, so delta = 42000 * 0.1 = 4200 - // - require.InDelta(t, 42000, size, 4200) + // expected error within 10%, so delta = totalRowSize / 10 + require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10) } From c80504a4b91b138b9754b4eb49d528a1209cda8c Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Wed, 20 Sep 2023 17:13:16 +0800 Subject: [PATCH 8/9] t.Logf seed Signed-off-by: zeminzhou --- br/pkg/lightning/mydump/loader_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/lightning/mydump/loader_test.go b/br/pkg/lightning/mydump/loader_test.go index 1c32d310f47c2..69c3474d4cd1d 100644 --- a/br/pkg/lightning/mydump/loader_test.go +++ b/br/pkg/lightning/mydump/loader_test.go @@ -1130,6 +1130,7 @@ func TestSampleParquetDataSize(t *testing.T) { pwriter.PageSize = 8 * 1024 //8K pwriter.CompressionType = parquet.CompressionCodec_SNAPPY seed := time.Now().Unix() + t.Logf("seed: %d", seed) rand.Seed(seed) totalRowSize := 0 for i := 0; i < 1000; i++ { From 65854306c83e92b3397b47880f1e758e82686dd3 Mon Sep 17 00:00:00 2001 From: zeminzhou Date: Thu, 21 Sep 2023 14:37:28 +0800 Subject: [PATCH 9/9] make lint happy Signed-off-by: zeminzhou --- br/pkg/lightning/mydump/loader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go index 92c2160ac72ac..89b0d47326633 100644 --- a/br/pkg/lightning/mydump/loader.go +++ b/br/pkg/lightning/mydump/loader.go @@ -812,7 +812,7 @@ func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store s return 0, err } lastRow := parser.LastRow() - rowCount += 1 + rowCount++ rowSize += int64(lastRow.Length) parser.RecycleRow(lastRow) if rowSize > maxSampleParquetDataSize || rowCount > 
maxSampleParquetRowCount {
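			// Sampling stops once either cap is hit, so at most maxSampleParquetRowCount
			// rows (500) or maxSampleParquetDataSize bytes (8 KiB) of decoded row data
			// are read per file, keeping the cost of the estimate bounded.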