From 198820877c3476af5bf6654af1222b5e87267512 Mon Sep 17 00:00:00 2001
From: Giuseppe Scrivano
Date: Tue, 11 Jan 2022 18:32:43 +0100
Subject: [PATCH] pkg/chunked: add support for sparse files

Automatically detect holes in sparse files (the threshold is hardcoded
at 1 KiB for now) and add this information to the manifest file.

The receiver will create a hole (using unix.Seek and unix.Ftruncate)
instead of writing the actual zeros.

Closes: https://github.com/containers/storage/issues/1091

Signed-off-by: Giuseppe Scrivano
---
 pkg/chunked/cache_linux.go                |   4 +-
 pkg/chunked/compressor/compressor.go      | 176 ++++++++++++++++++++--
 pkg/chunked/compressor/compressor_test.go |  90 +++++++++++
 pkg/chunked/internal/compression.go       |   6 +
 pkg/chunked/storage_linux.go              | 161 +++++++++++++++-----
 5 files changed, 384 insertions(+), 53 deletions(-)
 create mode 100644 pkg/chunked/compressor/compressor_test.go

diff --git a/pkg/chunked/cache_linux.go b/pkg/chunked/cache_linux.go
index 53cc38b52f..a931fb5d13 100644
--- a/pkg/chunked/cache_linux.go
+++ b/pkg/chunked/cache_linux.go
@@ -524,7 +524,7 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
 	for iter.ReadArray() {
 		for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
 			switch field {
-			case "type", "name", "linkName", "digest", "chunkDigest":
+			case "type", "name", "linkName", "digest", "chunkDigest", "chunkType":
 				count += len(iter.ReadStringAsSlice())
 			case "xattrs":
 				for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
@@ -609,6 +609,8 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
 				m.ChunkOffset = iter.ReadInt64()
 			case "chunkDigest":
 				m.ChunkDigest = getString(iter.ReadStringAsSlice())
+			case "chunkType":
+				m.ChunkType = getString(iter.ReadStringAsSlice())
 			case "xattrs":
 				m.Xattrs = make(map[string]string)
 				for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
diff --git a/pkg/chunked/compressor/compressor.go b/pkg/chunked/compressor/compressor.go
index a2035a8b69..7893cf9aab 100644
--- a/pkg/chunked/compressor/compressor.go
+++ b/pkg/chunked/compressor/compressor.go
@@ -17,21 +17,152 @@ import (
 )
 
 const RollsumBits = 16
+const holesThreshold = int64(1 << 10)
+
+type holesFinder struct {
+	reader    *bufio.Reader
+	fileOff   int64
+	zeros     int64
+	from      int64
+	threshold int64
+
+	state int
+}
+
+const (
+	holesFinderStateRead = iota
+	holesFinderStateAccumulate
+	holesFinderStateFound
+	holesFinderStateEOF
+)
+
+// ReadByte reads a single byte from the underlying reader.
+// If a single byte is read, the return value is (0, RAW-BYTE-VALUE, nil).
+// If there are at least f.threshold consecutive zeros, then the
+// return value is (N_CONSECUTIVE_ZEROS, '\x00', nil).
+func (f *holesFinder) ReadByte() (int64, byte, error) {
+	for {
+		switch f.state {
+		// reading the file stream
+		case holesFinderStateRead:
+			if f.zeros > 0 {
+				f.zeros--
+				return 0, 0, nil
+			}
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				return 0, b, err
+			}
+
+			if b != 0 {
+				return 0, b, err
+			}
+
+			f.zeros = 1
+			if f.zeros == f.threshold {
+				f.state = holesFinderStateFound
+			} else {
+				f.state = holesFinderStateAccumulate
+			}
+		// accumulating zeros, but still didn't reach the threshold
+		case holesFinderStateAccumulate:
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				if err == io.EOF {
+					f.state = holesFinderStateEOF
+					continue
+				}
+				return 0, b, err
+			}
+
+			if b == 0 {
+				f.zeros++
+				if f.zeros == f.threshold {
+					f.state = holesFinderStateFound
+				}
+			} else {
+				if err := f.reader.UnreadByte(); err != nil {
+					return 0, 0, err
+				}
+				f.state = holesFinderStateRead
+			}
+		// found a hole.  Number of zeros >= threshold
+		case holesFinderStateFound:
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				if err == io.EOF {
+					f.state = holesFinderStateEOF
+				}
+				holeLen := f.zeros
+				f.zeros = 0
+				return holeLen, 0, nil
+			}
+			if b != 0 {
+				if err := f.reader.UnreadByte(); err != nil {
+					return 0, 0, err
+				}
+				f.state = holesFinderStateRead
+
+				holeLen := f.zeros
+				f.zeros = 0
+				return holeLen, 0, nil
+			}
+			f.zeros++
+		// reached EOF.  Flush pending zeros if any.
+		case holesFinderStateEOF:
+			if f.zeros > 0 {
+				f.zeros--
+				return 0, 0, nil
+			}
+			return 0, 0, io.EOF
+		}
+	}
+}
 
 type rollingChecksumReader struct {
-	reader  *bufio.Reader
-	closed  bool
-	rollsum *RollSum
+	reader      *holesFinder
+	closed      bool
+	rollsum     *RollSum
+	pendingHole int64
+
+	// WrittenOut is the total number of bytes read from
+	// the stream.
 	WrittenOut int64
+
+	// IsLastChunkZeros tells whether the last generated
+	// chunk is a hole (made of consecutive zeros).  If it
+	// is false, then the last chunk is a data chunk
+	// generated by the rolling checksum.
+	IsLastChunkZeros bool
 }
 
 func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
+	rc.IsLastChunkZeros = false
+
+	if rc.pendingHole > 0 {
+		toCopy := int64(len(b))
+		if rc.pendingHole < toCopy {
+			toCopy = rc.pendingHole
+		}
+		rc.pendingHole -= toCopy
+		for i := int64(0); i < toCopy; i++ {
+			b[i] = 0
+		}
+
+		rc.WrittenOut += toCopy
+
+		rc.IsLastChunkZeros = true
+
+		// if there are no other zeros left, terminate the chunk
+		return rc.pendingHole == 0, int(toCopy), nil
+	}
+
 	if rc.closed {
 		return false, 0, io.EOF
 	}
+
 	for i := 0; i < len(b); i++ {
-		n, err := rc.reader.ReadByte()
+		holeLen, n, err := rc.reader.ReadByte()
 		if err != nil {
 			if err == io.EOF {
 				rc.closed = true
@@ -43,6 +174,13 @@ func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
 			// Report any other error type
 			return false, -1, err
 		}
+		if holeLen > 0 {
+			for j := int64(0); j < holeLen; j++ {
+				rc.rollsum.Roll(0)
+			}
+			rc.pendingHole = holeLen
+			return true, i, nil
+		}
 		b[i] = n
 		rc.WrittenOut++
 		rc.rollsum.Roll(n)
@@ -58,6 +196,7 @@ type chunk struct {
 	Offset      int64
 	Checksum    string
 	ChunkSize   int64
+	ChunkType   string
 }
 
 func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
@@ -119,8 +258,13 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
 
 	chunks := []chunk{}
 
+	hf := &holesFinder{
+		threshold: holesThreshold,
+		reader:    bufio.NewReader(tr),
+	}
+
 	rcReader := &rollingChecksumReader{
-		reader:  bufio.NewReader(tr),
+		reader:  hf,
 		rollsum: NewRollSum(),
 	}
 
@@ -150,12 +294,21 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
 			return err
 		}
 
-		chunks = append(chunks, chunk{
-			ChunkOffset: lastChunkOffset,
-			Offset:      lastOffset,
-			Checksum:    chunkDigester.Digest().String(),
-			ChunkSize:   rcReader.WrittenOut - lastChunkOffset,
-		})
+		chunkSize := rcReader.WrittenOut - lastChunkOffset
+		if chunkSize > 0 {
+			chunkType := internal.ChunkTypeData
+			if rcReader.IsLastChunkZeros {
+				chunkType = internal.ChunkTypeZeros
+			}
+
+			chunks = append(chunks, chunk{
+				ChunkOffset: lastChunkOffset,
+				Offset:      lastOffset,
+				Checksum:    chunkDigester.Digest().String(),
+				ChunkSize:   chunkSize,
+				ChunkType:   chunkType,
+			})
+		}
 
 		lastOffset = off
 		lastChunkOffset = rcReader.WrittenOut
@@ -210,6 +363,7 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
 			entries[i].ChunkSize = chunks[i].ChunkSize
 			entries[i].Offset = chunks[i].Offset
 			entries[i].ChunkDigest = chunks[i].Checksum
+			entries[i].ChunkType = chunks[i].ChunkType
 		}
 	}
 	metadata = append(metadata, entries...)
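As a quick illustration of the contract that (*holesFinder).ReadByte implements, here is a minimal sketch of a caller (a hypothetical helper, assumed to live in the compressor package next to holesFinder and to import "bytes" and "fmt" in addition to the package's existing "bufio" and "io"; the threshold of 4 and the sample buffer are illustrative values, not ones used by the patch):

	// exampleWalkHoles drains a holesFinder and prints what the
	// compressor sees: raw data bytes, or runs of zeros that reached
	// the threshold and were collapsed into a hole.
	func exampleWalkHoles() error {
		data := []byte("\x00\x00\x00\x00AB\x00\x00\x00\x00")
		hf := &holesFinder{
			threshold: 4, // illustrative; the patch uses holesThreshold (1 KiB)
			reader:    bufio.NewReader(bytes.NewReader(data)),
		}
		for {
			holeLen, b, err := hf.ReadByte()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				return err
			}
			if holeLen > 0 {
				fmt.Printf("hole: %d zeros\n", holeLen) // printed twice, once per run of 4 zeros
			} else {
				fmt.Printf("data: %q\n", b) // printed for 'A' and for 'B'
			}
		}
	}

The tests below exercise exactly this behavior with thresholds that do and do not trigger hole detection.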
diff --git a/pkg/chunked/compressor/compressor_test.go b/pkg/chunked/compressor/compressor_test.go
new file mode 100644
index 0000000000..f8f69fca9c
--- /dev/null
+++ b/pkg/chunked/compressor/compressor_test.go
@@ -0,0 +1,90 @@
+package compressor
+
+import (
+	"bufio"
+	"bytes"
+	"io"
+	"testing"
+)
+
+func TestHole(t *testing.T) {
+	data := []byte("\x00\x00\x00\x00\x00")
+
+	hf := &holesFinder{
+		threshold: 1,
+		reader:    bufio.NewReader(bytes.NewReader(data)),
+	}
+
+	hole, _, err := hf.ReadByte()
+	if err != nil {
+		t.Errorf("got error: %v", err)
+	}
+	if hole != 5 {
+		t.Error("expected hole not found")
+	}
+
+	if _, _, err := hf.ReadByte(); err != io.EOF {
+		t.Errorf("EOF not found")
+	}
+
+	hf = &holesFinder{
+		threshold: 1000,
+		reader:    bufio.NewReader(bytes.NewReader(data)),
+	}
+	for i := 0; i < 5; i++ {
+		hole, b, err := hf.ReadByte()
+		if err != nil {
+			t.Errorf("got error: %v", err)
+		}
+		if hole != 0 {
+			t.Error("hole found")
+		}
+		if b != 0 {
+			t.Error("wrong read")
+		}
+	}
+	if _, _, err := hf.ReadByte(); err != io.EOF {
+		t.Error("didn't receive EOF")
+	}
+}
+
+func TestTwoHoles(t *testing.T) {
+	data := []byte("\x00\x00\x00\x00\x00FOO\x00\x00\x00\x00\x00")
+
+	hf := &holesFinder{
+		threshold: 2,
+		reader:    bufio.NewReader(bytes.NewReader(data)),
+	}
+
+	hole, _, err := hf.ReadByte()
+	if err != nil {
+		t.Errorf("got error: %v", err)
+	}
+	if hole != 5 {
+		t.Error("hole not found")
+	}
+
+	for _, e := range []byte("FOO") {
+		hole, c, err := hf.ReadByte()
+		if err != nil {
+			t.Errorf("got error: %v", err)
+		}
+		if hole != 0 {
+			t.Error("hole found")
+		}
+		if c != e {
+			t.Errorf("wrong byte read %v instead of %v", c, e)
+		}
+	}
+	hole, _, err = hf.ReadByte()
+	if err != nil {
+		t.Errorf("got error: %v", err)
+	}
+	if hole != 5 {
+		t.Error("expected hole not found")
+	}
+
+	if _, _, err := hf.ReadByte(); err != io.EOF {
+		t.Error("didn't receive EOF")
+	}
+}
diff --git a/pkg/chunked/internal/compression.go b/pkg/chunked/internal/compression.go
index 01007dd966..1fce3e6f93 100644
--- a/pkg/chunked/internal/compression.go
+++ b/pkg/chunked/internal/compression.go
@@ -46,11 +46,17 @@ type FileMetadata struct {
 	ChunkSize   int64  `json:"chunkSize,omitempty"`
 	ChunkOffset int64  `json:"chunkOffset,omitempty"`
 	ChunkDigest string `json:"chunkDigest,omitempty"`
+	ChunkType   string `json:"chunkType,omitempty"`
 
 	// internal: computed by mergeTOCEntries.
 	Chunks []*FileMetadata `json:"-"`
 }
 
+const (
+	ChunkTypeData  = ""
+	ChunkTypeZeros = "zeros"
+)
+
 const (
 	TypeReg     = "reg"
 	TypeChunk   = "chunk"
diff --git a/pkg/chunked/storage_linux.go b/pkg/chunked/storage_linux.go
index 2ce77ad2f8..43a97ad46f 100644
--- a/pkg/chunked/storage_linux.go
+++ b/pkg/chunked/storage_linux.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/base64"
 	"fmt"
+	"hash"
 	"io"
 	"io/ioutil"
 	"os"
@@ -42,9 +43,10 @@ const (
 	containersOverrideXattr = "user.containers.override_stat"
 	bigDataKey              = "zstd-chunked-manifest"
 
-	fileTypeZstdChunked   = iota
-	fileTypeEstargz       = iota
-	fileTypeNoCompression = iota
+	fileTypeZstdChunked = iota
+	fileTypeEstargz
+	fileTypeNoCompression
+	fileTypeHole
 
 	copyGoRoutines = 32
 )
@@ -438,14 +440,14 @@ func maybeDoIDRemap(manifest []internal.FileMetadata, options *archive.TarOption
 }
 
 type originFile struct {
-	Root string
-	Path string
-
+	Root   string
+	Path   string
 	Offset int64
 }
 
 type missingFileChunk struct {
-	Gap int64
+	Gap  int64
+	Hole bool
 
 	File *internal.FileMetadata
 
@@ -454,6 +456,7 @@ type missingFileChunk struct {
 }
 
 type missingPart struct {
+	Hole        bool
 	SourceChunk *ImageSourceChunk
 	OriginFile  *originFile
 	Chunks      []missingFileChunk
@@ -722,9 +725,19 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.Fil
 	return nil, err
 }
 
-func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFileType, from io.Reader, mf *missingFileChunk) error {
-	switch compression {
-	case fileTypeZstdChunked:
+func (c *chunkedDiffer) prepareCompressedStreamToFile(partCompression compressedFileType, from io.Reader, mf *missingFileChunk) (compressedFileType, error) {
+	switch {
+	case partCompression == fileTypeHole:
+		// The entire part is a hole.  There is nothing to read from the stream.
+		c.rawReader = nil
+		return fileTypeHole, nil
+	case mf.Hole:
+		// Only the missing chunk in the requested part refers to a hole.
+		// The received data must be discarded.
+		limitReader := io.LimitReader(from, mf.CompressedSize)
+		_, err := io.CopyBuffer(ioutil.Discard, limitReader, c.copyBuffer)
+		return fileTypeHole, err
+	case partCompression == fileTypeZstdChunked:
 		c.rawReader = io.LimitReader(from, mf.CompressedSize)
 		if c.zstdReader == nil {
 			r := zstd.NewReader(c.rawReader)
@@ -732,42 +745,83 @@
 		} else {
 			c.zstdReader.Reset(c.rawReader, nil)
 		}
-	case fileTypeEstargz:
+	case partCompression == fileTypeEstargz:
 		c.rawReader = io.LimitReader(from, mf.CompressedSize)
 		if c.gzipReader == nil {
 			r, err := pgzip.NewReader(c.rawReader)
 			if err != nil {
-				return err
+				return partCompression, err
 			}
 			c.gzipReader = r
 		} else {
 			if err := c.gzipReader.Reset(c.rawReader); err != nil {
-				return err
+				return partCompression, err
 			}
 		}
-
-	case fileTypeNoCompression:
+	case partCompression == fileTypeNoCompression:
 		c.rawReader = io.LimitReader(from, mf.UncompressedSize)
 	default:
-		return fmt.Errorf("unknown file type %q", c.fileType)
+		return partCompression, fmt.Errorf("unknown file type %q", c.fileType)
+	}
+	return partCompression, nil
+}
+
+// hashHole writes SIZE zeros to the specified hasher.
+func hashHole(h hash.Hash, size int64, copyBuffer []byte) error {
+	count := int64(len(copyBuffer))
+	if size < count {
+		count = size
+	}
+	for i := int64(0); i < count; i++ {
+		copyBuffer[i] = 0
+	}
+	for size > 0 {
+		count = int64(len(copyBuffer))
+		if size < count {
+			count = size
+		}
+		if _, err := h.Write(copyBuffer[:count]); err != nil {
+			return err
+		}
+		size -= count
+	}
+	return nil
+}
+
+// appendHole creates a hole with the specified size at the open fd.
+func appendHole(fd int, size int64) error {
+	off, err := unix.Seek(fd, size, unix.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+	// Make sure the file size is changed.  It might be the last hole and no other data written afterwards.
+	if err := unix.Ftruncate(fd, off); err != nil {
+		return err
 	}
 	return nil
 }
 
-func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, to io.Writer, size int64) error {
+func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, destFile *destinationFile, size int64) error {
 	switch compression {
 	case fileTypeZstdChunked:
 		defer c.zstdReader.Reset(nil, nil)
-		if _, err := io.CopyBuffer(to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
+		if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
 			return err
 		}
 	case fileTypeEstargz:
 		defer c.gzipReader.Close()
-		if _, err := io.CopyBuffer(to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
+		if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
 			return err
 		}
 	case fileTypeNoCompression:
-		if _, err := io.CopyBuffer(to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil {
+		if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil {
+			return err
+		}
+	case fileTypeHole:
+		if err := appendHole(int(destFile.file.Fd()), size); err != nil {
+			return err
+		}
+		if err := hashHole(destFile.hash, size, c.copyBuffer); err != nil {
 			return err
 		}
 	default:
@@ -780,6 +834,7 @@ type destinationFile struct {
 	dirfd    int
 	file     *os.File
 	digester digest.Digester
+	hash     hash.Hash
 	to       io.Writer
 	metadata *internal.FileMetadata
 	options  *archive.TarOptions
@@ -792,11 +847,13 @@ func openDestinationFile(dirfd int, metadata *internal.FileMetadata, options *ar
 	}
 
 	digester := digest.Canonical.Digester()
-	to := io.MultiWriter(file, digester.Hash())
+	hash := digester.Hash()
+	to := io.MultiWriter(file, hash)
 
 	return &destinationFile{
 		file:     file,
 		digester: digester,
+		hash:     hash,
 		to:       to,
 		metadata: metadata,
 		options:  options,
@@ -841,15 +898,17 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 	for _, missingPart := range missingParts {
 		var part io.ReadCloser
 
-		compression := c.fileType
+		partCompression := c.fileType
 		switch {
+		case missingPart.Hole:
+			partCompression = fileTypeHole
 		case missingPart.OriginFile != nil:
 			var err error
 			part, err = missingPart.OriginFile.OpenFile()
 			if err != nil {
 				return err
 			}
-			compression = fileTypeNoCompression
+			partCompression = fileTypeNoCompression
 		case missingPart.SourceChunk != nil:
 			select {
 			case p := <-streams:
@@ -880,7 +939,8 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 				goto exit
 			}
 
-			if err := c.prepareCompressedStreamToFile(compression, part, &mf); err != nil {
+			compression, err := c.prepareCompressedStreamToFile(partCompression, part, &mf)
+			if err != nil {
 				Err = err
 				goto exit
 			}
@@ -911,19 +971,23 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 				}
 			}
 
-			if err := c.appendCompressedStreamToFile(compression, destFile.to, mf.UncompressedSize); err != nil {
+			if err := c.appendCompressedStreamToFile(compression, destFile, mf.UncompressedSize); err != nil {
 				Err = err
 				goto exit
 			}
-			if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil {
-				Err = err
-				goto exit
+			if c.rawReader != nil {
+				if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil {
+					Err = err
+					goto exit
+				}
 			}
 		}
 	exit:
-		part.Close()
-		if Err != nil {
-			break
+		if part != nil {
+			part.Close()
+			if Err != nil {
+				break
+			}
 		}
 	}
 
@@ -957,6 +1021,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 		gap := getGap(missingParts, i)
 		if gap == 0 && missingParts[prevIndex].OriginFile == nil &&
 			missingParts[i].OriginFile == nil &&
+			!missingParts[prevIndex].Hole && !missingParts[i].Hole &&
 			len(missingParts[prevIndex].Chunks) == 1 && len(missingParts[i].Chunks) == 1 &&
 			missingParts[prevIndex].Chunks[0].File.Name == missingParts[i].Chunks[0].File.Name {
 			missingParts[prevIndex].SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
@@ -983,6 +1048,9 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 	sort.Ints(costs)
 
 	toShrink := len(missingParts) - target
+	if toShrink >= len(costs) {
+		toShrink = len(costs) - 1
+	}
 	targetValue := costs[toShrink]
 
 	newMissingParts = missingParts[0:1]
@@ -993,6 +1061,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 			gap := getGap(missingParts, i)
 			prev := &newMissingParts[len(newMissingParts)-1]
 			prev.SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
+			prev.Hole = false
 			prev.OriginFile = nil
 			if gap > 0 {
 				gapFile := missingFileChunk{
@@ -1009,7 +1078,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 func (c *chunkedDiffer) retrieveMissingFiles(dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error {
 	var chunksToRequest []ImageSourceChunk
 	for _, c := range missingParts {
-		if c.OriginFile == nil {
+		if c.OriginFile == nil && !c.Hole {
 			chunksToRequest = append(chunksToRequest, *c.SourceChunk)
 		}
 	}
@@ -1542,16 +1611,26 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (gra
 				},
 			}
 
-			root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk)
-			if err != nil {
-				return output, err
-			}
-			if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) {
+			switch chunk.ChunkType {
+			case internal.ChunkTypeData:
+				root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk)
+				if err != nil {
+					return output, err
+				}
+				if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) {
+					missingPartsSize -= size
+					mp.OriginFile = &originFile{
+						Root:   root,
+						Path:   path,
+						Offset: offset,
+					}
+				}
+			case internal.ChunkTypeZeros:
 				missingPartsSize -= size
-				mp.OriginFile = &originFile{
-					Root:   root,
-					Path:   path,
-					Offset: offset,
-				}
+				mp.Hole = true
+				// Mark all chunks belonging to the missing part as holes
+				for i := range mp.Chunks {
+					mp.Chunks[i].Hole = true
+				}
 			}
 			missingParts = append(missingParts, mp)
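For reference, the receiver-side technique named in the commit message (unix.Seek plus unix.Ftruncate, as in appendHole above) can be exercised in isolation. A minimal standalone sketch; the file path and the hole size are illustrative choices, not values from this patch:

	package main

	import (
		"fmt"
		"os"

		"golang.org/x/sys/unix"
	)

	func main() {
		f, err := os.Create("/tmp/sparse-demo") // illustrative path
		if err != nil {
			panic(err)
		}
		defer f.Close()
		fd := int(f.Fd())

		// Write a little data, then "append" a 1 MiB hole the same way
		// appendHole does: seek past the hole and ftruncate so the file
		// size includes it even if no data is written afterwards.
		if _, err := unix.Write(fd, []byte("header")); err != nil {
			panic(err)
		}
		off, err := unix.Seek(fd, 1<<20, unix.SEEK_CUR)
		if err != nil {
			panic(err)
		}
		if err := unix.Ftruncate(fd, off); err != nil {
			panic(err)
		}

		var st unix.Stat_t
		if err := unix.Fstat(fd, &st); err != nil {
			panic(err)
		}
		// On filesystems with sparse-file support, the allocated size
		// stays far below the apparent size because the hole occupies
		// no disk blocks.
		fmt.Printf("size=%d bytes, allocated=%d bytes\n", st.Size, st.Blocks*512)
	}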