// Sampling limits used when estimating the real data size of source files.
const (
	// sampleCompressedFileSize represents how many bytes need to be sampled
	// for compressed files.
	sampleCompressedFileSize = 4 << 10
	// maxSampleParquetDataSize bounds parquet sampling: SampleParquetDataSize
	// stops reading rows once the accumulated length of the sampled rows
	// exceeds this value.
	maxSampleParquetDataSize = 8 << 10
	// maxSampleParquetRowCount bounds parquet sampling: SampleParquetDataSize
	// stops reading rows once the number of sampled rows exceeds this value.
	maxSampleParquetRowCount = 500
)

// MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
type MDDatabaseMeta struct { @@ -473,7 +477,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size s.tableSchemas = append(s.tableSchemas, info) case SourceTypeViewSchema: s.viewSchemas = append(s.viewSchemas, info) - case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet: + case SourceTypeSQL, SourceTypeCSV: if info.FileMeta.Compression != CompressionNone { compressRatio, err2 := SampleFileCompressRatio(ctx, info.FileMeta, s.loader.GetStore()) if err2 != nil { @@ -484,6 +488,15 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size } } s.tableDatas = append(s.tableDatas, info) + case SourceTypeParquet: + parquestDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore()) + if err2 != nil { + logger.Error("fail to sample parquet data size", zap.String("category", "loader"), + zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2)) + } else { + info.FileMeta.RealSize = parquestDataSize + } + s.tableDatas = append(s.tableDatas, info) } logger.Debug("file route result", zap.String("schema", res.Schema), @@ -765,3 +778,47 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store } return float64(tot) / float64(pos), nil } + +// SampleParquetDataSize samples the data size of the parquet file. 
+func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) { + totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta) + if err != nil { + return 0, err + } + + reader, err := store.Open(ctx, fileMeta.Path) + if err != nil { + return 0, err + } + parser, err := NewParquetParser(ctx, store, reader, fileMeta.Path) + if err != nil { + //nolint: errcheck + reader.Close() + return 0, err + } + //nolint: errcheck + defer parser.Close() + + var ( + rowSize int64 + rowCount int64 + ) + for { + err = parser.ReadRow() + if err != nil { + if errors.Cause(err) == io.EOF { + break + } + return 0, err + } + lastRow := parser.LastRow() + rowCount++ + rowSize += int64(lastRow.Length) + parser.RecycleRow(lastRow) + if rowSize > maxSampleParquetDataSize || rowCount > maxSampleParquetRowCount { + break + } + } + size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize)) + return size, nil +} diff --git a/br/pkg/lightning/mydump/loader_test.go b/br/pkg/lightning/mydump/loader_test.go index 30928ac06e1d1..69c3474d4cd1d 100644 --- a/br/pkg/lightning/mydump/loader_test.go +++ b/br/pkg/lightning/mydump/loader_test.go @@ -19,9 +19,11 @@ import ( "compress/gzip" "context" "fmt" + "math/rand" "os" "path/filepath" "testing" + "time" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" @@ -32,6 +34,8 @@ import ( router "github.com/pingcap/tidb/util/table-router" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/xitongsys/parquet-go/parquet" + "github.com/xitongsys/parquet-go/writer" ) type testMydumpLoaderSuite struct { @@ -1103,3 +1107,62 @@ func TestSampleFileCompressRatio(t *testing.T) { require.NoError(t, err) require.InDelta(t, ratio, 5000.0/float64(bf.Len()), 1e-5) } + +func TestSampleParquetDataSize(t *testing.T) { + s := newTestMydumpLoaderSuite(t) + store, err := storage.NewLocalStorage(s.sourceDir) + 
require.NoError(t, err) + + type row struct { + ID int64 `parquet:"name=id, type=INT64"` + Key string `parquet:"name=key, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` + Value string `parquet:"name=value, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"` + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + byteArray := make([]byte, 0, 40*1024) + bf := bytes.NewBuffer(byteArray) + pwriter, err := writer.NewParquetWriterFromWriter(bf, new(row), 4) + require.NoError(t, err) + pwriter.RowGroupSize = 128 * 1024 * 1024 //128M + pwriter.PageSize = 8 * 1024 //8K + pwriter.CompressionType = parquet.CompressionCodec_SNAPPY + seed := time.Now().Unix() + t.Logf("seed: %d", seed) + rand.Seed(seed) + totalRowSize := 0 + for i := 0; i < 1000; i++ { + kl := rand.Intn(20) + 1 + key := make([]byte, kl) + kl, err = rand.Read(key) + require.NoError(t, err) + vl := rand.Intn(20) + 1 + value := make([]byte, vl) + vl, err = rand.Read(value) + require.NoError(t, err) + + totalRowSize += kl + vl + 8 + row := row{ + ID: int64(i), + Key: string(key[:kl]), + Value: string(value[:vl]), + } + err = pwriter.Write(row) + require.NoError(t, err) + } + err = pwriter.WriteStop() + require.NoError(t, err) + + fileName := "test_1.t1.parquet" + err = store.WriteFile(ctx, fileName, bf.Bytes()) + require.NoError(t, err) + + size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{ + Path: fileName, + }, store) + require.NoError(t, err) + // expected error within 10%, so delta = totalRowSize / 10 + require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10) +}