diff --git a/pkg/chunked/cache_linux.go b/pkg/chunked/cache_linux.go
index 69359b5529..a931fb5d13 100644
--- a/pkg/chunked/cache_linux.go
+++ b/pkg/chunked/cache_linux.go
@@ -107,7 +107,16 @@ func (c *layersCache) load() error {
 			continue
 		}
 
-		metadata, err := c.readMetadataFromCache(r.ID)
+		bigData, err := c.store.LayerBigData(r.ID, cacheKey)
+		if err != nil {
+			if errors.Cause(err) == os.ErrNotExist {
+				continue
+			}
+			return err
+		}
+		defer bigData.Close()
+
+		metadata, err := readMetadataFromCache(bigData)
 		if err != nil {
 			logrus.Warningf("Error reading cache file for layer %q: %v", r.ID, err)
 		}
@@ -117,7 +126,17 @@ func (c *layersCache) load() error {
 			continue
 		}
 
-		metadata, err = c.writeCache(r.ID)
+		manifestReader, err := c.store.LayerBigData(r.ID, bigDataKey)
+		if err != nil {
+			continue
+		}
+		defer manifestReader.Close()
+		manifest, err := ioutil.ReadAll(manifestReader)
+		if err != nil {
+			return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err)
+		}
+
+		metadata, err = writeCache(manifest, r.ID, c.store)
 		if err == nil {
 			c.addLayer(r.ID, metadata)
 		}
@@ -182,6 +201,11 @@ func generateTag(digest string, offset, len uint64) string {
 	return fmt.Sprintf("%s%.20d@%.20d", digest, offset, len)
 }
 
+type setBigData interface {
+	// SetLayerBigData stores a (possibly large) chunk of named data
+	SetLayerBigData(id, key string, data io.Reader) error
+}
+
 // writeCache write a cache for the layer ID.
 // It generates a sorted list of digests with their offset to the path location and offset.
 // The same cache is used to lookup files, chunks and candidates for deduplication with hard links.
@@ -189,13 +213,13 @@ func generateTag(digest string, offset, len uint64) string {
 // - digest(file.payload))
 // - digest(digest(file.payload) + file.UID + file.GID + file.mode + file.xattrs)
 // - digest(i) for each i in chunks(file payload)
-func (c *layersCache) writeCache(id string) (*metadata, error) {
+func writeCache(manifest []byte, id string, dest setBigData) (*metadata, error) {
 	var vdata bytes.Buffer
 	tagLen := 0
 	digestLen := 0
 	var tagsBuffer bytes.Buffer
 
-	toc, err := c.prepareMetadata(id)
+	toc, err := prepareMetadata(manifest)
 	if err != nil {
 		return nil, err
 	}
@@ -317,7 +341,7 @@ func (c *layersCache) writeCache(id string) (*metadata, error) {
 
 	r := io.TeeReader(pipeReader, counter)
 
-	if err := c.store.SetLayerBigData(id, cacheKey, r); err != nil {
+	if err := dest.SetLayerBigData(id, cacheKey, r); err != nil {
 		return nil, err
 	}
 
@@ -328,22 +352,14 @@ func (c *layersCache) writeCache(id string) (*metadata, error) {
 	logrus.Debugf("Written lookaside cache for layer %q with length %v", id, counter.Count)
 
 	return &metadata{
-		tagLen: tagLen,
-		tags:   tagsBuffer.Bytes(),
-		vdata:  vdata.Bytes(),
+		digestLen: digestLen,
+		tagLen:    tagLen,
+		tags:      tagsBuffer.Bytes(),
+		vdata:     vdata.Bytes(),
 	}, nil
 }
 
-func (c *layersCache) readMetadataFromCache(id string) (*metadata, error) {
-	bigData, err := c.store.LayerBigData(id, cacheKey)
-	if err != nil {
-		if errors.Cause(err) == os.ErrNotExist {
-			return nil, nil
-		}
-		return nil, err
-	}
-	defer bigData.Close()
-
+func readMetadataFromCache(bigData io.Reader) (*metadata, error) {
 	var version, tagLen, digestLen, tagsLen, vdataLen uint64
 	if err := binary.Read(bigData, binary.LittleEndian, &version); err != nil {
 		return nil, err
@@ -370,7 +386,7 @@ func (c *layersCache) readMetadataFromCache(id string) (*metadata, error) {
 	}
 
 	vdata := make([]byte, vdataLen)
-	if _, err = bigData.Read(vdata); err != nil {
+	if _, err := bigData.Read(vdata); err != nil {
 		return nil, err
 	}
 
@@ -382,17 +398,7 @@ func (c *layersCache) readMetadataFromCache(id string) (*metadata, error) {
 	}, nil
 }
 
-func (c *layersCache) prepareMetadata(id string) ([]*internal.FileMetadata, error) {
-	manifestReader, err := c.store.LayerBigData(id, bigDataKey)
-	if err != nil {
-		return nil, nil
-	}
-	defer manifestReader.Close()
-	manifest, err := ioutil.ReadAll(manifestReader)
-	if err != nil {
-		return nil, fmt.Errorf("open manifest file for layer %q: %w", id, err)
-	}
-
+func prepareMetadata(manifest []byte) ([]*internal.FileMetadata, error) {
 	toc, err := unmarshalToc(manifest)
 	if err != nil {
 		// ignore errors here. They might be caused by a different manifest format.
@@ -405,6 +411,7 @@ func (c *layersCache) prepareMetadata(id string) ([]*internal.FileMetadata, erro
 		d := toc.Entries[i].Digest
 		if d != "" {
 			r = append(r, &toc.Entries[i])
+			continue
 		}
 
 		// chunks do not use hard link dedup so keeping just one candidate is enough
@@ -473,7 +480,7 @@ func (c *layersCache) findDigestInternal(digest string) (string, string, int64,
 		if digest != "" {
 			position := string(layer.metadata.vdata[off : off+len])
 			parts := strings.SplitN(position, "@", 2)
-			offFile, _ := strconv.ParseInt(parts[1], 10, 64)
+			offFile, _ := strconv.ParseInt(parts[0], 10, 64)
 			return layer.target, parts[1], offFile, nil
 		}
 	}
@@ -517,7 +524,7 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
 	for iter.ReadArray() {
 		for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
 			switch field {
-			case "type", "name", "linkName", "digest", "chunkDigest":
+			case "type", "name", "linkName", "digest", "chunkDigest", "chunkType":
 				count += len(iter.ReadStringAsSlice())
 			case "xattrs":
 				for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
@@ -602,6 +609,8 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) {
 				m.ChunkOffset = iter.ReadInt64()
 			case "chunkDigest":
 				m.ChunkDigest = getString(iter.ReadStringAsSlice())
+			case "chunkType":
+				m.ChunkType = getString(iter.ReadStringAsSlice())
 			case "xattrs":
 				m.Xattrs = make(map[string]string)
 				for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
diff --git a/pkg/chunked/cache_linux_test.go b/pkg/chunked/cache_linux_test.go
new file mode 100644
index 0000000000..e562837d26
--- /dev/null
+++ b/pkg/chunked/cache_linux_test.go
@@ -0,0 +1,171 @@
+package chunked
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"reflect"
+	"testing"
+)
+
+const jsonTOC = `
+{
+  "version": 1,
+  "entries": [
+    {
+      "type": "symlink",
+      "name": "bin",
+      "linkName": "usr/bin",
+      "mode": 511,
+      "modtime": "1970-01-01T01:00:00+01:00",
+      "accesstime": "0001-01-01T00:00:00Z",
+      "changetime": "0001-01-01T00:00:00Z"
+    },
+    {
+      "type": "dir",
+      "name": "usr/bin",
+      "mode": 511,
+      "modtime": "2022-01-07T12:36:43+01:00",
+      "accesstime": "0001-01-01T00:00:00Z",
+      "changetime": "0001-01-01T00:00:00Z"
+    },
+    {
+      "type": "reg",
+      "name": "usr/bin/foo",
+      "mode": 511,
+      "size": 103867,
+      "modtime": "1970-01-01T01:00:00+01:00",
+      "accesstime": "0001-01-01T00:00:00Z",
+      "changetime": "0001-01-01T00:00:00Z",
+      "digest": "sha256:99fe908c699dc068438b23e28319cadff1f2153c3043bafb8e83a430bba0a2c6",
+      "offset": 94149,
+      "endOffset": 120135,
+      "chunkSize": 17615,
+      "chunkDigest": "sha256:2ce0d0f8eb2aa93d13007097763e4459c814c8d0e859e5a57465af924169b544"
+    },
+    {
+      "type": "chunk",
+      "name": "usr/bin/foo",
+      "offset": 99939,
+      "chunkSize": 86252,
+      "chunkOffset": 17615,
+      "chunkDigest": "sha256:2a9d3f1b6b37abc8bb35eb8fa98b893a2a2447bcb01184c3bafc8c6b40da099d"
+    }
+  ]
+}
+`
+
+func TestPrepareMetadata(t *testing.T) {
+	toc, err := prepareMetadata([]byte(jsonTOC))
+	if err != nil {
+		t.Errorf("got error from prepareMetadata: %w", err)
+	}
+	if len(toc) != 2 {
+		t.Error("prepareMetadata returns the wrong length")
+	}
+}
+
+type bigDataToBuffer struct {
+	buf    *bytes.Buffer
+	id     string
+	key    string
+	called bool
+}
+
+func (b *bigDataToBuffer) SetLayerBigData(id, key string, data io.Reader) error {
+	b.id = id
+	b.key = key
+	if b.called {
+		return fmt.Errorf("SetLayerBigData already called once")
+	}
+	b.called = true
+	_, err := io.Copy(b.buf, data)
+	return err
+}
+
+func TestWriteCache(t *testing.T) {
+	toc, err := prepareMetadata([]byte(jsonTOC))
+	if err != nil {
+		t.Errorf("got error from prepareMetadata: %w", err)
+	}
+
+	dest := bigDataToBuffer{
+		buf: bytes.NewBuffer(nil),
+	}
+	cache, err := writeCache([]byte(jsonTOC), "foobar", &dest)
+	if err != nil {
+		t.Errorf("got error from writeCache: %w", err)
+	}
+	if digest, _, _ := findTag("foobar", cache); digest != "" {
+		t.Error("found invalid tag")
+	}
+
+	for _, r := range toc {
+		if r.Digest != "" {
+			// find the element in the cache by the digest checksum
+			digest, off, len := findTag(r.Digest, cache)
+			if digest == "" {
+				t.Error("file tag not found")
+			}
+			if digest != r.Digest {
+				t.Error("wrong file found")
+			}
+			expectedLocation := generateFileLocation(r.Name, 0)
+			location := cache.vdata[off : off+len]
+			if !bytes.Equal(location, expectedLocation) {
+				t.Errorf("wrong file found %q instead of %q", location, expectedLocation)
+			}
+
+			fingerprint, err := calculateHardLinkFingerprint(r)
+			if err != nil {
+				t.Errorf("got error from writeCache: %w", err)
+			}
+
+			// find the element in the cache by the hardlink fingerprint
+			digest, off, len = findTag(fingerprint, cache)
+			if digest == "" {
+				t.Error("file tag not found")
+			}
+			if digest != fingerprint {
+				t.Error("wrong file found")
+			}
+			expectedLocation = generateFileLocation(r.Name, 0)
+			location = cache.vdata[off : off+len]
+			if !bytes.Equal(location, expectedLocation) {
+				t.Errorf("wrong file found %q instead of %q", location, expectedLocation)
+			}
+		}
+		if r.ChunkDigest != "" {
+			// find the element in the cache by the chunk digest checksum
+			digest, off, len := findTag(r.ChunkDigest, cache)
+			if digest == "" {
+				t.Error("chunk tag not found")
+			}
+			if digest != r.ChunkDigest {
+				t.Error("wrong digest found")
+			}
+			expectedLocation := generateFileLocation(r.Name, uint64(r.ChunkOffset))
+			location := cache.vdata[off : off+len]
+			if !bytes.Equal(location, expectedLocation) {
+				t.Errorf("wrong file found %q instead of %q", location, expectedLocation)
+			}
+		}
+	}
+}
+
+func TestReadCache(t *testing.T) {
+	dest := bigDataToBuffer{
+		buf: bytes.NewBuffer(nil),
+	}
+	cache, err := writeCache([]byte(jsonTOC), "foobar", &dest)
+	if err != nil {
+		t.Errorf("got error from writeCache: %w", err)
+	}
+
+	cacheRead, err := readMetadataFromCache(dest.buf)
+	if err != nil {
+		t.Errorf("got error from readMetadataFromCache: %w", err)
+	}
+	if !reflect.DeepEqual(cache, cacheRead) {
+		t.Errorf("read a different struct than what was written")
+	}
+}
diff --git a/pkg/chunked/chunked b/pkg/chunked/chunked
deleted file mode 120000
index ce52bb9935..0000000000
--- a/pkg/chunked/chunked
+++ /dev/null
@@ -1 +0,0 @@
-/home/gscrivano/src/gopath/src/github.com/containers/storage/pkg/chunked
\ No newline at end of file
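A note on the location format these tests exercise through findTag and generateFileLocation: the findDigestInternal hunk above splits the cached position string on "@" and treats parts[0] as the offset inside the file and parts[1] as the path, so the location is assumed here to be laid out as "offset@path". The sketch below only mirrors that layout for illustration; buildLocation is a hypothetical helper, not part of the patch.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildLocation composes an "offset@path" location string, the same shape
// that findDigestInternal appears to parse (assumption, see lead-in above).
func buildLocation(path string, offset int64) string {
	return fmt.Sprintf("%d@%s", offset, path)
}

func main() {
	loc := buildLocation("usr/bin/foo", 17615)

	// Same parsing order as the fixed findDigestInternal: parts[0] is the
	// numeric offset inside the file, parts[1] is the path.
	parts := strings.SplitN(loc, "@", 2)
	offFile, _ := strconv.ParseInt(parts[0], 10, 64)
	fmt.Println(parts[1], offFile) // usr/bin/foo 17615
}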
diff --git a/pkg/chunked/compressor/compressor.go b/pkg/chunked/compressor/compressor.go
index a2035a8b69..7893cf9aab 100644
--- a/pkg/chunked/compressor/compressor.go
+++ b/pkg/chunked/compressor/compressor.go
@@ -17,21 +17,152 @@ import (
 )
 
 const RollsumBits = 16
+const holesThreshold = int64(1 << 10)
+
+type holesFinder struct {
+	reader    *bufio.Reader
+	fileOff   int64
+	zeros     int64
+	from      int64
+	threshold int64
+
+	state int
+}
+
+const (
+	holesFinderStateRead = iota
+	holesFinderStateAccumulate
+	holesFinderStateFound
+	holesFinderStateEOF
+)
+
+// ReadByte reads a single byte from the underlying reader.
+// If a single byte is read, the return value is (0, RAW-BYTE-VALUE, nil).
+// If there are at least f.THRESHOLD consecutive zeros, then the
+// return value is (N_CONSECUTIVE_ZEROS, '\x00').
+func (f *holesFinder) ReadByte() (int64, byte, error) {
+	for {
+		switch f.state {
+		// reading the file stream
+		case holesFinderStateRead:
+			if f.zeros > 0 {
+				f.zeros--
+				return 0, 0, nil
+			}
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				return 0, b, err
+			}
+
+			if b != 0 {
+				return 0, b, err
+			}
+
+			f.zeros = 1
+			if f.zeros == f.threshold {
+				f.state = holesFinderStateFound
+			} else {
+				f.state = holesFinderStateAccumulate
+			}
+		// accumulating zeros, but still didn't reach the threshold
+		case holesFinderStateAccumulate:
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				if err == io.EOF {
+					f.state = holesFinderStateEOF
+					continue
+				}
+				return 0, b, err
+			}
+
+			if b == 0 {
+				f.zeros++
+				if f.zeros == f.threshold {
+					f.state = holesFinderStateFound
+				}
+			} else {
+				if f.reader.UnreadByte(); err != nil {
+					return 0, 0, err
+				}
+				f.state = holesFinderStateRead
+			}
+		// found a hole. Number of zeros >= threshold
+		case holesFinderStateFound:
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				if err == io.EOF {
+					f.state = holesFinderStateEOF
+				}
+				holeLen := f.zeros
+				f.zeros = 0
+				return holeLen, 0, nil
+			}
+			if b != 0 {
+				if f.reader.UnreadByte(); err != nil {
+					return 0, 0, err
+				}
+				f.state = holesFinderStateRead
+
+				holeLen := f.zeros
+				f.zeros = 0
+				return holeLen, 0, nil
+			}
+			f.zeros++
+		// reached EOF. Flush pending zeros if any.
+		case holesFinderStateEOF:
+			if f.zeros > 0 {
+				f.zeros--
+				return 0, 0, nil
+			}
+			return 0, 0, io.EOF
+		}
+	}
+}
 
 type rollingChecksumReader struct {
-	reader  *bufio.Reader
-	closed  bool
-	rollsum *RollSum
+	reader      *holesFinder
+	closed      bool
+	rollsum     *RollSum
+	pendingHole int64
+
+	// WrittenOut is the total number of bytes read from
+	// the stream.
 	WrittenOut int64
+
+	// IsLastChunkZeros tells whether the last generated
+	// chunk is a hole (made of consecutive zeros). If it
+	// is false, then the last chunk is a data chunk
+	// generated by the rolling checksum.
+	IsLastChunkZeros bool
 }
 
 func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
+	rc.IsLastChunkZeros = false
+
+	if rc.pendingHole > 0 {
+		toCopy := int64(len(b))
+		if rc.pendingHole < toCopy {
+			toCopy = rc.pendingHole
+		}
+		rc.pendingHole -= toCopy
+		for i := int64(0); i < toCopy; i++ {
+			b[i] = 0
+		}
+
+		rc.WrittenOut += toCopy
+
+		rc.IsLastChunkZeros = true
+
+		// if there are no other zeros left, terminate the chunk
+		return rc.pendingHole == 0, int(toCopy), nil
+	}
+
 	if rc.closed {
 		return false, 0, io.EOF
 	}
+
 	for i := 0; i < len(b); i++ {
-		n, err := rc.reader.ReadByte()
+		holeLen, n, err := rc.reader.ReadByte()
 		if err != nil {
 			if err == io.EOF {
 				rc.closed = true
@@ -43,6 +174,13 @@ func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) {
 			// Report any other error type
 			return false, -1, err
 		}
+		if holeLen > 0 {
+			for j := int64(0); j < holeLen; j++ {
+				rc.rollsum.Roll(0)
+			}
+			rc.pendingHole = holeLen
+			return true, i, nil
+		}
 		b[i] = n
 		rc.WrittenOut++
 		rc.rollsum.Roll(n)
@@ -58,6 +196,7 @@ type chunk struct {
 	Offset      int64
 	Checksum    string
 	ChunkSize   int64
+	ChunkType   string
 }
 
 func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
@@ -119,8 +258,13 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
 
 	chunks := []chunk{}
 
+	hf := &holesFinder{
+		threshold: holesThreshold,
+		reader:    bufio.NewReader(tr),
+	}
+
 	rcReader := &rollingChecksumReader{
-		reader:  bufio.NewReader(tr),
+		reader:  hf,
 		rollsum: NewRollSum(),
 	}
 
@@ -150,12 +294,21 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
 				return err
 			}
 
-			chunks = append(chunks, chunk{
-				ChunkOffset: lastChunkOffset,
-				Offset:      lastOffset,
-				Checksum:    chunkDigester.Digest().String(),
-				ChunkSize:   rcReader.WrittenOut - lastChunkOffset,
-			})
+			chunkSize := rcReader.WrittenOut - lastChunkOffset
+			if chunkSize > 0 {
+				chunkType := internal.ChunkTypeData
+				if rcReader.IsLastChunkZeros {
+					chunkType = internal.ChunkTypeZeros
+				}
+
+				chunks = append(chunks, chunk{
+					ChunkOffset: lastChunkOffset,
+					Offset:      lastOffset,
+					Checksum:    chunkDigester.Digest().String(),
+					ChunkSize:   chunkSize,
+					ChunkType:   chunkType,
+				})
+			}
 
 			lastOffset = off
 			lastChunkOffset = rcReader.WrittenOut
@@ -210,6 +363,7 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r
 			entries[i].ChunkSize = chunks[i].ChunkSize
 			entries[i].Offset = chunks[i].Offset
 			entries[i].ChunkDigest = chunks[i].Checksum
+			entries[i].ChunkType = chunks[i].ChunkType
 		}
 	}
 	metadata = append(metadata, entries...)
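The holesFinder added above turns any run of at least `threshold` consecutive zero bytes into a single hole report instead of handing the zeros back one at a time. A minimal usage sketch follows; demoHolesFinder is not part of the patch, and it assumes it lives inside the compressor package since holesFinder is unexported.

package compressor

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// demoHolesFinder is a hypothetical helper showing how a caller consumes
// holesFinder: runs of zeros at or above the threshold come back as a single
// (holeLen, 0, nil) result, everything else is returned byte by byte.
func demoHolesFinder() error {
	data := []byte("\x00\x00\x00\x00\x00GO")
	hf := &holesFinder{
		threshold: 4,
		reader:    bufio.NewReader(bytes.NewReader(data)),
	}
	for {
		holeLen, b, err := hf.ReadByte()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if holeLen > 0 {
			fmt.Printf("hole of %d zeros\n", holeLen) // prints: hole of 5 zeros
		} else {
			fmt.Printf("data byte %q\n", b) // prints 'G', then 'O'
		}
	}
}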
diff --git a/pkg/chunked/compressor/compressor_test.go b/pkg/chunked/compressor/compressor_test.go
new file mode 100644
index 0000000000..f8f69fca9c
--- /dev/null
+++ b/pkg/chunked/compressor/compressor_test.go
@@ -0,0 +1,90 @@
+package compressor
+
+import (
+	"bufio"
+	"bytes"
+	"io"
+	"testing"
+)
+
+func TestHole(t *testing.T) {
+	data := []byte("\x00\x00\x00\x00\x00")
+
+	hf := &holesFinder{
+		threshold: 1,
+		reader:    bufio.NewReader(bytes.NewReader(data)),
+	}
+
+	hole, _, err := hf.ReadByte()
+	if err != nil {
+		t.Errorf("got error: %w", err)
+	}
+	if hole != 5 {
+		t.Error("expected hole not found")
+	}
+
+	if _, _, err := hf.ReadByte(); err != io.EOF {
+		t.Errorf("EOF not found")
+	}
+
+	hf = &holesFinder{
+		threshold: 1000,
+		reader:    bufio.NewReader(bytes.NewReader(data)),
+	}
+	for i := 0; i < 5; i++ {
+		hole, byte, err := hf.ReadByte()
+		if err != nil {
+			t.Errorf("got error: %w", err)
+		}
+		if hole != 0 {
+			t.Error("hole found")
+		}
+		if byte != 0 {
+			t.Error("wrong read")
+		}
+	}
+	if _, _, err := hf.ReadByte(); err != io.EOF {
+		t.Error("didn't receive EOF")
+	}
+}
+
+func TestTwoHoles(t *testing.T) {
+	data := []byte("\x00\x00\x00\x00\x00FOO\x00\x00\x00\x00\x00")
+
+	hf := &holesFinder{
+		threshold: 2,
+		reader:    bufio.NewReader(bytes.NewReader(data)),
+	}
+
+	hole, _, err := hf.ReadByte()
+	if err != nil {
+		t.Errorf("got error: %w", err)
+	}
+	if hole != 5 {
+		t.Error("hole not found")
+	}
+
+	for _, e := range []byte("FOO") {
+		hole, c, err := hf.ReadByte()
+		if err != nil {
+			t.Errorf("got error: %w", err)
+		}
+		if hole != 0 {
+			t.Error("hole found")
+		}
+		if c != e {
+			t.Errorf("wrong byte read %v instead of %v", c, e)
+		}
+	}
+	hole, _, err = hf.ReadByte()
+	if err != nil {
+		t.Errorf("got error: %w", err)
+	}
+	if hole != 5 {
+		t.Error("expected hole not found")
+	}
+
+	if _, _, err := hf.ReadByte(); err != io.EOF {
+		t.Error("didn't receive EOF")
+	}
+}
diff --git a/pkg/chunked/internal/compression.go b/pkg/chunked/internal/compression.go
index 01007dd966..1fce3e6f93 100644
--- a/pkg/chunked/internal/compression.go
+++ b/pkg/chunked/internal/compression.go
@@ -46,11 +46,17 @@ type FileMetadata struct {
 	ChunkSize   int64  `json:"chunkSize,omitempty"`
 	ChunkOffset int64  `json:"chunkOffset,omitempty"`
 	ChunkDigest string `json:"chunkDigest,omitempty"`
+	ChunkType   string `json:"chunkType,omitempty"`
 
 	// internal: computed by mergeTOCEntries.
 	Chunks []*FileMetadata `json:"-"`
 }
 
+const (
+	ChunkTypeData  = ""
+	ChunkTypeZeros = "zeros"
+)
+
 const (
 	TypeReg   = "reg"
 	TypeChunk = "chunk"
diff --git a/pkg/chunked/storage_linux.go b/pkg/chunked/storage_linux.go
index 2ce77ad2f8..43a97ad46f 100644
--- a/pkg/chunked/storage_linux.go
+++ b/pkg/chunked/storage_linux.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/base64"
 	"fmt"
+	"hash"
 	"io"
 	"io/ioutil"
 	"os"
@@ -42,9 +43,10 @@ const (
 	containersOverrideXattr = "user.containers.override_stat"
 	bigDataKey              = "zstd-chunked-manifest"
 
-	fileTypeZstdChunked = iota
-	fileTypeEstargz     = iota
-	fileTypeNoCompression = iota
+	fileTypeZstdChunked = iota
+	fileTypeEstargz
+	fileTypeNoCompression
+	fileTypeHole
 
 	copyGoRoutines = 32
 )
@@ -438,14 +440,14 @@ func maybeDoIDRemap(manifest []internal.FileMetadata, options *archive.TarOption
 }
 
 type originFile struct {
-	Root   string
-	Path   string
-
+	Root   string
+	Path   string
 	Offset int64
 }
 
 type missingFileChunk struct {
-	Gap int64
+	Gap  int64
+	Hole bool
 
 	File *internal.FileMetadata
 
@@ -454,6 +456,7 @@ type missingFileChunk struct {
 }
 
 type missingPart struct {
+	Hole        bool
 	SourceChunk *ImageSourceChunk
 	OriginFile  *originFile
 	Chunks      []missingFileChunk
@@ -722,9 +725,19 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.Fil
 	return nil, err
 }
 
-func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFileType, from io.Reader, mf *missingFileChunk) error {
-	switch compression {
-	case fileTypeZstdChunked:
+func (c *chunkedDiffer) prepareCompressedStreamToFile(partCompression compressedFileType, from io.Reader, mf *missingFileChunk) (compressedFileType, error) {
+	switch {
+	case partCompression == fileTypeHole:
+		// The entire part is a hole. Do not need to read from a file.
+		c.rawReader = nil
+		return fileTypeHole, nil
+	case mf.Hole:
+		// Only the missing chunk in the requested part refers to a hole.
+		// The received data must be discarded.
+		limitReader := io.LimitReader(from, mf.CompressedSize)
+		_, err := io.CopyBuffer(ioutil.Discard, limitReader, c.copyBuffer)
+		return fileTypeHole, err
+	case partCompression == fileTypeZstdChunked:
 		c.rawReader = io.LimitReader(from, mf.CompressedSize)
 		if c.zstdReader == nil {
 			r := zstd.NewReader(c.rawReader)
@@ -732,42 +745,83 @@ func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFile
 		} else {
 			c.zstdReader.Reset(c.rawReader, nil)
 		}
-	case fileTypeEstargz:
+	case partCompression == fileTypeEstargz:
 		c.rawReader = io.LimitReader(from, mf.CompressedSize)
 		if c.gzipReader == nil {
 			r, err := pgzip.NewReader(c.rawReader)
 			if err != nil {
-				return err
+				return partCompression, err
 			}
 			c.gzipReader = r
 		} else {
 			if err := c.gzipReader.Reset(c.rawReader); err != nil {
-				return err
+				return partCompression, err
 			}
 		}
-
-	case fileTypeNoCompression:
+	case partCompression == fileTypeNoCompression:
 		c.rawReader = io.LimitReader(from, mf.UncompressedSize)
 	default:
-		return fmt.Errorf("unknown file type %q", c.fileType)
+		return partCompression, fmt.Errorf("unknown file type %q", c.fileType)
+	}
+	return partCompression, nil
+}
+
+// hashHole writes SIZE zeros to the specified hasher
+func hashHole(h hash.Hash, size int64, copyBuffer []byte) error {
+	count := int64(len(copyBuffer))
+	if size < count {
+		count = size
+	}
+	for i := int64(0); i < count; i++ {
+		copyBuffer[i] = 0
+	}
+	for size > 0 {
+		count = int64(len(copyBuffer))
+		if size < count {
+			count = size
+		}
+		if _, err := h.Write(copyBuffer[:count]); err != nil {
+			return err
+		}
+		size -= count
+	}
+	return nil
+}
+
+// appendHole creates a hole with the specified size at the open fd.
+func appendHole(fd int, size int64) error {
+	off, err := unix.Seek(fd, size, unix.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+	// Make sure the file size is changed. It might be the last hole and no other data written afterwards.
+	if err := unix.Ftruncate(fd, off); err != nil {
+		return err
 	}
 	return nil
 }
 
-func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, to io.Writer, size int64) error {
+func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, destFile *destinationFile, size int64) error {
 	switch compression {
 	case fileTypeZstdChunked:
 		defer c.zstdReader.Reset(nil, nil)
-		if _, err := io.CopyBuffer(to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
+		if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil {
 			return err
 		}
 	case fileTypeEstargz:
 		defer c.gzipReader.Close()
-		if _, err := io.CopyBuffer(to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
+		if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil {
 			return err
 		}
 	case fileTypeNoCompression:
-		if _, err := io.CopyBuffer(to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil {
+		if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil {
+			return err
+		}
+	case fileTypeHole:
+		if err := appendHole(int(destFile.file.Fd()), size); err != nil {
+			return err
+		}
+		if err := hashHole(destFile.hash, size, c.copyBuffer); err != nil {
 			return err
 		}
 	default:
@@ -780,6 +834,7 @@ type destinationFile struct {
 	dirfd    int
 	file     *os.File
 	digester digest.Digester
+	hash     hash.Hash
 	to       io.Writer
 	metadata *internal.FileMetadata
 	options  *archive.TarOptions
@@ -792,11 +847,13 @@ func openDestinationFile(dirfd int, metadata *internal.FileMetadata, options *ar
 	}
 
 	digester := digest.Canonical.Digester()
-	to := io.MultiWriter(file, digester.Hash())
+	hash := digester.Hash()
+	to := io.MultiWriter(file, hash)
 
 	return &destinationFile{
 		file:     file,
 		digester: digester,
+		hash:     hash,
 		to:       to,
 		metadata: metadata,
 		options:  options,
@@ -841,15 +898,17 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 
 	for _, missingPart := range missingParts {
 		var part io.ReadCloser
-		compression := c.fileType
+		partCompression := c.fileType
 		switch {
+		case missingPart.Hole:
+			partCompression = fileTypeHole
 		case missingPart.OriginFile != nil:
 			var err error
 			part, err = missingPart.OriginFile.OpenFile()
 			if err != nil {
 				return err
 			}
-			compression = fileTypeNoCompression
+			partCompression = fileTypeNoCompression
 		case missingPart.SourceChunk != nil:
 			select {
 			case p := <-streams:
@@ -880,7 +939,8 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 				goto exit
 			}
 
-			if err := c.prepareCompressedStreamToFile(compression, part, &mf); err != nil {
+			compression, err := c.prepareCompressedStreamToFile(partCompression, part, &mf)
+			if err != nil {
 				Err = err
 				goto exit
 			}
@@ -911,19 +971,23 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan
 				}
 			}
 
-			if err := c.appendCompressedStreamToFile(compression, destFile.to, mf.UncompressedSize); err != nil {
+			if err := c.appendCompressedStreamToFile(compression, destFile, mf.UncompressedSize); err != nil {
 				Err = err
 				goto exit
 			}
-			if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil {
-				Err = err
-				goto exit
+			if c.rawReader != nil {
+				if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil {
+					Err = err
+					goto exit
+				}
 			}
 		}
 	exit:
-		part.Close()
-		if Err != nil {
-			break
+		if part != nil {
+			part.Close()
+			if Err != nil {
+				break
+			}
 		}
 	}
 
@@ -957,6 +1021,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 		gap := getGap(missingParts, i)
 		if gap == 0 && missingParts[prevIndex].OriginFile == nil &&
 			missingParts[i].OriginFile == nil &&
+			!missingParts[prevIndex].Hole && !missingParts[i].Hole &&
 			len(missingParts[prevIndex].Chunks) == 1 && len(missingParts[i].Chunks) == 1 &&
 			missingParts[prevIndex].Chunks[0].File.Name == missingParts[i].Chunks[0].File.Name {
 			missingParts[prevIndex].SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
@@ -983,6 +1048,9 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 	sort.Ints(costs)
 
 	toShrink := len(missingParts) - target
+	if toShrink >= len(costs) {
+		toShrink = len(costs) - 1
+	}
 	targetValue := costs[toShrink]
 
 	newMissingParts = missingParts[0:1]
@@ -993,6 +1061,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart {
 			gap := getGap(missingParts, i)
 			prev := &newMissingParts[len(newMissingParts)-1]
 			prev.SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length
+			prev.Hole = false
 			prev.OriginFile = nil
 			if gap > 0 {
 				gapFile := missingFileChunk{
@@ -1009,7 +1078,7 @@ func (c *chunkedDiffer) retrieveMissingFiles(dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error {
 	var chunksToRequest []ImageSourceChunk
 	for _, c := range missingParts {
-		if c.OriginFile == nil {
+		if c.OriginFile == nil && !c.Hole {
 			chunksToRequest = append(chunksToRequest, *c.SourceChunk)
 		}
 	}
@@ -1542,16 +1611,26 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (gra
 		},
 	}
 
-	root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk)
-	if err != nil {
-		return output, err
-	}
-	if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) {
+	switch chunk.ChunkType {
+	case internal.ChunkTypeData:
+		root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk)
+		if err != nil {
+			return output, err
+		}
+		if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) {
+			missingPartsSize -= size
+			mp.OriginFile = &originFile{
+				Root:   root,
+				Path:   path,
+				Offset: offset,
+			}
+		}
+	case internal.ChunkTypeZeros:
 		missingPartsSize -= size
-		mp.OriginFile = &originFile{
-			Root:   root,
-			Path:   path,
-			Offset: offset,
+		mp.Hole = true
+		// Mark all chunks belonging to the missing part as holes
+		for i := range mp.Chunks {
+			mp.Chunks[i].Hole = true
 		}
 	}
 	missingParts = append(missingParts, mp)
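For context on the appendHole helper introduced in storage_linux.go: it materializes a zero chunk by seeking past the current offset and then calling Ftruncate, so the skipped region stays unallocated on filesystems that support sparse files. Below is a minimal standalone sketch of the same technique; it is not part of the patch, and the path and sizes are made up.

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	// Hypothetical destination path for the demo.
	f, err := os.Create("/tmp/sparse-demo")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	fd := int(f.Fd())
	if _, err := f.Write([]byte("data before the hole")); err != nil {
		panic(err)
	}

	// Skip 1 MiB without writing anything: this region becomes a hole.
	off, err := unix.Seek(fd, 1<<20, unix.SEEK_CUR)
	if err != nil {
		panic(err)
	}
	// Extend the file size so a trailing hole is still reflected in the
	// apparent size, even if no data is written after it.
	if err := unix.Ftruncate(fd, off); err != nil {
		panic(err)
	}
	fmt.Println("apparent size:", off)
}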