From 70c783b293dee61f315634a7c3ce812dee9619d8 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 13 Jan 2022 12:15:03 +0100 Subject: [PATCH 1/5] chunked: remove garbage symlink Signed-off-by: Giuseppe Scrivano --- pkg/chunked/chunked | 1 - 1 file changed, 1 deletion(-) delete mode 120000 pkg/chunked/chunked diff --git a/pkg/chunked/chunked b/pkg/chunked/chunked deleted file mode 120000 index ce52bb9935..0000000000 --- a/pkg/chunked/chunked +++ /dev/null @@ -1 +0,0 @@ -/home/gscrivano/src/gopath/src/github.com/containers/storage/pkg/chunked \ No newline at end of file From 96c0403bb1aaf342d73406250499e9352f2e8b25 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 13 Jan 2022 10:40:33 +0100 Subject: [PATCH 2/5] cache: store correctly the digestLen field commit 10697a05a27fa82db0552d83a4bee5a3703f974f introduced the issue. Signed-off-by: Giuseppe Scrivano --- pkg/chunked/cache_linux.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/chunked/cache_linux.go b/pkg/chunked/cache_linux.go index 69359b5529..95f1ebe261 100644 --- a/pkg/chunked/cache_linux.go +++ b/pkg/chunked/cache_linux.go @@ -328,9 +328,10 @@ func (c *layersCache) writeCache(id string) (*metadata, error) { logrus.Debugf("Written lookaside cache for layer %q with length %v", id, counter.Count) return &metadata{ - tagLen: tagLen, - tags: tagsBuffer.Bytes(), - vdata: vdata.Bytes(), + digestLen: digestLen, + tagLen: tagLen, + tags: tagsBuffer.Bytes(), + vdata: vdata.Bytes(), }, nil } From ab25eafc1706dbc45c797cc48d65953aee0b4e83 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 13 Jan 2022 10:54:46 +0100 Subject: [PATCH 3/5] cache: parse the correct field for offset commit 10697a05a27fa82db0552d83a4bee5a3703f974f introduced the issue. Signed-off-by: Giuseppe Scrivano --- pkg/chunked/cache_linux.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunked/cache_linux.go b/pkg/chunked/cache_linux.go index 95f1ebe261..1160fdb1a9 100644 --- a/pkg/chunked/cache_linux.go +++ b/pkg/chunked/cache_linux.go @@ -474,7 +474,7 @@ func (c *layersCache) findDigestInternal(digest string) (string, string, int64, if digest != "" { position := string(layer.metadata.vdata[off : off+len]) parts := strings.SplitN(position, "@", 2) - offFile, _ := strconv.ParseInt(parts[1], 10, 64) + offFile, _ := strconv.ParseInt(parts[0], 10, 64) return layer.target, parts[1], offFile, nil } } From fd89b93ef34c1f534faab419177719a79313ba93 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 13 Jan 2022 12:09:53 +0100 Subject: [PATCH 4/5] chunked: add tests for the cache Signed-off-by: Giuseppe Scrivano --- pkg/chunked/cache_linux.go | 60 ++++++----- pkg/chunked/cache_linux_test.go | 171 ++++++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+), 27 deletions(-) create mode 100644 pkg/chunked/cache_linux_test.go diff --git a/pkg/chunked/cache_linux.go b/pkg/chunked/cache_linux.go index 1160fdb1a9..53cc38b52f 100644 --- a/pkg/chunked/cache_linux.go +++ b/pkg/chunked/cache_linux.go @@ -107,7 +107,16 @@ func (c *layersCache) load() error { continue } - metadata, err := c.readMetadataFromCache(r.ID) + bigData, err := c.store.LayerBigData(r.ID, cacheKey) + if err != nil { + if errors.Cause(err) == os.ErrNotExist { + continue + } + return err + } + defer bigData.Close() + + metadata, err := readMetadataFromCache(bigData) if err != nil { logrus.Warningf("Error reading cache file for layer %q: %v", r.ID, err) } @@ -117,7 +126,17 @@ func (c *layersCache) load() error { continue } - 
metadata, err = c.writeCache(r.ID) + manifestReader, err := c.store.LayerBigData(r.ID, bigDataKey) + if err != nil { + continue + } + defer manifestReader.Close() + manifest, err := ioutil.ReadAll(manifestReader) + if err != nil { + return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err) + } + + metadata, err = writeCache(manifest, r.ID, c.store) if err == nil { c.addLayer(r.ID, metadata) } @@ -182,6 +201,11 @@ func generateTag(digest string, offset, len uint64) string { return fmt.Sprintf("%s%.20d@%.20d", digest, offset, len) } +type setBigData interface { + // SetLayerBigData stores a (possibly large) chunk of named data + SetLayerBigData(id, key string, data io.Reader) error +} + // writeCache write a cache for the layer ID. // It generates a sorted list of digests with their offset to the path location and offset. // The same cache is used to lookup files, chunks and candidates for deduplication with hard links. @@ -189,13 +213,13 @@ func generateTag(digest string, offset, len uint64) string { // - digest(file.payload)) // - digest(digest(file.payload) + file.UID + file.GID + file.mode + file.xattrs) // - digest(i) for each i in chunks(file payload) -func (c *layersCache) writeCache(id string) (*metadata, error) { +func writeCache(manifest []byte, id string, dest setBigData) (*metadata, error) { var vdata bytes.Buffer tagLen := 0 digestLen := 0 var tagsBuffer bytes.Buffer - toc, err := c.prepareMetadata(id) + toc, err := prepareMetadata(manifest) if err != nil { return nil, err } @@ -317,7 +341,7 @@ func (c *layersCache) writeCache(id string) (*metadata, error) { r := io.TeeReader(pipeReader, counter) - if err := c.store.SetLayerBigData(id, cacheKey, r); err != nil { + if err := dest.SetLayerBigData(id, cacheKey, r); err != nil { return nil, err } @@ -335,16 +359,7 @@ func (c *layersCache) writeCache(id string) (*metadata, error) { }, nil } -func (c *layersCache) readMetadataFromCache(id string) (*metadata, error) { - bigData, err := c.store.LayerBigData(id, cacheKey) - if err != nil { - if errors.Cause(err) == os.ErrNotExist { - return nil, nil - } - return nil, err - } - defer bigData.Close() - +func readMetadataFromCache(bigData io.Reader) (*metadata, error) { var version, tagLen, digestLen, tagsLen, vdataLen uint64 if err := binary.Read(bigData, binary.LittleEndian, &version); err != nil { return nil, err @@ -371,7 +386,7 @@ func (c *layersCache) readMetadataFromCache(id string) (*metadata, error) { } vdata := make([]byte, vdataLen) - if _, err = bigData.Read(vdata); err != nil { + if _, err := bigData.Read(vdata); err != nil { return nil, err } @@ -383,17 +398,7 @@ func (c *layersCache) readMetadataFromCache(id string) (*metadata, error) { }, nil } -func (c *layersCache) prepareMetadata(id string) ([]*internal.FileMetadata, error) { - manifestReader, err := c.store.LayerBigData(id, bigDataKey) - if err != nil { - return nil, nil - } - defer manifestReader.Close() - manifest, err := ioutil.ReadAll(manifestReader) - if err != nil { - return nil, fmt.Errorf("open manifest file for layer %q: %w", id, err) - } - +func prepareMetadata(manifest []byte) ([]*internal.FileMetadata, error) { toc, err := unmarshalToc(manifest) if err != nil { // ignore errors here. They might be caused by a different manifest format. 
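
For reference, each cache entry is a fixed-width tag pointing into the vdata blob, and the vdata location strings are parsed as "offset@path" by findDigestInternal (see patch 3). The following is a minimal standalone sketch of that encoding, assuming the "offset@path" layout inferred from the parsing code; it is illustrative and not part of the patch itself:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// makeTag mirrors generateTag: a digest followed by two zero-padded,
// fixed-width decimal fields, "offset@length", pointing into vdata.
// Fixed-width tags keep every entry the same length, which is what makes
// the sorted tag block searchable.
func makeTag(digest string, offset, length uint64) string {
	return fmt.Sprintf("%s%.20d@%.20d", digest, offset, length)
}

// parseLocation parses a vdata location string of the assumed form
// "fileOffset@path", as done in findDigestInternal.
func parseLocation(location string) (offset int64, path string, err error) {
	parts := strings.SplitN(location, "@", 2)
	if len(parts) != 2 {
		return 0, "", fmt.Errorf("invalid location %q", location)
	}
	offset, err = strconv.ParseInt(parts[0], 10, 64)
	return offset, parts[1], err
}

func main() {
	fmt.Println(makeTag("sha256:99fe908c", 42, 17))
	fmt.Println(parseLocation("94149@usr/bin/foo"))
}
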
@@ -406,6 +411,7 @@ func (c *layersCache) prepareMetadata(id string) ([]*internal.FileMetadata, erro d := toc.Entries[i].Digest if d != "" { r = append(r, &toc.Entries[i]) + continue } // chunks do not use hard link dedup so keeping just one candidate is enough diff --git a/pkg/chunked/cache_linux_test.go b/pkg/chunked/cache_linux_test.go new file mode 100644 index 0000000000..e562837d26 --- /dev/null +++ b/pkg/chunked/cache_linux_test.go @@ -0,0 +1,171 @@ +package chunked + +import ( + "bytes" + "fmt" + "io" + "reflect" + "testing" +) + +const jsonTOC = ` +{ + "version": 1, + "entries": [ + { + "type": "symlink", + "name": "bin", + "linkName": "usr/bin", + "mode": 511, + "modtime": "1970-01-01T01:00:00+01:00", + "accesstime": "0001-01-01T00:00:00Z", + "changetime": "0001-01-01T00:00:00Z" + }, + { + "type": "dir", + "name": "usr/bin", + "mode": 511, + "modtime": "2022-01-07T12:36:43+01:00", + "accesstime": "0001-01-01T00:00:00Z", + "changetime": "0001-01-01T00:00:00Z" + }, + { + "type": "reg", + "name": "usr/bin/foo", + "mode": 511, + "size": 103867, + "modtime": "1970-01-01T01:00:00+01:00", + "accesstime": "0001-01-01T00:00:00Z", + "changetime": "0001-01-01T00:00:00Z", + "digest": "sha256:99fe908c699dc068438b23e28319cadff1f2153c3043bafb8e83a430bba0a2c6", + "offset": 94149, + "endOffset": 120135, + "chunkSize": 17615, + "chunkDigest": "sha256:2ce0d0f8eb2aa93d13007097763e4459c814c8d0e859e5a57465af924169b544" + }, + { + "type": "chunk", + "name": "usr/bin/foo", + "offset": 99939, + "chunkSize": 86252, + "chunkOffset": 17615, + "chunkDigest": "sha256:2a9d3f1b6b37abc8bb35eb8fa98b893a2a2447bcb01184c3bafc8c6b40da099d" + } +} +` + +func TestPrepareMetadata(t *testing.T) { + toc, err := prepareMetadata([]byte(jsonTOC)) + if err != nil { + t.Errorf("got error from prepareMetadata: %w", err) + } + if len(toc) != 2 { + t.Error("prepareMetadata returns the wrong length") + } +} + +type bigDataToBuffer struct { + buf *bytes.Buffer + id string + key string + called bool +} + +func (b *bigDataToBuffer) SetLayerBigData(id, key string, data io.Reader) error { + b.id = id + b.key = key + if b.called { + return fmt.Errorf("SetLayerBigData already called once") + } + b.called = true + _, err := io.Copy(b.buf, data) + return err +} + +func TestWriteCache(t *testing.T) { + toc, err := prepareMetadata([]byte(jsonTOC)) + if err != nil { + t.Errorf("got error from prepareMetadata: %w", err) + } + + dest := bigDataToBuffer{ + buf: bytes.NewBuffer(nil), + } + cache, err := writeCache([]byte(jsonTOC), "foobar", &dest) + if err != nil { + t.Errorf("got error from writeCache: %w", err) + } + if digest, _, _ := findTag("foobar", cache); digest != "" { + t.Error("found invalid tag") + } + + for _, r := range toc { + if r.Digest != "" { + // find the element in the cache by the digest checksum + digest, off, len := findTag(r.Digest, cache) + if digest == "" { + t.Error("file tag not found") + } + if digest != r.Digest { + t.Error("wrong file found") + } + expectedLocation := generateFileLocation(r.Name, 0) + location := cache.vdata[off : off+len] + if !bytes.Equal(location, expectedLocation) { + t.Errorf("wrong file found %q instead of %q", location, expectedLocation) + } + + fingerprint, err := calculateHardLinkFingerprint(r) + if err != nil { + t.Errorf("got error from writeCache: %w", err) + } + + // find the element in the cache by the hardlink fingerprint + digest, off, len = findTag(fingerprint, cache) + if digest == "" { + t.Error("file tag not found") + } + if digest != fingerprint { + t.Error("wrong file found") 
+ } + expectedLocation = generateFileLocation(r.Name, 0) + location = cache.vdata[off : off+len] + if !bytes.Equal(location, expectedLocation) { + t.Errorf("wrong file found %q instead of %q", location, expectedLocation) + } + } + if r.ChunkDigest != "" { + // find the element in the cache by the chunk digest checksum + digest, off, len := findTag(r.ChunkDigest, cache) + if digest == "" { + t.Error("chunk tag not found") + } + if digest != r.ChunkDigest { + t.Error("wrong digest found") + } + expectedLocation := generateFileLocation(r.Name, uint64(r.ChunkOffset)) + location := cache.vdata[off : off+len] + if !bytes.Equal(location, expectedLocation) { + t.Errorf("wrong file found %q instead of %q", location, expectedLocation) + } + } + } +} + +func TestReadCache(t *testing.T) { + dest := bigDataToBuffer{ + buf: bytes.NewBuffer(nil), + } + cache, err := writeCache([]byte(jsonTOC), "foobar", &dest) + if err != nil { + t.Errorf("got error from writeCache: %w", err) + } + + cacheRead, err := readMetadataFromCache(dest.buf) + if err != nil { + t.Errorf("got error from readMetadataFromCache: %w", err) + } + if !reflect.DeepEqual(cache, cacheRead) { + t.Errorf("read a different struct than what was written") + } +} From 198820877c3476af5bf6654af1222b5e87267512 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Tue, 11 Jan 2022 18:32:43 +0100 Subject: [PATCH 5/5] pkg/chunked: add support for sparse files automatically detect holes in sparse files (the threshold is hardcoded at 1kb for now) and add this information to the manifest file. The receiver will create a hole (using unix.Seek and unix.Ftruncate) instead of writing the actual zeros. Closes: https://github.com/containers/storage/issues/1091 Signed-off-by: Giuseppe Scrivano --- pkg/chunked/cache_linux.go | 4 +- pkg/chunked/compressor/compressor.go | 176 ++++++++++++++++++++-- pkg/chunked/compressor/compressor_test.go | 90 +++++++++++ pkg/chunked/internal/compression.go | 6 + pkg/chunked/storage_linux.go | 161 +++++++++++++++----- 5 files changed, 384 insertions(+), 53 deletions(-) create mode 100644 pkg/chunked/compressor/compressor_test.go diff --git a/pkg/chunked/cache_linux.go b/pkg/chunked/cache_linux.go index 53cc38b52f..a931fb5d13 100644 --- a/pkg/chunked/cache_linux.go +++ b/pkg/chunked/cache_linux.go @@ -524,7 +524,7 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) { for iter.ReadArray() { for field := iter.ReadObject(); field != ""; field = iter.ReadObject() { switch field { - case "type", "name", "linkName", "digest", "chunkDigest": + case "type", "name", "linkName", "digest", "chunkDigest", "chunkType": count += len(iter.ReadStringAsSlice()) case "xattrs": for key := iter.ReadObject(); key != ""; key = iter.ReadObject() { @@ -609,6 +609,8 @@ func unmarshalToc(manifest []byte) (*internal.TOC, error) { m.ChunkOffset = iter.ReadInt64() case "chunkDigest": m.ChunkDigest = getString(iter.ReadStringAsSlice()) + case "chunkType": + m.ChunkType = getString(iter.ReadStringAsSlice()) case "xattrs": m.Xattrs = make(map[string]string) for key := iter.ReadObject(); key != ""; key = iter.ReadObject() { diff --git a/pkg/chunked/compressor/compressor.go b/pkg/chunked/compressor/compressor.go index a2035a8b69..7893cf9aab 100644 --- a/pkg/chunked/compressor/compressor.go +++ b/pkg/chunked/compressor/compressor.go @@ -17,21 +17,152 @@ import ( ) const RollsumBits = 16 +const holesThreshold = int64(1 << 10) + +type holesFinder struct { + reader *bufio.Reader + fileOff int64 + zeros int64 + from int64 + threshold int64 + + state int +} + 
+const (
+	holesFinderStateRead = iota
+	holesFinderStateAccumulate
+	holesFinderStateFound
+	holesFinderStateEOF
+)
+
+// ReadByte reads a single byte from the underlying reader.
+// If a single byte is read, the return value is (0, RAW-BYTE-VALUE, nil).
+// If there are at least f.threshold consecutive zeros, then the
+// return value is (N_CONSECUTIVE_ZEROS, '\x00', nil).
+func (f *holesFinder) ReadByte() (int64, byte, error) {
+	for {
+		switch f.state {
+		// reading the file stream
+		case holesFinderStateRead:
+			if f.zeros > 0 {
+				f.zeros--
+				return 0, 0, nil
+			}
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				return 0, b, err
+			}
+
+			if b != 0 {
+				return 0, b, err
+			}
+
+			f.zeros = 1
+			if f.zeros == f.threshold {
+				f.state = holesFinderStateFound
+			} else {
+				f.state = holesFinderStateAccumulate
+			}
+		// accumulating zeros, but still didn't reach the threshold
+		case holesFinderStateAccumulate:
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				if err == io.EOF {
+					f.state = holesFinderStateEOF
+					continue
+				}
+				return 0, b, err
+			}
+
+			if b == 0 {
+				f.zeros++
+				if f.zeros == f.threshold {
+					f.state = holesFinderStateFound
+				}
+			} else {
+				if err := f.reader.UnreadByte(); err != nil {
+					return 0, 0, err
+				}
+				f.state = holesFinderStateRead
+			}
+		// found a hole. Number of zeros >= threshold
+		case holesFinderStateFound:
+			b, err := f.reader.ReadByte()
+			if err != nil {
+				if err == io.EOF {
+					f.state = holesFinderStateEOF
+				}
+				holeLen := f.zeros
+				f.zeros = 0
+				return holeLen, 0, nil
+			}
+			if b != 0 {
+				if err := f.reader.UnreadByte(); err != nil {
+					return 0, 0, err
+				}
+				f.state = holesFinderStateRead
+
+				holeLen := f.zeros
+				f.zeros = 0
+				return holeLen, 0, nil
+			}
+			f.zeros++
+		// reached EOF. Flush pending zeros if any.
+		case holesFinderStateEOF:
+			if f.zeros > 0 {
+				f.zeros--
+				return 0, 0, nil
+			}
+			return 0, 0, io.EOF
+		}
+	}
+}
+
 type rollingChecksumReader struct {
-	reader  *bufio.Reader
-	closed  bool
-	rollsum *RollSum
+	reader      *holesFinder
+	closed      bool
+	rollsum     *RollSum
+	pendingHole int64
+
+	// WrittenOut is the total number of bytes read from
+	// the stream.
 	WrittenOut int64
+
+	// IsLastChunkZeros tells whether the last generated
+	// chunk is a hole (made of consecutive zeros). If it
+	// is false, then the last chunk is a data chunk
+	// generated by the rolling checksum.
+ IsLastChunkZeros bool } func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) { + rc.IsLastChunkZeros = false + + if rc.pendingHole > 0 { + toCopy := int64(len(b)) + if rc.pendingHole < toCopy { + toCopy = rc.pendingHole + } + rc.pendingHole -= toCopy + for i := int64(0); i < toCopy; i++ { + b[i] = 0 + } + + rc.WrittenOut += toCopy + + rc.IsLastChunkZeros = true + + // if there are no other zeros left, terminate the chunk + return rc.pendingHole == 0, int(toCopy), nil + } + if rc.closed { return false, 0, io.EOF } + for i := 0; i < len(b); i++ { - n, err := rc.reader.ReadByte() + holeLen, n, err := rc.reader.ReadByte() if err != nil { if err == io.EOF { rc.closed = true @@ -43,6 +174,13 @@ func (rc *rollingChecksumReader) Read(b []byte) (bool, int, error) { // Report any other error type return false, -1, err } + if holeLen > 0 { + for j := int64(0); j < holeLen; j++ { + rc.rollsum.Roll(0) + } + rc.pendingHole = holeLen + return true, i, nil + } b[i] = n rc.WrittenOut++ rc.rollsum.Roll(n) @@ -58,6 +196,7 @@ type chunk struct { Offset int64 Checksum string ChunkSize int64 + ChunkType string } func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error { @@ -119,8 +258,13 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r chunks := []chunk{} + hf := &holesFinder{ + threshold: holesThreshold, + reader: bufio.NewReader(tr), + } + rcReader := &rollingChecksumReader{ - reader: bufio.NewReader(tr), + reader: hf, rollsum: NewRollSum(), } @@ -150,12 +294,21 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r return err } - chunks = append(chunks, chunk{ - ChunkOffset: lastChunkOffset, - Offset: lastOffset, - Checksum: chunkDigester.Digest().String(), - ChunkSize: rcReader.WrittenOut - lastChunkOffset, - }) + chunkSize := rcReader.WrittenOut - lastChunkOffset + if chunkSize > 0 { + chunkType := internal.ChunkTypeData + if rcReader.IsLastChunkZeros { + chunkType = internal.ChunkTypeZeros + } + + chunks = append(chunks, chunk{ + ChunkOffset: lastChunkOffset, + Offset: lastOffset, + Checksum: chunkDigester.Digest().String(), + ChunkSize: chunkSize, + ChunkType: chunkType, + }) + } lastOffset = off lastChunkOffset = rcReader.WrittenOut @@ -210,6 +363,7 @@ func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, r entries[i].ChunkSize = chunks[i].ChunkSize entries[i].Offset = chunks[i].Offset entries[i].ChunkDigest = chunks[i].Checksum + entries[i].ChunkType = chunks[i].ChunkType } } metadata = append(metadata, entries...) 
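
Before the tests below, a conceptual summary may help: together, holesFinder and rollingChecksumReader split a file's payload into data chunks (cut by the rolling checksum) and zero chunks (runs of at least holesThreshold zero bytes), the latter marked with ChunkTypeZeros in the TOC. The following simplified, standalone sketch models only the zero-run splitting and ignores the rolling checksum; it is illustrative, not code from the patch:

package main

import "fmt"

type simpleChunk struct {
	Offset int64  // offset within the uncompressed payload
	Size   int64
	Type   string // "" means data (ChunkTypeData), "zeros" means a hole (ChunkTypeZeros)
}

// splitZeroRuns turns runs of at least `threshold` zero bytes into their own
// "zeros" chunks and leaves everything else as data chunks. The real code
// additionally cuts data chunks with the rolling checksum, omitted here.
func splitZeroRuns(payload []byte, threshold int) []simpleChunk {
	var chunks []simpleChunk
	emit := func(off, size int, typ string) {
		if size > 0 {
			chunks = append(chunks, simpleChunk{Offset: int64(off), Size: int64(size), Type: typ})
		}
	}
	start, i := 0, 0
	for i < len(payload) {
		if payload[i] != 0 {
			i++
			continue
		}
		// measure the zero run starting at i
		j := i
		for j < len(payload) && payload[j] == 0 {
			j++
		}
		if j-i >= threshold {
			emit(start, i-start, "") // data before the hole
			emit(i, j-i, "zeros")    // the hole itself
			start = j
		}
		i = j
	}
	emit(start, len(payload)-start, "")
	return chunks
}

func main() {
	payload := append(append([]byte("FOO"), make([]byte, 2048)...), []byte("BAR")...)
	for _, c := range splitZeroRuns(payload, 1024) {
		fmt.Printf("offset=%d size=%d type=%q\n", c.Offset, c.Size, c.Type)
	}
}
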
diff --git a/pkg/chunked/compressor/compressor_test.go b/pkg/chunked/compressor/compressor_test.go new file mode 100644 index 0000000000..f8f69fca9c --- /dev/null +++ b/pkg/chunked/compressor/compressor_test.go @@ -0,0 +1,90 @@ +package compressor + +import ( + "bufio" + "bytes" + "io" + "testing" +) + +func TestHole(t *testing.T) { + data := []byte("\x00\x00\x00\x00\x00") + + hf := &holesFinder{ + threshold: 1, + reader: bufio.NewReader(bytes.NewReader(data)), + } + + hole, _, err := hf.ReadByte() + if err != nil { + t.Errorf("got error: %w", err) + } + if hole != 5 { + t.Error("expected hole not found") + } + + if _, _, err := hf.ReadByte(); err != io.EOF { + t.Errorf("EOF not found") + } + + hf = &holesFinder{ + threshold: 1000, + reader: bufio.NewReader(bytes.NewReader(data)), + } + for i := 0; i < 5; i++ { + hole, byte, err := hf.ReadByte() + if err != nil { + t.Errorf("got error: %w", err) + } + if hole != 0 { + t.Error("hole found") + } + if byte != 0 { + t.Error("wrong read") + } + } + if _, _, err := hf.ReadByte(); err != io.EOF { + t.Error("didn't receive EOF") + } +} + +func TestTwoHoles(t *testing.T) { + data := []byte("\x00\x00\x00\x00\x00FOO\x00\x00\x00\x00\x00") + + hf := &holesFinder{ + threshold: 2, + reader: bufio.NewReader(bytes.NewReader(data)), + } + + hole, _, err := hf.ReadByte() + if err != nil { + t.Errorf("got error: %w", err) + } + if hole != 5 { + t.Error("hole not found") + } + + for _, e := range []byte("FOO") { + hole, c, err := hf.ReadByte() + if err != nil { + t.Errorf("got error: %w", err) + } + if hole != 0 { + t.Error("hole found") + } + if c != e { + t.Errorf("wrong byte read %v instead of %v", c, e) + } + } + hole, _, err = hf.ReadByte() + if err != nil { + t.Errorf("got error: %w", err) + } + if hole != 5 { + t.Error("expected hole not found") + } + + if _, _, err := hf.ReadByte(); err != io.EOF { + t.Error("didn't receive EOF") + } +} diff --git a/pkg/chunked/internal/compression.go b/pkg/chunked/internal/compression.go index 01007dd966..1fce3e6f93 100644 --- a/pkg/chunked/internal/compression.go +++ b/pkg/chunked/internal/compression.go @@ -46,11 +46,17 @@ type FileMetadata struct { ChunkSize int64 `json:"chunkSize,omitempty"` ChunkOffset int64 `json:"chunkOffset,omitempty"` ChunkDigest string `json:"chunkDigest,omitempty"` + ChunkType string `json:"chunkType,omitempty"` // internal: computed by mergeTOCEntries. 
Chunks []*FileMetadata `json:"-"` } +const ( + ChunkTypeData = "" + ChunkTypeZeros = "zeros" +) + const ( TypeReg = "reg" TypeChunk = "chunk" diff --git a/pkg/chunked/storage_linux.go b/pkg/chunked/storage_linux.go index 2ce77ad2f8..43a97ad46f 100644 --- a/pkg/chunked/storage_linux.go +++ b/pkg/chunked/storage_linux.go @@ -5,6 +5,7 @@ import ( "context" "encoding/base64" "fmt" + "hash" "io" "io/ioutil" "os" @@ -42,9 +43,10 @@ const ( containersOverrideXattr = "user.containers.override_stat" bigDataKey = "zstd-chunked-manifest" - fileTypeZstdChunked = iota - fileTypeEstargz = iota - fileTypeNoCompression = iota + fileTypeZstdChunked = iota + fileTypeEstargz + fileTypeNoCompression + fileTypeHole copyGoRoutines = 32 ) @@ -438,14 +440,14 @@ func maybeDoIDRemap(manifest []internal.FileMetadata, options *archive.TarOption } type originFile struct { - Root string - Path string - + Root string + Path string Offset int64 } type missingFileChunk struct { - Gap int64 + Gap int64 + Hole bool File *internal.FileMetadata @@ -454,6 +456,7 @@ type missingFileChunk struct { } type missingPart struct { + Hole bool SourceChunk *ImageSourceChunk OriginFile *originFile Chunks []missingFileChunk @@ -722,9 +725,19 @@ func openOrCreateDirUnderRoot(name string, dirfd int, mode os.FileMode) (*os.Fil return nil, err } -func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFileType, from io.Reader, mf *missingFileChunk) error { - switch compression { - case fileTypeZstdChunked: +func (c *chunkedDiffer) prepareCompressedStreamToFile(partCompression compressedFileType, from io.Reader, mf *missingFileChunk) (compressedFileType, error) { + switch { + case partCompression == fileTypeHole: + // The entire part is a hole. Do not need to read from a file. + c.rawReader = nil + return fileTypeHole, nil + case mf.Hole: + // Only the missing chunk in the requested part refers to a hole. + // The received data must be discarded. 
+ limitReader := io.LimitReader(from, mf.CompressedSize) + _, err := io.CopyBuffer(ioutil.Discard, limitReader, c.copyBuffer) + return fileTypeHole, err + case partCompression == fileTypeZstdChunked: c.rawReader = io.LimitReader(from, mf.CompressedSize) if c.zstdReader == nil { r := zstd.NewReader(c.rawReader) @@ -732,42 +745,83 @@ func (c *chunkedDiffer) prepareCompressedStreamToFile(compression compressedFile } else { c.zstdReader.Reset(c.rawReader, nil) } - case fileTypeEstargz: + case partCompression == fileTypeEstargz: c.rawReader = io.LimitReader(from, mf.CompressedSize) if c.gzipReader == nil { r, err := pgzip.NewReader(c.rawReader) if err != nil { - return err + return partCompression, err } c.gzipReader = r } else { if err := c.gzipReader.Reset(c.rawReader); err != nil { - return err + return partCompression, err } } - - case fileTypeNoCompression: + case partCompression == fileTypeNoCompression: c.rawReader = io.LimitReader(from, mf.UncompressedSize) default: - return fmt.Errorf("unknown file type %q", c.fileType) + return partCompression, fmt.Errorf("unknown file type %q", c.fileType) + } + return partCompression, nil +} + +// hashHole writes SIZE zeros to the specified hasher +func hashHole(h hash.Hash, size int64, copyBuffer []byte) error { + count := int64(len(copyBuffer)) + if size < count { + count = size + } + for i := int64(0); i < count; i++ { + copyBuffer[i] = 0 + } + for size > 0 { + count = int64(len(copyBuffer)) + if size < count { + count = size + } + if _, err := h.Write(copyBuffer[:count]); err != nil { + return err + } + size -= count + } + return nil +} + +// appendHole creates a hole with the specified size at the open fd. +func appendHole(fd int, size int64) error { + off, err := unix.Seek(fd, size, unix.SEEK_CUR) + if err != nil { + return err + } + // Make sure the file size is changed. It might be the last hole and no other data written afterwards. 
+ if err := unix.Ftruncate(fd, off); err != nil { + return err } return nil } -func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, to io.Writer, size int64) error { +func (c *chunkedDiffer) appendCompressedStreamToFile(compression compressedFileType, destFile *destinationFile, size int64) error { switch compression { case fileTypeZstdChunked: defer c.zstdReader.Reset(nil, nil) - if _, err := io.CopyBuffer(to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil { + if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.zstdReader, size), c.copyBuffer); err != nil { return err } case fileTypeEstargz: defer c.gzipReader.Close() - if _, err := io.CopyBuffer(to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil { + if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.gzipReader, size), c.copyBuffer); err != nil { return err } case fileTypeNoCompression: - if _, err := io.CopyBuffer(to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil { + if _, err := io.CopyBuffer(destFile.to, io.LimitReader(c.rawReader, size), c.copyBuffer); err != nil { + return err + } + case fileTypeHole: + if err := appendHole(int(destFile.file.Fd()), size); err != nil { + return err + } + if err := hashHole(destFile.hash, size, c.copyBuffer); err != nil { return err } default: @@ -780,6 +834,7 @@ type destinationFile struct { dirfd int file *os.File digester digest.Digester + hash hash.Hash to io.Writer metadata *internal.FileMetadata options *archive.TarOptions @@ -792,11 +847,13 @@ func openDestinationFile(dirfd int, metadata *internal.FileMetadata, options *ar } digester := digest.Canonical.Digester() - to := io.MultiWriter(file, digester.Hash()) + hash := digester.Hash() + to := io.MultiWriter(file, hash) return &destinationFile{ file: file, digester: digester, + hash: hash, to: to, metadata: metadata, options: options, @@ -841,15 +898,17 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan for _, missingPart := range missingParts { var part io.ReadCloser - compression := c.fileType + partCompression := c.fileType switch { + case missingPart.Hole: + partCompression = fileTypeHole case missingPart.OriginFile != nil: var err error part, err = missingPart.OriginFile.OpenFile() if err != nil { return err } - compression = fileTypeNoCompression + partCompression = fileTypeNoCompression case missingPart.SourceChunk != nil: select { case p := <-streams: @@ -880,7 +939,8 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan goto exit } - if err := c.prepareCompressedStreamToFile(compression, part, &mf); err != nil { + compression, err := c.prepareCompressedStreamToFile(partCompression, part, &mf) + if err != nil { Err = err goto exit } @@ -911,19 +971,23 @@ func (c *chunkedDiffer) storeMissingFiles(streams chan io.ReadCloser, errs chan } } - if err := c.appendCompressedStreamToFile(compression, destFile.to, mf.UncompressedSize); err != nil { + if err := c.appendCompressedStreamToFile(compression, destFile, mf.UncompressedSize); err != nil { Err = err goto exit } - if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil { - Err = err - goto exit + if c.rawReader != nil { + if _, err := io.CopyBuffer(ioutil.Discard, c.rawReader, c.copyBuffer); err != nil { + Err = err + goto exit + } } } exit: - part.Close() - if Err != nil { - break + if part != nil { + part.Close() + if Err != nil { + break + } } } @@ -957,6 +1021,7 @@ func mergeMissingChunks(missingParts []missingPart, 
target int) []missingPart { gap := getGap(missingParts, i) if gap == 0 && missingParts[prevIndex].OriginFile == nil && missingParts[i].OriginFile == nil && + !missingParts[prevIndex].Hole && !missingParts[i].Hole && len(missingParts[prevIndex].Chunks) == 1 && len(missingParts[i].Chunks) == 1 && missingParts[prevIndex].Chunks[0].File.Name == missingParts[i].Chunks[0].File.Name { missingParts[prevIndex].SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length @@ -983,6 +1048,9 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart { sort.Ints(costs) toShrink := len(missingParts) - target + if toShrink >= len(costs) { + toShrink = len(costs) - 1 + } targetValue := costs[toShrink] newMissingParts = missingParts[0:1] @@ -993,6 +1061,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart { gap := getGap(missingParts, i) prev := &newMissingParts[len(newMissingParts)-1] prev.SourceChunk.Length += uint64(gap) + missingParts[i].SourceChunk.Length + prev.Hole = false prev.OriginFile = nil if gap > 0 { gapFile := missingFileChunk{ @@ -1009,7 +1078,7 @@ func mergeMissingChunks(missingParts []missingPart, target int) []missingPart { func (c *chunkedDiffer) retrieveMissingFiles(dest string, dirfd int, missingParts []missingPart, options *archive.TarOptions) error { var chunksToRequest []ImageSourceChunk for _, c := range missingParts { - if c.OriginFile == nil { + if c.OriginFile == nil && !c.Hole { chunksToRequest = append(chunksToRequest, *c.SourceChunk) } } @@ -1542,16 +1611,26 @@ func (c *chunkedDiffer) ApplyDiff(dest string, options *archive.TarOptions) (gra }, } - root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk) - if err != nil { - return output, err - } - if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) { + switch chunk.ChunkType { + case internal.ChunkTypeData: + root, path, offset, err := c.layersCache.findChunkInOtherLayers(chunk) + if err != nil { + return output, err + } + if offset >= 0 && validateChunkChecksum(chunk, root, path, offset, c.copyBuffer) { + missingPartsSize -= size + mp.OriginFile = &originFile{ + Root: root, + Path: path, + Offset: offset, + } + } + case internal.ChunkTypeZeros: missingPartsSize -= size - mp.OriginFile = &originFile{ - Root: root, - Path: path, - Offset: offset, + mp.Hole = true + // Mark all chunks belonging to the missing part as holes + for i := range mp.Chunks { + mp.Chunks[i].Hole = true } } missingParts = append(missingParts, mp)
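
On the receiving side, chunks marked as holes are never written out as literal zeros: as the commit message says, the differ seeks past the hole and ftruncates the file, which is what the appendHole helper above does. Below is a minimal standalone sketch of that idea; the file path and sizes are made up for the example, and it is not code from the patch:

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

// appendHoleAt mirrors the appendHole helper from the patch: advance the file
// offset by size bytes without writing, then ftruncate so the file size grows
// even when the hole is the last thing in the file.
func appendHoleAt(fd int, size int64) error {
	off, err := unix.Seek(fd, size, unix.SEEK_CUR)
	if err != nil {
		return err
	}
	return unix.Ftruncate(fd, off)
}

func main() {
	f, err := os.Create("/tmp/sparse-example")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	if _, err := f.Write([]byte("header")); err != nil {
		panic(err)
	}
	// Punch a 1 MiB hole instead of writing 1 MiB of zeros.
	if err := appendHoleAt(int(f.Fd()), 1<<20); err != nil {
		panic(err)
	}
	if _, err := f.Write([]byte("trailer")); err != nil {
		panic(err)
	}

	var st unix.Stat_t
	if err := unix.Fstat(int(f.Fd()), &st); err != nil {
		panic(err)
	}
	fmt.Printf("size=%d allocated=%d\n", st.Size, st.Blocks*512)
}

On a filesystem with sparse-file support the reported allocation stays far below the logical size, which is the point of emitting holes instead of zero data.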