*: support sample for compressed files for adjustment #39680
@@ -16,6 +16,7 @@ package mydump

import (
    "context"
    "io"
    "path/filepath"
    "sort"
    "strings"

@@ -30,6 +31,9 @@ import (
    "go.uber.org/zap"
)

// sampleCompressedFileSize represents how many bytes need to be sampled for compressed files
const sampleCompressedFileSize = 4 * 1024

// MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
type MDDatabaseMeta struct {
    Name string

@@ -82,7 +86,9 @@ type SourceFileMeta struct {
    Compression Compression
    SortKey     string
    FileSize    int64
    ExtendData  ExtendColumnData
    // WARNING: variables below are not persistent
    ExtendData ExtendColumnData
    RealSize   int64
}

// NewMDTableMeta creates an Mydumper table meta with specified character set.

@@ -386,7 +392,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context) error {
        // set a dummy `FileInfo` here without file meta because we needn't restore the table schema
        tableMeta, _, _ := s.insertTable(FileInfo{TableName: fileInfo.TableName})
        tableMeta.DataFiles = append(tableMeta.DataFiles, fileInfo)
        tableMeta.TotalSize += fileInfo.FileMeta.FileSize
        tableMeta.TotalSize += fileInfo.FileMeta.RealSize
    }

    for _, dbMeta := range s.loader.dbs {

@@ -453,7 +459,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size

    info := FileInfo{
        TableName: filter.Table{Schema: res.Schema, Name: res.Name},
        FileMeta:  SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size},
        FileMeta:  SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size, RealSize: size},
    }

    if s.loader.shouldSkip(&info.TableName) {

@@ -470,6 +476,15 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
    case SourceTypeViewSchema:
        s.viewSchemas = append(s.viewSchemas, info)
    case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
        if info.FileMeta.Compression != CompressionNone {
            compressRatio, err2 := SampleFileCompressRatio(ctx, info.FileMeta, s.loader.GetStore())
            if err2 != nil {
                logger.Error("[loader] fail to calculate data file compress ratio",
                    zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type))
            } else {
                info.FileMeta.RealSize = int64(compressRatio * float64(info.FileMeta.FileSize))
            }
        }
        s.tableDatas = append(s.tableDatas, info)
    }

@@ -648,3 +663,81 @@ func (l *MDLoader) GetDatabases() []*MDDatabaseMeta {
func (l *MDLoader) GetStore() storage.ExternalStorage {
    return l.store
}

func calculateFileBytes(ctx context.Context,
    dataFile string,
    compressType storage.CompressType,
    store storage.ExternalStorage,
    offset int64) (tot int, pos int64, err error) {
    bytes := make([]byte, sampleCompressedFileSize)
    reader, err := store.Open(ctx, dataFile)
    if err != nil {
        return 0, 0, errors.Trace(err)
    }
    defer reader.Close()

    compressReader, err := storage.NewLimitedInterceptReader(reader, compressType, offset)
    if err != nil {
        return 0, 0, errors.Trace(err)
    }

    readBytes := func() error {
        n, err2 := compressReader.Read(bytes)
        if err2 != nil && errors.Cause(err2) != io.EOF && errors.Cause(err2) != io.ErrUnexpectedEOF {
            return err2
        }
        tot += n
        return err2
    }

    if offset == 0 {
        err = readBytes()
        if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF {
            return 0, 0, err
        }
        pos, err = compressReader.Seek(0, io.SeekCurrent)
        if err != nil {
            return 0, 0, errors.Trace(err)
        }
        return tot, pos, nil
    }

    for {
        err = readBytes()
        if err != nil {
            break
        }
    }
    if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF {
        return 0, 0, errors.Trace(err)
    }
    return tot, offset, nil
}

// SampleFileCompressRatio samples the compress ratio of the compressed file.
func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
    if fileMeta.Compression == CompressionNone {
        return 1, nil
    }
    compressType, err := ToStorageCompressType(fileMeta.Compression)
    if err != nil {
        return 0, err
    }
    // We use the following method to sample the compress ratio of the first few bytes of the file:
    // 1. Read once, aiming to find a valid offset in the compressed file. If we kept reading, the
    //    compress reader would request more data from the file reader and buffer it in memory, so
    //    we could not compute an accurate compress ratio.
    // 2. Read a second time, limiting the file reader to read only n bytes (n is the valid position
    //    found in the first read), and read everything out of the compress reader. The length m of
    //    the data read out is the uncompressed length; m/n is the compress ratio.
    // first read: find a valid end position in the compressed file
    _, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, 0)
    if err != nil {
        return 0, err
    }
    // second read: the underlying reader stops at the first read's valid position; compute the sample's compress ratio
    tot, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, pos)
    if err != nil {
        return 0, err
    }
    return float64(tot) / float64(pos), nil
}

Review thread on the second calculateFileBytes call:

- I don't really understand why we need to read twice here. Would you elaborate on it in the PR or in the comment?
- As the comment above says:
- Okay, I finally understand what you mean after reading the compression reader code 😢 Am I right?
- Yes, that's what I mean.
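To make the two-pass idea discussed above concrete, here is a minimal, self-contained sketch using only the Go standard library. It is not the PR's code: gzip and an invented countingReader helper stand in for the external-storage reader and storage.NewLimitedInterceptReader, and the payload and buffer sizes are illustrative.

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
    "strings"
)

// countingReader tracks how many compressed bytes the decompressor has pulled
// from the underlying source (a stand-in for the intercept reader's position).
type countingReader struct {
    r io.Reader
    n int64
}

func (c *countingReader) Read(p []byte) (int, error) {
    n, err := c.r.Read(p)
    c.n += int64(n)
    return n, err
}

func main() {
    // Build a small compressed payload in memory.
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    io.Copy(zw, strings.NewReader(strings.Repeat("id,name,score\n1,foo,42\n", 2000)))
    zw.Close()
    compressed := buf.Bytes()

    // Pass 1: decompress roughly one sample buffer and note how many compressed
    // bytes were consumed. The decompressor may read ahead and buffer, which is
    // exactly why this pass alone cannot give an accurate ratio.
    src := &countingReader{r: bytes.NewReader(compressed)}
    zr1, _ := gzip.NewReader(src)
    sample := make([]byte, 4*1024)
    io.ReadFull(zr1, sample)
    pos := src.n // a valid position inside the compressed stream

    // Pass 2: reopen the stream, let the decompressor see only pos compressed
    // bytes, and decompress everything it can. Any io.ErrUnexpectedEOF from the
    // truncated stream is expected and ignored here.
    zr2, _ := gzip.NewReader(io.LimitReader(bytes.NewReader(compressed), pos))
    tot, _ := io.Copy(io.Discard, zr2)

    fmt.Printf("sampled compress ratio ≈ %.2f (%d uncompressed bytes from %d compressed bytes)\n",
        float64(tot)/float64(pos), tot, pos)
}

In the PR itself this corresponds to calling calculateFileBytes twice: once with offset 0 to obtain pos via Seek, and once with offset = pos to measure tot.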
@@ -31,16 +31,14 @@ import (
)

const (
    tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
    compressedTableRegionSizeWarningThreshold int64 = 410 * 1024 * 1024 // 0.4 * tableRegionSizeWarningThreshold
    tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
    // the increment ratio of large CSV file size threshold by `region-split-size`
    largeCSVLowerThresholdRation = 10
    // TableFileSizeINF for compressed size, for lightning 10TB is a relatively big value and will strongly affect efficiency
    // It's used to make sure compressed files can be read until EOF. Because we can't get the exact decompressed size of the compressed files.
    TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
    // compressDataRatio is a relatively maximum compress ratio for normal compressed data
    // It's used to estimate rowIDMax, we use a large value to try to avoid overlapping
    compressDataRatio = 500
    // CompressSizeFactor is used to adjust compressed data size
    CompressSizeFactor = 5
)

// TableRegion contains information for a table region during import.

@@ -303,11 +301,11 @@ func MakeSourceFileRegion(
    rowIDMax := fileSize / divisor
    // for compressed files, suggest the compress ratio is 1% to calculate the rowIDMax.
    // set fileSize to INF to make sure compressed files can be read until EOF. Because we can't get the exact size of the compressed files.
    // TODO: update progress bar calculation for compressed files.
    if fi.FileMeta.Compression != CompressionNone {
        // FIXME: this is not accurate. Need sample ratio in the future and use sampled ratio to compute rowIDMax
        // currently we use 500 here. It's a relatively large value for most data.
        rowIDMax = fileSize * compressDataRatio / divisor
        // RealSize is the estimated file size. In some cases the first few bytes of this compressed file
        // have a smaller compress ratio than the whole compressed file, so we still multiply by this factor
        // to make sure the rowIDMax computation is correct.
        rowIDMax = fi.FileMeta.RealSize * CompressSizeFactor / divisor
        fileSize = TableFileSizeINF
    }
    tableRegion := &TableRegion{

Review thread on the new rowIDMax computation:

- The real size is the size of the decompressed file, so we don't need to multiply by CompressSizeFactor anymore?
- No. The real size is an estimate. In some cases the first few bytes of the compressed file have a lower compress ratio than the whole file, so we still need to multiply by this factor to keep the limit loose. I will add this to the comment.
- lgtm

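To make the thread above concrete, here is a small numeric sketch of how the sampled ratio, RealSize, and CompressSizeFactor interact. The inputs (a 100 MiB compressed file, an 8x sampled ratio, a 100-byte divisor) are made up for illustration; only the formulas come from the diff.

package main

import "fmt"

func main() {
    // Illustrative inputs only: a 100 MiB compressed file whose sampled
    // compress ratio is 8x, and a divisor (rough bytes-per-row estimate) of 100.
    const (
        fileSize           = int64(100 * 1024 * 1024)
        sampledRatio       = 8.0
        divisor            = int64(100)
        compressSizeFactor = int64(5) // headroom: the sampled prefix may under-estimate the ratio
    )

    // RealSize as set in constructFileInfo: ratio * compressed size.
    realSize := int64(sampledRatio * float64(fileSize))

    // rowIDMax as computed in MakeSourceFileRegion for compressed files.
    rowIDMax := realSize * compressSizeFactor / divisor

    fmt.Println("estimated RealSize:", realSize) // 838860800 (~800 MiB)
    fmt.Println("rowIDMax:", rowIDMax)           // 41943040
}

The factor of 5 keeps rowIDMax loose even when the sampled prefix compresses worse than the rest of the file, which is the point the author makes in the thread.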
@@ -317,24 +315,23 @@
        Chunk: Chunk{
            Offset:       0,
            EndOffset:    fileSize,
            RealOffset:   0,
            PrevRowIDMax: 0,
            RowIDMax:     rowIDMax,
        },
    }

    regionTooBig := false
    if fi.FileMeta.Compression == CompressionNone {
        regionTooBig = tableRegion.Size() > tableRegionSizeWarningThreshold
    } else {
        regionTooBig = fi.FileMeta.FileSize > compressedTableRegionSizeWarningThreshold
    regionSize := tableRegion.Size()
    if fi.FileMeta.Compression != CompressionNone {
        regionSize = fi.FileMeta.RealSize
    }
    if regionTooBig {
    if regionSize > tableRegionSizeWarningThreshold {
        log.FromContext(ctx).Warn(
            "file is too big to be processed efficiently; we suggest splitting it at 256 MB each",
            zap.String("file", fi.FileMeta.Path),
            zap.Int64("size", dataFileSize))
            zap.Int64("size", regionSize))
    }
    return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil
    return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.RealSize)}, nil
}

// because parquet files can't seek efficiently, there is no benefit in split.

Review thread on readBytes in calculateFileBytes:

- Is `n` equal to `sampleCompressedFileSize`?
- I don't quite understand the question. `n` is the number of bytes read in one Read call.
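To expand on that short answer: `n` is whatever a single Read returns, which can be anything up to the 4 KiB buffer size, while `tot` accumulates `n` across calls and can therefore grow well past sampleCompressedFileSize in the second pass. A tiny stand-alone sketch of that accumulation pattern; readAll and the 10,000-byte input are invented for illustration and are not part of the PR.

package main

import (
    "fmt"
    "io"
    "strings"
)

// readAll mirrors the readBytes loop in calculateFileBytes: each Read call
// returns n <= len(buf) bytes, and tot sums those n values across calls.
func readAll(r io.Reader) (tot int) {
    buf := make([]byte, 4*1024) // same size as sampleCompressedFileSize
    for {
        n, err := r.Read(buf)
        tot += n
        if err != nil { // io.EOF (or io.ErrUnexpectedEOF on a truncated stream) ends the loop
            return tot
        }
    }
}

func main() {
    // 10,000 bytes of input: each Read returns at most 4096 bytes, yet tot ends up at 10000.
    fmt.Println(readAll(strings.NewReader(strings.Repeat("x", 10000))))
}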