diff --git a/pkg/chunked/cache_linux.go b/pkg/chunked/cache_linux.go
index 53cc38b52f..a931fb5d13 100644
--- a/pkg/chunked/cache_linux.go
+++ b/pkg/chunked/cache_linux.go
@@ -524,7 +524,7 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
 	for iter.ReadArray() {
 		for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
 			switch field {
-			case "type", "name", "linkName", "digest", "chunkDigest":
+			case "type", "name", "linkName", "digest", "chunkDigest", "chunkType":
 				count += len(iter.ReadStringAsSlice())
 			case "xattrs":
 				for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
@@ -609,6 +609,8 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
 				m.ChunkOffset = iter.ReadInt64()
 			case "chunkDigest":
 				m.ChunkDigest = getString(iter.ReadStringAsSlice())
+			case "chunkType":
+				m.ChunkType = getString(iter.ReadStringAsSlice())
 			case "xattrs":
 				m.Xattrs = make(map[string]string)
 				for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
diff --git a/pkg/chunked/chunked b/pkg/chunked/chunked
deleted file mode 120000
index ce52bb9935..0000000000
--- a/pkg/chunked/chunked
+++ /dev/null
@@ -1 +0,0 @@
-/home/gscrivano/src/gopath/src/github.com/containers/storage/pkg/chunked
\ No newline at end of file
diff --git a/pkg/chunked/compressor/compressor.go b/pkg/chunked/compressor/compressor.go
index a2035a8b69..7893cf9aab 100644
--- a/pkg/chunked/compressor/compressor.go
+++ b/pkg/chunked/compressor/compressor.go
@@ -17,21 +17,152 @@ import (
 )
 
 const RollsumBits = 16
+const holesThreshold = int64(1 << 10)
+
+type holesFinder struct {
+	reader    *bufio.Reader
+	fileOff   int64
+	zeros     int64
+	from      int64
+	threshold int64
+
+	state int
+}
+
+const (
+	holesFinderStateRead = iota
+	holesFinderStateAccumulate
+	holesFinderStateFound
+	holesFinderStateEOF
+)
+
+// ReadByte reads a single byte from the underlying reader.
+// If a single byte is read, the return value is (0, RAW-BYTE-VALUE, nil).
+// If there are at least f.threshold consecutive zeros, then the
+// return value is (N_CONSECUTIVE_ZEROS, '\x00', nil).
+func (f *holesFinder) ReadByte() (int64, byte, error) {
+	for {
+		switch f.state {
+		// reading the file stream
+		case holesFinderStateRead:
+			if f.zeros > 0 {
+				f.zeros--
+				return 0, 0, nil
+			}
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				return 0, b, err
+			}
+
+			if b != 0 {
+				return 0, b, err
+			}
+
+			f.zeros = 1
+			if f.zeros == f.threshold {
+				f.state = holesFinderStateFound
+			} else {
+				f.state = holesFinderStateAccumulate
+			}
+		// accumulating zeros, but still didn't reach the threshold
+		case holesFinderStateAccumulate:
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				if err == io.EOF {
+					f.state = holesFinderStateEOF
+					continue
+				}
+				return 0, b, err
+			}
+
+			if b == 0 {
+				f.zeros++
+				if f.zeros == f.threshold {
+					f.state = holesFinderStateFound
+				}
+			} else {
+				if err := f.reader.UnreadByte(); err != nil {
+					return 0, 0, err
+				}
+				f.state = holesFinderStateRead
+			}
+		// found a hole. Number of zeros >= threshold
+		case holesFinderStateFound:
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				if err == io.EOF {
+					f.state = holesFinderStateEOF
+				}
+				holeLen := f.zeros
+				f.zeros = 0
+				return holeLen, 0, nil
+			}
+			if b != 0 {
+				if err := f.reader.UnreadByte(); err != nil {
+					return 0, 0, err
+				}
+				f.state = holesFinderStateRead
+
+				holeLen := f.zeros
+				f.zeros = 0
+				return holeLen, 0, nil
+			}
+			f.zeros++
+		// reached EOF. Flush pending zeros if any.
+		case holesFinderStateEOF:
+			if f.zeros > 0 {
+				f.zeros--
+				return 0, 0, nil
+			}
+			return 0, 0, io.EOF
+		}
+	}
+}
 
 type rollingChecksumReader struct {
-	reader  *bufio.Reader
-	closed  bool
-	rollsum *RollSum
+	reader      *holesFinder
+	closed      bool
+	rollsum     *RollSum
+	pendingHole int64
+
+	// WrittenOut is the total number of bytes written out
+	// to the caller so far.
 	WrittenOut int64
+
+	// IsLastChunkZeros tells whether the last generated
+	// chunk is a hole (made of consecutive zeros). If it
+	// is false, then the last chunk is a data chunk
+	// generated by the rolling checksum.
+	IsLastChunkZeros bool
 }
 
 func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
+	rc.IsLastChunkZeros = false
+
+	if rc.pendingHole > 0 {
+		toCopy := int64(len(b))
+		if rc.pendingHole < toCopy {
+			toCopy = rc.pendingHole
+		}
+		rc.pendingHole -= toCopy
+		for i := int64(0); i < toCopy; i++ {
+			b[i] = 0
+		}
+
+		rc.WrittenOut += toCopy
+
+		rc.IsLastChunkZeros = true
+
+		// if there are no other zeros left, terminate the chunk
+		return rc.pendingHole == 0, int(toCopy), nil
+	}
+
 	if rc.closed {
 		return false, 0, io.EOF
 	}
+
 	for i := 0; i < len(b); i++ {
-		n, err := rc.reader.ReadByte()
+		holeLen, n, err := rc.reader.ReadByte()
 		if err != nil {
 			if err == io.EOF {
 				rc.closed = true
@@ -43,6 +174,13 @@ func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
 			// Report any other error type
 			return false, -1, err
 		}
+		if holeLen > 0 {
+			for j := int64(0); j < holeLen; j++ {
+				rc.rollsum.Roll(0)
+			}
+			rc.pendingHole = holeLen
+			return true, i, nil
+		}
 		b[i] = n
 		rc.WrittenOut++
 		rc.rollsum.Roll(n)
@@ -58,6 +196,7 @@ type chunk struct {
 	Offset    int64
 	Checksum  string
 	ChunkSize int64
+	ChunkType string
 }
 
 func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
@@ -119,8 +258,13 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
 
 	chunks := []chunk{}
 
+	hf := &holesFinder{
+		threshold: holesThreshold,
+		reader:    bufio.NewReader(tr),
+	}
+
 	rcReader := &rollingChecksumReader{
-		reader:  bufio.NewReader(tr),
+		reader:  hf,
 		rollsum: NewRollSum(),
 	}
@@ -150,12 +294,21 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
 			return err
 		}
 
-		chunks = append(chunks, chunk{
-			ChunkOffset: lastChunkOffset,
-			Offset:      lastOffset,
-			Checksum:    chunkDigester.Digest().String(),
-			ChunkSize:   rcReader.WrittenOut - lastChunkOffset,
-		})
+		chunkSize := rcReader.WrittenOut - lastChunkOffset
+		if chunkSize > 0 {
+			chunkType := internal.ChunkTypeData
+			if rcReader.IsLastChunkZeros {
+				chunkType = internal.ChunkTypeZeros
+			}
+
+			chunks = append(chunks, chunk{
+				ChunkOffset: lastChunkOffset,
+				Offset:      lastOffset,
+				Checksum:    chunkDigester.Digest().String(),
+				ChunkSize:   chunkSize,
+				ChunkType:   chunkType,
+			})
+		}
 
 		lastOffset = off
 		lastChunkOffset = rcReader.WrittenOut
@@ -210,6 +363,7 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
 			entries[i].ChunkSize = chunks[i].ChunkSize
 			entries[i].Offset = chunks[i].Offset
 			entries[i].ChunkDigest = chunks[i].Checksum
+			entries[i].ChunkType = chunks[i].ChunkType
 		}
 	}
 	metadata = append(metadata, entries...)
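
Note on the ReadByte contract above: each call yields either a collapsed run of zeros or a single literal byte, never both. A minimal consumer sketch (package-internal, mirroring what the new tests below exercise; dumpStream is a hypothetical helper, not part of this patch, and assumes "fmt" and "io" are imported):

	// dumpStream drains a holesFinder and prints one event per call:
	// either a hole of holeLen consecutive zeros (holeLen >= threshold)
	// or a single literal byte (which may itself be zero, for runs
	// shorter than the threshold).
	func dumpStream(hf *holesFinder) error {
		for {
			holeLen, b, err := hf.ReadByte()
			if err == io.EOF {
				return nil // stream fully consumed
			}
			if err != nil {
				return err
			}
			if holeLen > 0 {
				fmt.Printf("hole: %d zeros\n", holeLen)
				continue
			}
			fmt.Printf("data: %#x\n", b)
		}
	}
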
diff --git a/pkg/chunked/compressor/compressor_test.go b/pkg/chunked/compressor/compressor_test.go
new file mode 100644
index 0000000000..f8f69fca9c
--- /dev/null
+++ b/pkg/chunked/compressor/compressor_test.go
@@ -0,0 +1,90 @@
+package compressor
+
+import (
+	"bufio"
+	"bytes"
+	"io"
+	"testing"
+)
+
+func TestHole(t *testing.T) {
+	data := []byte("\x00\x00\x00\x00\x00")
+
+	hf := &holesFinder{
+		threshold: 1,
+		reader:    bufio.NewReader(bytes.NewReader(data)),
+	}
+
+	hole, _, err := hf.ReadByte()
+	if err != nil {
+		t.Errorf("got error: %v", err)
+	}
+	if hole != 5 {
+		t.Error("expected hole not found")
+	}
+
+	if _, _, err := hf.ReadByte(); err != io.EOF {
+		t.Errorf("EOF not found")
+	}
+
+	hf = &holesFinder{
+		threshold: 1000,
+		reader:    bufio.NewReader(bytes.NewReader(data)),
+	}
+	for i := 0; i < 5; i++ {
+		hole, b, err := hf.ReadByte()
+		if err != nil {
+			t.Errorf("got error: %v", err)
+		}
+		if hole != 0 {
+			t.Error("hole found")
+		}
+		if b != 0 {
+			t.Error("wrong read")
+		}
+	}
+	if _, _, err := hf.ReadByte(); err != io.EOF {
+		t.Error("didn't receive EOF")
+	}
+}
+
+func TestTwoHoles(t *testing.T) {
+	data := []byte("\x00\x00\x00\x00\x00FOO\x00\x00\x00\x00\x00")
+
+	hf := &holesFinder{
+		threshold: 2,
+		reader:    bufio.NewReader(bytes.NewReader(data)),
+	}
+
+	hole, _, err := hf.ReadByte()
+	if err != nil {
+		t.Errorf("got error: %v", err)
+	}
+	if hole != 5 {
+		t.Error("hole not found")
+	}
+
+	for _, e := range []byte("FOO") {
+		hole, c, err := hf.ReadByte()
+		if err != nil {
+			t.Errorf("got error: %v", err)
+		}
+		if hole != 0 {
+			t.Error("hole found")
+		}
+		if c != e {
+			t.Errorf("wrong byte read %v instead of %v", c, e)
+		}
+	}
+	hole, _, err = hf.ReadByte()
+	if err != nil {
+		t.Errorf("got error: %v", err)
+	}
+	if hole != 5 {
+		t.Error("expected hole not found")
+	}
+
+	if _, _, err := hf.ReadByte(); err != io.EOF {
+		t.Error("didn't receive EOF")
+	}
+}
diff --git a/pkg/chunked/internal/compression.go b/pkg/chunked/internal/compression.go
index 01007dd966..1fce3e6f93 100644
--- a/pkg/chunked/internal/compression.go
+++ b/pkg/chunked/internal/compression.go
@@ -46,11 +46,17 @@ type FileMetadata struct {
 	ChunkSize   int64  `json:"chunkSize,omitempty"`
 	ChunkOffset int64  `json:"chunkOffset,omitempty"`
 	ChunkDigest string `json:"chunkDigest,omitempty"`
+	ChunkType   string `json:"chunkType,omitempty"`
 
 	// internal: computed by mergeTOCEntries.
 	Chunks []*FileMetadata `json:"-"`
 }
 
+const (
+	ChunkTypeData  = ""
+	ChunkTypeZeros = "zeros"
+)
+
 const (
 	TypeReg   = "reg"
 	TypeChunk = "chunk"
diff --git a/pkg/chunked/storage_linux.go b/pkg/chunked/storage_linux.go
index 2ce77ad2f8..43a97ad46f 100644
--- a/pkg/chunked/storage_linux.go
+++ b/pkg/chunked/storage_linux.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/base64"
 	"fmt"
+	"hash"
 	"io"
 	"io/ioutil"
 	"os"
@@ -42,9 +43,10 @@ const (
 	containersOverrideXattr = "user.containers.override_stat"
 	bigDataKey              = "zstd-chunked-manifest"
 
-	fileTypeZstdChunked   = iota
-	fileTypeEstargz       = iota
-	fileTypeNoCompression = iota
+	fileTypeZstdChunked = iota
+	fileTypeEstargz
+	fileTypeNoCompression
+	fileTypeHole
 
 	copyGoRoutines = 32
 )
@@ -438,14 +440,14 @@ func maybeDoIDRemap(manifest []internal.FileMetadata, options *archive.TarOption
 }
 
 type originFile struct {
-	Root string
-	Path string
-
+	Root   string
+	Path   string
 	Offset int64
 }
 
 type missingFileChunk struct {
-	Gap int64
+	Gap  int64
+	Hole bool
 
 	File *internal.FileMetadata
 
@@ -454,6 +456,7 @@ type missingFileChunk struct {
 }
 
 type missingPart struct {
+	Hole        bool
 	SourceChunk *ImageSourceChunk
 	OriginFile  *originFile
 	Chunks      []missingFileChunk
@@ -722,9 +725,19 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.Fil
 	return nil, err
 }
 
-func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFileType, from io.Reader, mf *missingFileChunk) error {
-	switch compression {
-	case fileTypeZstdChunked:
+func (c *chunkedDiffer) prepareCompressedStreamToFile(partCompression compressedFileType, from io.Reader, mf *missingFileChunk) (compressedFileType, error) {
+	switch {
+	case partCompression == fileTypeHole:
+		// The entire part is a hole. There is no need to read data from the stream.
+		c.rawReader = nil
+		return fileTypeHole, nil
+	case mf.Hole:
+		// Only the missing chunk in the requested part refers to a hole.
+		// The received data must be discarded.
+		limitReader := io.LimitReader(from, mf.CompressedSize)
+		_, err := io.CopyBuffer(ioutil.Discard, limitReader, c.copyBuffer)
+		return fileTypeHole, err
+	case partCompression == fileTypeZstdChunked:
 		c.rawReader = io.LimitReader(from, mf.CompressedSize)
 		if c.zstdReader == nil {
 			r := zstd.NewReader(c.rawReader)
@@ -732,42 +745,83 @@ func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFile
 		} else {
 			c.zstdReader.Reset(c.rawReader, nil)
 		}
-	case fileTypeEstargz:
+	case partCompression == fileTypeEstargz:
 		c.rawReader = io.LimitReader(from, mf.CompressedSize)
 		if c.gzipReader == nil {
 			r, err := pgzip.NewReader(c.rawReader)
 			if err != nil {
-				return err
+				return partCompression, err
 			}
 			c.gzipReader = r
 		} else {
 			if err := c.gzipReader.Reset(c.rawReader); err != nil {
-				return err
+				return partCompression, err
 			}
 		}
-
-	case fileTypeNoCompression:
+	case partCompression == fileTypeNoCompression:
 		c.rawReader = io.LimitReader(from, mf.UncompressedSize)
 	default:
-		return fmt.Errorf("unknown file type %q", c.fileType)
+		return partCompression, fmt.Errorf("unknown file type %q", c.fileType)
+	}
+	return partCompression, nil
+}
+
+// hashHole writes SIZE zeros to the specified hasher
+func hashHole(h hash.Hash, size int64, copyBuffer []byte) error {
+	count := int64(len(copyBuffer))
+	if size < count {
+		count = size
+	}
+	for i := int64(0); i < count; i++ {
+		copyBuffer[i] = 0
+	}
+	for size > 0 {
+		count = int64(len(copyBuffer))
+		if size < count {
+			count = size
+		}
+		if _, err := h.Write(copyBuffer[:count]); err != nil {
+			return err
+		}
+		size -= count
+	}
+	return nil
+}
+
+// appendHole creates a hole with the specified size at the open fd.
+func appendHole(fd int, size int64) error {
+	off, err := unix.Seek(fd, size, unix.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+	// Make sure the file size is changed: this might be the last hole in the file, with no data written after it.
+	if err := unix.Ftruncate(fd, off); err != nil {
+		return err
 	}
 	return nil
 }
 
-func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, to io.Writer, size int64) error {
+func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, destFile *destinationFile, size int64) error {
 	switch compression {
 	case fileTypeZstdChunked:
 		defer c.zstdReader.Reset(nil, nil)
-		if _, err := io.CopyBuffer(to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
+		if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
 			return err
 		}
 	case fileTypeEstargz:
 		defer c.gzipReader.Close()
-		if _, err := io.CopyBuffer(to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
+		if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
 			return err
 		}
 	case fileTypeNoCompression:
-		if _, err := io.CopyBuffer(to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil {
+		if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil {
+			return err
+		}
+	case fileTypeHole:
+		if err := appendHole(int(destFile.file.Fd()), size); err != nil {
+			return err
+		}
+		if err := hashHole(destFile.hash, size, c.copyBuffer); err != nil {
 			return err
 		}
 	default:
@@ -780,6 +834,7 @@ type destinationFile struct {
 	dirfd    int
 	file     *os.File
 	digester digest.Digester
+	hash     hash.Hash
 	to       io.Writer
 	metadata *internal.FileMetadata
 	options  *archive.TarOptions
@@ -792,11 +847,13 @@ func openDestinationFile(dirfd int, metadata *internal.FileMetadata, options *ar
 	}
 
 	digester := digest.Canonical.Digester()
-	to := io.MultiWriter(file, digester.Hash())
+	hash := digester.Hash()
+	to := io.MultiWriter(file, hash)
 
 	return &destinationFile{
 		file:     file,
 		digester: digester,
+		hash:     hash,
 		to:       to,
 		metadata: metadata,
 		options:  options,
@@ -841,15 +898,17 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 	for _, missingPart := range missingParts {
 		var part io.ReadCloser
 
-		compression := c.fileType
+		partCompression := c.fileType
 		switch {
+		case missingPart.Hole:
+			partCompression = fileTypeHole
 		case missingPart.OriginFile != nil:
 			var err error
 			part, err = missingPart.OriginFile.OpenFile()
 			if err != nil {
 				return err
 			}
-			compression = fileTypeNoCompression
+			partCompression = fileTypeNoCompression
 		case missingPart.SourceChunk != nil:
 			select {
 			case p := <-streams:
@@ -880,7 +939,8 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 				goto exit
 			}
 
-			if err := c.prepareCompressedStreamToFile(compression, part, &mf); err != nil {
+			compression, err := c.prepareCompressedStreamToFile(partCompression, part, &mf)
+			if err != nil {
 				Err = err
 				goto exit
 			}
@@ -911,19 +971,23 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 				}
 			}
 
-			if err := c.appendCompressedStreamToFile(compression, destFile.to, mf.UncompressedSize); err != nil {
+			if err := c.appendCompressedStreamToFile(compression, destFile, mf.UncompressedSize); err != nil {
 				Err = err
 				goto exit
 			}
-			if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil {
-				Err = err
-				goto exit
+			if c.rawReader != nil {
+				if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil {
+					Err = err
+					goto exit
+				}
 			}
 		}
 	exit:
-		part.Close()
-		if Err != nil {
-			break
+		if part != nil {
+			part.Close()
+		}
+		if Err != nil {
+			break
 		}
 	}
@@ -957,6 +1021,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 		gap := getGap(missingParts, i)
 		if gap == 0 && missingParts[prevIndex].OriginFile == nil &&
 			missingParts[i].OriginFile == nil &&
+			!missingParts[prevIndex].Hole && !missingParts[i].Hole &&
 			len(missingParts[prevIndex].Chunks) == 1 && len(missingParts[i].Chunks) == 1 &&
 			missingParts[prevIndex].Chunks[0].File.Name == missingParts[i].Chunks[0].File.Name {
 			missingParts[prevIndex].SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
@@ -983,6 +1048,9 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 	sort.Ints(costs)
 
 	toShrink := len(missingParts) - target
+	if toShrink >= len(costs) {
+		toShrink = len(costs) - 1
+	}
 	targetValue := costs[toShrink]
 
 	newMissingParts = missingParts[0:1]
@@ -993,6 +1061,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 			gap := getGap(missingParts, i)
 			prev := &newMissingParts[len(newMissingParts)-1]
 			prev.SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
+			prev.Hole = false
 			prev.OriginFile = nil
 			if gap > 0 {
 				gapFile := missingFileChunk{
@@ -1009,7 +1078,7 @@ func (c *chunkedDiffer) retrieveMissingFiles(dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error {
 	var chunksToRequest []ImageSourceChunk
 	for _, c := range missingParts {
-		if c.OriginFile == nil {
+		if c.OriginFile == nil && !c.Hole {
 			chunksToRequest = append(chunksToRequest, *c.SourceChunk)
 		}
 	}
@@ -1542,16 +1611,26 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (gra
 				},
 			}
 
-			root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk)
-			if err != nil {
-				return output, err
-			}
-			if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) {
+			switch chunk.ChunkType {
+			case internal.ChunkTypeData:
+				root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk)
+				if err != nil {
+					return output, err
+				}
+				if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) {
+					missingPartsSize -= size
+					mp.OriginFile = &originFile{
+						Root:   root,
+						Path:   path,
+						Offset: offset,
+					}
+				}
+			case internal.ChunkTypeZeros:
 				missingPartsSize -= size
-				mp.OriginFile = &originFile{
-					Root:   root,
-					Path:   path,
-					Offset: offset,
+				mp.Hole = true
+				// Mark all chunks belonging to the missing part as holes
+				for i := range mp.Chunks {
+					mp.Chunks[i].Hole = true
 				}
 			}
 			missingParts = append(missingParts, mp)
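
For reference, the seek-then-ftruncate sequence used by appendHole (skip over the hole, then extend the file size so a trailing hole is preserved) can be verified with a small standalone program. This is an illustration only, not part of the patch; the path and sizes are arbitrary:

	package main

	import (
		"fmt"
		"os"

		"golang.org/x/sys/unix"
	)

	func main() {
		f, err := os.Create("/tmp/sparse-demo")
		if err != nil {
			panic(err)
		}
		defer f.Close()
		fd := int(f.Fd())

		if _, err := unix.Write(fd, []byte("data")); err != nil {
			panic(err)
		}
		// Skip 1 MiB without writing anything: the kernel leaves a hole.
		off, err := unix.Seek(fd, 1<<20, unix.SEEK_CUR)
		if err != nil {
			panic(err)
		}
		// Extend the file size in case the hole is the last part of the file.
		if err := unix.Ftruncate(fd, off); err != nil {
			panic(err)
		}

		var st unix.Stat_t
		if err := unix.Fstat(fd, &st); err != nil {
			panic(err)
		}
		// Size includes the hole; Blocks*512 only counts allocated bytes.
		fmt.Printf("size=%d allocated=%d\n", st.Size, st.Blocks*512)
	}
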