Skip to content

Commit

Permalink
estargz: support lossless compression
Browse files Browse the repository at this point in the history
Signed-off-by: Kohei Tokunaga <[email protected]>
  • Loading branch information
ktock committed Sep 9, 2021
1 parent 1d81483 commit 1d70ca1
Show file tree
Hide file tree
Showing 16 changed files with 463 additions and 273 deletions.
2 changes: 1 addition & 1 deletion cmd/ctr-remote/commands/get-toc-digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var GetTOCDigestCommand = cli.Command{
decompressor = new(zstdchunked.Decompressor)
}

tocOff, tocSize, err := decompressor.ParseFooter(footer)
_, tocOff, tocSize, err := decompressor.ParseFooter(footer)
if err != nil {
return errors.Wrapf(err, "error parsing footer")
}
Expand Down
4 changes: 2 additions & 2 deletions estargz/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func TestSort(t *testing.T) {
if tt.allowMissedFiles != nil {
opts = append(opts, WithAllowPrioritizeNotFound(&missedFiles))
}
rc, err := Build(compressBlob(t, buildTarStatic(t, tt.in, tarprefix), srcCompression),
rc, err := Build(compressBlob(t, buildTar(t, tt.in, tarprefix), srcCompression),
append(opts, WithPrioritizedFiles(pfiles))...)
if tt.wantFail {
if err != nil {
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestSort(t *testing.T) {
gotTar := tar.NewReader(zr)

// Compare all
wantTar := tar.NewReader(buildTarStatic(t, tt.want, tarprefix))
wantTar := tar.NewReader(buildTar(t, tt.want, tarprefix))
for {
// Fetch and parse next header.
gotH, wantH, err := next(t, gotTar, wantTar)
Expand Down
122 changes: 103 additions & 19 deletions estargz/estargz.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
package estargz

import (
"archive/tar"
"bufio"
"bytes"
"compress/gzip"
Expand All @@ -42,6 +41,7 @@ import (
"github.com/containerd/stargz-snapshotter/estargz/errorutil"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/vbatts/tar-split/archive/tar"
)

// A Reader permits random access reads from a stargz file.
Expand Down Expand Up @@ -95,10 +95,10 @@ func WithTelemetry(telemetry *Telemetry) OpenOption {
}
}

// A func which takes start time and records the diff
// MeasureLatencyHook is a func which takes start time and records the diff
type MeasureLatencyHook func(time.Time)

// A struct which defines telemetry hooks. By implementing these hooks you should be able to record
// Telemetry is a struct which defines telemetry hooks. By implementing these hooks you should be able to record
// the latency metrics of the respective steps of estargz open operation. To be used with estargz.OpenWithTelemetry(...)
type Telemetry struct {
GetFooterLatency MeasureLatencyHook // measure time to get stargz footer (in milliseconds)
Expand Down Expand Up @@ -146,7 +146,7 @@ func Open(sr *io.SectionReader, opt ...OpenOption) (*Reader, error) {
fSize := d.FooterSize()
fOffset := positive(int64(len(footer)) - fSize)
maybeTocBytes := footer[:fOffset]
tocOffset, tocSize, err := d.ParseFooter(footer[fOffset:])
_, tocOffset, tocSize, err := d.ParseFooter(footer[fOffset:])
if err != nil {
allErr = append(allErr, err)
continue
Expand Down Expand Up @@ -187,7 +187,7 @@ func OpenFooter(sr *io.SectionReader) (tocOffset int64, footerSize int64, rErr e
for _, d := range []Decompressor{new(GzipDecompressor), new(legacyGzipDecompressor)} {
fSize := d.FooterSize()
fOffset := positive(int64(len(footer)) - fSize)
tocOffset, _, err := d.ParseFooter(footer[fOffset:])
_, tocOffset, _, err := d.ParseFooter(footer[fOffset:])
if err == nil {
return tocOffset, fSize, err
}
Expand Down Expand Up @@ -591,6 +591,11 @@ type currentCompressionWriter struct{ w *Writer }

func (ccw currentCompressionWriter) Write(p []byte) (int, error) {
ccw.w.diffHash.Write(p)
if ccw.w.gz == nil {
if err := ccw.w.condOpenGz(); err != nil {
return 0, err
}
}
return ccw.w.gz.Write(p)
}

Expand All @@ -601,6 +606,25 @@ func (w *Writer) chunkSize() int {
return w.ChunkSize
}

// Unpack decompresses the given estargz blob and returns a ReadCloser of the tar blob.
// TOC JSON and footer are removed.
func Unpack(sr *io.SectionReader, c Decompressor) (io.ReadCloser, error) {
footerSize := c.FooterSize()
if sr.Size() < footerSize {
return nil, fmt.Errorf("blob is too small; %d < %d", sr.Size(), footerSize)
}
footerOffset := sr.Size() - footerSize
footer := make([]byte, footerSize)
if _, err := sr.ReadAt(footer, footerOffset); err != nil {
return nil, err
}
blobPayloadSize, _, _, err := c.ParseFooter(footer)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse footer")
}
return c.Reader(io.LimitReader(sr, blobPayloadSize))
}

// NewWriter returns a new stargz writer (gzip-based) writing to w.
//
// The writer must be closed to write its trailing table of contents.
Expand All @@ -616,7 +640,7 @@ func NewWriterLevel(w io.Writer, compressionLevel int) *Writer {
return NewWriterWithCompressor(w, NewGzipCompressorWithLevel(compressionLevel))
}

// NewWriterLevel returns a new stargz writer writing to w.
// NewWriterWithCompressor returns a new stargz writer writing to w.
// The compression method is configurable.
//
// The writer must be closed to write its trailing table of contents.
Expand Down Expand Up @@ -696,29 +720,71 @@ func (w *Writer) condOpenGz() (err error) {
// each of its contents to w.
//
// The input r can optionally be gzip compressed but the output will
// always be gzip compressed.
// always be compressed by the specified compressor.
func (w *Writer) AppendTar(r io.Reader) error {
return w.appendTar(r, false)
}

// AppendTarLossLess reads the tar or tar.gz file from r and appends
// each of its contents to w.
//
// The input r can optionally be gzip compressed but the output will
// always be compressed by the specified compressor.
//
// The difference of this func with AppendTar is that this writes
// the input tar stream into w without any modification (e.g. to header bytes).
//
// Note that if the input tar stream already contains TOC JSON, this returns
// error because w cannot overwrite the TOC JSON to the one generated by w without
// lossy modification. To avoid this error, if the input stream is known to be stargz/estargz,
// you shoud decompress it and remove TOC JSON in advance.
func (w *Writer) AppendTarLossLess(r io.Reader) error {
return w.appendTar(r, true)
}

func (w *Writer) appendTar(r io.Reader, lossless bool) error {
var src io.Reader
br := bufio.NewReader(r)
var tr *tar.Reader
if isGzip(br) {
// NewReader can't fail if isGzip returned true.
zr, _ := gzip.NewReader(br)
tr = tar.NewReader(zr)
src = zr
} else {
tr = tar.NewReader(br)
src = io.Reader(br)
}
dst := currentCompressionWriter{w}
var tw *tar.Writer
if !lossless {
tw = tar.NewWriter(dst) // use tar writer only when this isn't lossless mode.
}
tr := tar.NewReader(src)
if lossless {
tr.RawAccounting = true
}
for {
h, err := tr.Next()
if err == io.EOF {
if lossless {
if remain := tr.RawBytes(); len(remain) > 0 {
// Collect the remaining null bytes.
// https://github.com/vbatts/tar-split/blob/80a436fd6164c557b131f7c59ed69bd81af69761/concept/main.go#L49-L53
if _, err := dst.Write(remain); err != nil {
return err
}
}
}
break
}
if err != nil {
return fmt.Errorf("error reading from source tar: tar.Reader.Next: %v", err)
}
if h.Name == TOCTarName {
if cleanEntryName(h.Name) == TOCTarName {
// It is possible for a layer to be "stargzified" twice during the
// distribution lifecycle. So we reserve "TOCTarName" here to avoid
// duplicated entries in the resulting layer.
if lossless {
// We cannot handle this in lossless way.
return fmt.Errorf("existing TOC JSON is not allowed; decompress layer before append")
}
continue
}

Expand All @@ -744,9 +810,14 @@ func (w *Writer) AppendTar(r io.Reader) error {
if err := w.condOpenGz(); err != nil {
return err
}
tw := tar.NewWriter(currentCompressionWriter{w})
if err := tw.WriteHeader(h); err != nil {
return err
if tw != nil {
if err := tw.WriteHeader(h); err != nil {
return err
}
} else {
if _, err := dst.Write(tr.RawBytes()); err != nil {
return err
}
}
switch h.Typeflag {
case tar.TypeLink:
Expand Down Expand Up @@ -808,7 +879,13 @@ func (w *Writer) AppendTar(r io.Reader) error {
}

teeChunk := io.TeeReader(tee, chunkDigest.Hash())
if _, err := io.CopyN(tw, teeChunk, chunkSize); err != nil {
var out io.Writer
if tw != nil {
out = tw
} else {
out = dst
}
if _, err := io.CopyN(out, teeChunk, chunkSize); err != nil {
return fmt.Errorf("error copying %q: %v", h.Name, err)
}
ent.ChunkDigest = chunkDigest.Digest().String()
Expand All @@ -825,11 +902,18 @@ func (w *Writer) AppendTar(r io.Reader) error {
if payloadDigest != nil {
regFileEntry.Digest = payloadDigest.Digest().String()
}
if err := tw.Flush(); err != nil {
return err
if tw != nil {
if err := tw.Flush(); err != nil {
return err
}
}
}
return nil
remainDest := ioutil.Discard
if lossless {
remainDest = dst // Preserve the remaining bytes in lossless mode
}
_, err := io.Copy(remainDest, src)
return err
}

// DiffID returns the SHA-256 of the uncompressed tar bytes.
Expand Down
1 change: 1 addition & 0 deletions estargz/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ require (
github.com/klauspost/compress v1.13.5
github.com/opencontainers/go-digest v1.0.0
github.com/pkg/errors v0.9.1
github.com/vbatts/tar-split v0.11.2
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
)
14 changes: 14 additions & 0 deletions estargz/go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.13.5 h1:9O69jUPDcsT9fEm74W92rZL9FQY7rCdaXVneq+yyzl4=
github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vbatts/tar-split v0.11.2 h1:Via6XqJr0hceW4wff3QRzD5gAk/tatMw/4ZA7cTlIME=
github.com/vbatts/tar-split v0.11.2/go.mod h1:vV3ZuO2yWSVsz+pfFzDG/upWH1JhjOiEaWq6kXyQ3VI=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
30 changes: 15 additions & 15 deletions estargz/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,31 +124,31 @@ func (gz *GzipDecompressor) ParseTOC(r io.Reader) (toc *JTOC, tocDgst digest.Dig
return parseTOCEStargz(r)
}

func (gz *GzipDecompressor) ParseFooter(p []byte) (tocOffset, tocSize int64, err error) {
func (gz *GzipDecompressor) ParseFooter(p []byte) (blobPayloadSize, tocOffset, tocSize int64, err error) {
if len(p) != FooterSize {
return 0, 0, fmt.Errorf("invalid length %d cannot be parsed", len(p))
return 0, 0, 0, fmt.Errorf("invalid length %d cannot be parsed", len(p))
}
zr, err := gzip.NewReader(bytes.NewReader(p))
if err != nil {
return 0, 0, err
return 0, 0, 0, err
}
defer zr.Close()
extra := zr.Header.Extra
si1, si2, subfieldlen, subfield := extra[0], extra[1], extra[2:4], extra[4:]
if si1 != 'S' || si2 != 'G' {
return 0, 0, fmt.Errorf("invalid subfield IDs: %q, %q; want E, S", si1, si2)
return 0, 0, 0, fmt.Errorf("invalid subfield IDs: %q, %q; want E, S", si1, si2)
}
if slen := binary.LittleEndian.Uint16(subfieldlen); slen != uint16(16+len("STARGZ")) {
return 0, 0, fmt.Errorf("invalid length of subfield %d; want %d", slen, 16+len("STARGZ"))
return 0, 0, 0, fmt.Errorf("invalid length of subfield %d; want %d", slen, 16+len("STARGZ"))
}
if string(subfield[16:]) != "STARGZ" {
return 0, 0, fmt.Errorf("STARGZ magic string must be included in the footer subfield")
return 0, 0, 0, fmt.Errorf("STARGZ magic string must be included in the footer subfield")
}
tocOffset, err = strconv.ParseInt(string(subfield[:16]), 16, 64)
if err != nil {
return 0, 0, errors.Wrapf(err, "legacy: failed to parse toc offset")
return 0, 0, 0, errors.Wrapf(err, "legacy: failed to parse toc offset")
}
return tocOffset, 0, nil
return tocOffset, tocOffset, 0, nil
}

func (gz *GzipDecompressor) FooterSize() int64 {
Expand All @@ -165,27 +165,27 @@ func (gz *legacyGzipDecompressor) ParseTOC(r io.Reader) (toc *JTOC, tocDgst dige
return parseTOCEStargz(r)
}

func (gz *legacyGzipDecompressor) ParseFooter(p []byte) (tocOffset, tocSize int64, err error) {
func (gz *legacyGzipDecompressor) ParseFooter(p []byte) (blobPayloadSize, tocOffset, tocSize int64, err error) {
if len(p) != legacyFooterSize {
return 0, 0, fmt.Errorf("legacy: invalid length %d cannot be parsed", len(p))
return 0, 0, 0, fmt.Errorf("legacy: invalid length %d cannot be parsed", len(p))
}
zr, err := gzip.NewReader(bytes.NewReader(p))
if err != nil {
return 0, 0, errors.Wrapf(err, "legacy: failed to get footer gzip reader")
return 0, 0, 0, errors.Wrapf(err, "legacy: failed to get footer gzip reader")
}
defer zr.Close()
extra := zr.Header.Extra
if len(extra) != 16+len("STARGZ") {
return 0, 0, fmt.Errorf("legacy: invalid stargz's extra field size")
return 0, 0, 0, fmt.Errorf("legacy: invalid stargz's extra field size")
}
if string(extra[16:]) != "STARGZ" {
return 0, 0, fmt.Errorf("legacy: magic string STARGZ not found")
return 0, 0, 0, fmt.Errorf("legacy: magic string STARGZ not found")
}
tocOffset, err = strconv.ParseInt(string(extra[:16]), 16, 64)
if err != nil {
return 0, 0, errors.Wrapf(err, "legacy: failed to parse toc offset")
return 0, 0, 0, errors.Wrapf(err, "legacy: failed to parse toc offset")
}
return tocOffset, 0, nil
return tocOffset, tocOffset, 0, nil
}

func (gz *legacyGzipDecompressor) FooterSize() int64 {
Expand Down
4 changes: 2 additions & 2 deletions estargz/gzip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func checkFooter(t *testing.T, off int64) {
if len(footer) != FooterSize {
t.Fatalf("for offset %v, footer length was %d, not expected %d. got bytes: %q", off, len(footer), FooterSize, footer)
}
got, _, err := (&GzipDecompressor{}).ParseFooter(footer)
_, got, _, err := (&GzipDecompressor{}).ParseFooter(footer)
if err != nil {
t.Fatalf("failed to parse footer for offset %d, footer: %x: err: %v",
off, footer, err)
Expand All @@ -125,7 +125,7 @@ func checkLegacyFooter(t *testing.T, off int64) {
if len(footer) != legacyFooterSize {
t.Fatalf("for offset %v, footer length was %d, not expected %d. got bytes: %q", off, len(footer), legacyFooterSize, footer)
}
got, _, err := (&legacyGzipDecompressor{}).ParseFooter(footer)
_, got, _, err := (&legacyGzipDecompressor{}).ParseFooter(footer)
if err != nil {
t.Fatalf("failed to parse legacy footer for offset %d, footer: %x: err: %v",
off, footer, err)
Expand Down
Loading

0 comments on commit 1d70ca1

Please sign in to comment.