diff --git a/pkg/content/decompressstore.go b/pkg/content/decompressstore.go
index cde65423a..42d832a4b 100644
--- a/pkg/content/decompressstore.go
+++ b/pkg/content/decompressstore.go
@@ -2,12 +2,13 @@ package content
 
 import (
 	"context"
+	"errors"
 	"strings"
 
 	ctrcontent "github.com/containerd/containerd/content"
 )
 
-// DecompressWriter store to decompress content and extract from tar, if needed, wrapping
+// DecompressStore store to decompress content and extract from tar, if needed, wrapping
 // another store. By default, a FileStore will simply take each artifact and write it to
 // a file, as a MemoryStore will do into memory. If the artifact is gzipped or tarred,
 // you might want to store the actual object inside tar or gzip. Wrap your Store
@@ -17,15 +18,34 @@ import (
 // For example:
 //
 //	fileStore := NewFileStore(rootPath)
-//	decompressStore := store.NewDecompressStore(fileStore, blocksize)
+//	decompressStore := store.NewDecompressStore(fileStore, WithBlocksize(blocksize))
+//
+// The above example works if there is no tar, i.e. each artifact is just a single file, perhaps gzipped,
+// or if there is only one file in each tar archive; in other words, when each content.Writer has only one
+// target output stream. However, if each artifact layer is a tar archive containing multiple files, then
+// you need a way to select how to handle each file in the archive; in other words, when each content.Writer
+// has more than one target output stream. In that case, use the following example:
+//
+//	multiStore := NewMultiStore(rootPath) // some store that can handle different filenames
+//	decompressStore := store.NewDecompressStore(multiStore, WithBlocksize(blocksize), WithMultiWriterIngester())
 //
 type DecompressStore struct {
-	ingester  ctrcontent.Ingester
-	blocksize int
+	ingester            ctrcontent.Ingester
+	blocksize           int
+	multiWriterIngester bool
 }
 
-func NewDecompressStore(ingester ctrcontent.Ingester, blocksize int) DecompressStore {
-	return DecompressStore{ingester, blocksize}
+func NewDecompressStore(ingester ctrcontent.Ingester, opts ...WriterOpt) DecompressStore {
+	// we have to reprocess the opts to find the blocksize
+	var wOpts WriterOpts
+	for _, opt := range opts {
+		if err := opt(&wOpts); err != nil {
+			// TODO: we probably should handle errors here
+			continue
+		}
+	}
+
+	return DecompressStore{ingester, wOpts.Blocksize, wOpts.MultiWriterIngester}
 }
 
 // Writer get a writer
@@ -34,10 +54,20 @@ func (d DecompressStore) Writer(ctx context.Context, opts ...ctrcontent.WriterOp
 	// - if there is a desc in the opts, and the mediatype is tar or tar+gzip, then pass the correct decompress writer
 	// - else, pass the regular writer
 	var (
-		writer ctrcontent.Writer
-		err    error
+		writer        ctrcontent.Writer
+		err           error
+		multiIngester MultiWriterIngester
+		ok            bool
 	)
+	// check to see if we are supposed to use a MultiWriterIngester
+	if d.multiWriterIngester {
+		multiIngester, ok = d.ingester.(MultiWriterIngester)
+		if !ok {
+			return nil, errors.New("configured to use multiwriter ingester, but ingester does not implement multiwriter")
+		}
+	}
+
 	// we have to reprocess the opts to find the desc
 	var wOpts ctrcontent.WriterOpts
 	for _, opt := range opts {
@@ -52,20 +82,37 @@ func (d DecompressStore) Writer(ctx context.Context, opts ...ctrcontent.WriterOp
 	hasGzip, hasTar, modifiedMediaType := checkCompression(desc.MediaType)
 	wOpts.Desc.MediaType = modifiedMediaType
 	opts = append(opts, ctrcontent.WithDescriptor(wOpts.Desc))
-	writer, err = d.ingester.Writer(ctx, opts...)
-	if err != nil {
-		return nil, err
-	}
 	// determine if we pass it blocksize, only if positive
 	writerOpts := []WriterOpt{}
 	if d.blocksize > 0 {
 		writerOpts = append(writerOpts, WithBlocksize(d.blocksize))
 	}
-	// figure out which writer we need
+
+	writer, err = d.ingester.Writer(ctx, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	// do we need to wrap with an untar writer?
 	if hasTar {
-		writer = NewUntarWriter(writer, writerOpts...)
+		// if not multiingester, get a regular writer
+		if multiIngester == nil {
+			writer = NewUntarWriter(writer, writerOpts...)
+		} else {
+			writers, err := multiIngester.Writers(ctx, opts...)
+			if err != nil {
+				return nil, err
+			}
+			writer = NewUntarWriterByName(writers, writerOpts...)
+		}
 	}
 	if hasGzip {
+		if writer == nil {
+			writer, err = d.ingester.Writer(ctx, opts...)
+			if err != nil {
+				return nil, err
+			}
+		}
 		writer = NewGunzipWriter(writer, writerOpts...)
 	}
 	return writer, nil
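The change above turns the blocksize argument into a functional option and adds the multi-writer path. As a reading aid, here is a minimal sketch of the new constructor and Writer flow in use, mirroring the updated test below; the `storeGzipped` helper, the payload, and the `deislabs/oras` import path are assumptions, not part of the diff:

```go
package main

import (
	"context"

	ctrcontent "github.com/containerd/containerd/content"
	"github.com/deislabs/oras/pkg/content"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// storeGzipped pushes a gzipped payload through a DecompressStore. The store
// inspects desc.MediaType and wraps the underlying writer in a gunzip (and/or
// untar) writer as needed, so the memory store receives decompressed bytes.
func storeGzipped(ctx context.Context, gzipped []byte, desc ocispec.Descriptor) error {
	memStore := content.NewMemoryStore()
	decompressStore := content.NewDecompressStore(memStore, content.WithBlocksize(0))

	writer, err := decompressStore.Writer(ctx, ctrcontent.WithDescriptor(desc))
	if err != nil {
		return err
	}
	if _, err := writer.Write(gzipped); err != nil {
		writer.Close()
		return err
	}
	// size and digest here describe the compressed input stream
	return writer.Commit(ctx, int64(len(gzipped)), desc.Digest)
}
```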
diff --git a/pkg/content/decompressstore_test.go b/pkg/content/decompressstore_test.go
index 5c9593b0a..0e12a85fb 100644
--- a/pkg/content/decompressstore_test.go
+++ b/pkg/content/decompressstore_test.go
@@ -32,7 +32,7 @@ func TestDecompressStore(t *testing.T) {
 	}
 
 	memStore := content.NewMemoryStore()
-	decompressStore := content.NewDecompressStore(memStore, 0)
+	decompressStore := content.NewDecompressStore(memStore, content.WithBlocksize(0))
 	ctx := context.Background()
 	decompressWriter, err := decompressStore.Writer(ctx, ctrcontent.WithDescriptor(gzipDescriptor))
 	if err != nil {
diff --git a/pkg/content/multiwriter.go b/pkg/content/multiwriter.go
new file mode 100644
index 000000000..87df2d614
--- /dev/null
+++ b/pkg/content/multiwriter.go
@@ -0,0 +1,16 @@
+package content
+
+import (
+	"context"
+
+	ctrcontent "github.com/containerd/containerd/content"
+)
+
+// MultiWriterIngester is an ingester that can provide a single writer or multiple writers for a single
+// descriptor. It is useful when the target of a descriptor can have multiple items within it, e.g. a layer
+// that is a tar file with multiple files, each of which should go to a different stream, and some of which
+// should not be handled at all.
+type MultiWriterIngester interface {
+	ctrcontent.Ingester
+	Writers(ctx context.Context, opts ...ctrcontent.WriterOpt) (map[string]ctrcontent.Writer, error)
+}
diff --git a/pkg/content/opts.go b/pkg/content/opts.go
index 007e6a009..56d496959 100644
--- a/pkg/content/opts.go
+++ b/pkg/content/opts.go
@@ -7,9 +7,10 @@ import (
 )
 
 type WriterOpts struct {
-	InputHash  *digest.Digest
-	OutputHash *digest.Digest
-	Blocksize  int
+	InputHash           *digest.Digest
+	OutputHash          *digest.Digest
+	Blocksize           int
+	MultiWriterIngester bool
 }
 
 type WriterOpt func(*WriterOpts) error
@@ -60,3 +61,13 @@ func WithBlocksize(blocksize int) WriterOpt {
 		return nil
 	}
 }
+
+// WithMultiWriterIngester declares that the passed ingester also implements
+// MultiWriterIngester and should be used as such. If this option is set but the
+// ingester does not implement MultiWriterIngester, calling Writer will return an error.
+func WithMultiWriterIngester() WriterOpt {
+	return func(w *WriterOpts) error {
+		w.MultiWriterIngester = true
+		return nil
+	}
+}
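multiwriter.go and opts.go only define the contract; nothing in this diff implements it. Here is a hedged sketch of what an implementing store might look like; `multiFileStore`, the filenames, and the per-file refs are all hypothetical:

```go
package main

import (
	"context"

	ctrcontent "github.com/containerd/containerd/content"
)

// multiFileStore is a hypothetical store satisfying MultiWriterIngester:
// Writers hands back one writer per filename expected in a tar layer, keyed
// by name, with "" registered as the catch-all for unnamed streams.
type multiFileStore struct {
	ctrcontent.Ingester // embedded: provides Writer for the single-stream case
}

func (s *multiFileStore) Writers(ctx context.Context, opts ...ctrcontent.WriterOpt) (map[string]ctrcontent.Writer, error) {
	writers := map[string]ctrcontent.Writer{}
	for _, name := range []string{"manifest.yaml", "rootfs.tar", ""} {
		// give each stream its own ref so the ingester keeps them apart
		w, err := s.Ingester.Writer(ctx, append(opts, ctrcontent.WithRef("file-"+name))...)
		if err != nil {
			return nil, err
		}
		writers[name] = w
	}
	return writers, nil
}
```

Returning a map keyed by filename is what lets NewUntarWriterByName (below) route each tar entry to its own output stream.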
diff --git a/pkg/content/passthrough.go b/pkg/content/passthrough.go
index 6126fc15f..b9a891d53 100644
--- a/pkg/content/passthrough.go
+++ b/pkg/content/passthrough.go
@@ -2,7 +2,9 @@ package content
 
 import (
 	"context"
+	"errors"
 	"io"
+	"time"
 
 	"github.com/containerd/containerd/content"
 	"github.com/opencontainers/go-digest"
@@ -137,3 +139,130 @@ func (u *underlyingWriter) Digest() digest.Digest {
 	}
 	return u.digester.Digest()
 }
+
+// PassthroughMultiWriter is a single writer that passes through to multiple writers, allowing the
+// passthrough function to select which writer to use.
+type PassthroughMultiWriter struct {
+	writers   []*PassthroughWriter
+	pipew     *io.PipeWriter
+	digester  digest.Digester
+	size      int64
+	reader    *io.PipeReader
+	hash      *digest.Digest
+	done      chan error
+	startedAt time.Time
+	updatedAt time.Time
+	ref       string
+}
+
+func NewPassthroughMultiWriter(writers []content.Writer, f func(r io.Reader, w []io.Writer, done chan<- error), opts ...WriterOpt) content.Writer {
+	// process opts for default
+	wOpts := DefaultWriterOpts()
+	for _, opt := range opts {
+		if err := opt(&wOpts); err != nil {
+			return nil
+		}
+	}
+
+	var pws []*PassthroughWriter
+	r, w := io.Pipe()
+	for _, writer := range writers {
+		pws = append(pws, &PassthroughWriter{
+			writer:   writer,
+			pipew:    w,
+			digester: digest.Canonical.Digester(),
+			underlyingWriter: &underlyingWriter{
+				writer:   writer,
+				digester: digest.Canonical.Digester(),
+				hash:     wOpts.OutputHash,
+			},
+			reader: r,
+			hash:   wOpts.InputHash,
+			done:   make(chan error, 1),
+		})
+	}
+
+	pmw := &PassthroughMultiWriter{
+		writers: pws,
+		// the multiwriter needs its own references to the pipe, digester and
+		// input hash, or Write, Digest and Commit would hit nil pointers
+		pipew:     w,
+		reader:    r,
+		digester:  digest.Canonical.Digester(),
+		hash:      wOpts.InputHash,
+		startedAt: time.Now(),
+		updatedAt: time.Now(),
+		done:      make(chan error, 1),
+	}
+	// get our output writers
+	var uws []io.Writer
+	for _, uw := range pws {
+		uws = append(uws, uw.underlyingWriter)
+	}
+	go f(r, uws, pmw.done)
+	return pmw
+}
+
+func (pmw *PassthroughMultiWriter) Write(p []byte) (n int, err error) {
+	n, err = pmw.pipew.Write(p)
+	if pmw.hash == nil {
+		pmw.digester.Hash().Write(p[:n])
+	}
+	pmw.size += int64(n)
+	pmw.updatedAt = time.Now()
+	return
+}
+
+func (pmw *PassthroughMultiWriter) Close() error {
+	pmw.pipew.Close()
+	for _, w := range pmw.writers {
+		w.Close()
+	}
+	return nil
+}
+
+// Digest may return empty digest or panics until committed.
+func (pmw *PassthroughMultiWriter) Digest() digest.Digest {
+	if pmw.hash != nil {
+		return *pmw.hash
+	}
+	return pmw.digester.Digest()
+}
+
+// Commit commits the blob (but no roll-back is guaranteed on an error).
+// size and expected can be zero-value when unknown.
+// Commit always closes the writer, even on error.
+// ErrAlreadyExists aborts the writer.
+func (pmw *PassthroughMultiWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
+	pmw.pipew.Close()
+	err := <-pmw.done
+	pmw.reader.Close()
+	if err != nil && err != io.EOF {
+		return err
+	}
+
+	// Some underlying writers will validate an expected digest, so we need the option to pass it
+	// that digest. That is why we calculate the digest of the underlying writer throughout the write process.
+	for _, w := range pmw.writers {
+		// maybe this should be Commit(ctx, w.underlyingWriter.size, w.underlyingWriter.Digest(), opts...)
+		w.done <- err
+		if err := w.Commit(ctx, size, expected, opts...); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Status returns the current state of write
+func (pmw *PassthroughMultiWriter) Status() (content.Status, error) {
+	return content.Status{
+		StartedAt: pmw.startedAt,
+		UpdatedAt: pmw.updatedAt,
+		Total:     pmw.size,
+	}, nil
+}
+
+// Truncate updates the size of the target blob, but cannot do anything with a multiwriter
+func (pmw *PassthroughMultiWriter) Truncate(size int64) error {
+	return errors.New("truncate unavailable on multiwriter")
+}
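The passthrough function is the extension point here: it receives the read side of the pipe, one io.Writer per underlying writer, and a done channel it must send exactly one result on. A minimal sketch of that contract, assuming the writers slice comes from some store; `buildFanoutWriter` is illustrative, not part of the diff:

```go
package main

import (
	"io"

	ctrcontent "github.com/containerd/containerd/content"
	"github.com/deislabs/oras/pkg/content"
)

// buildFanoutWriter wraps pre-created writers in a PassthroughMultiWriter whose
// passthrough function simply tees the input to every underlying writer.
// NewUntarWriterByName (below) uses the same hook to demultiplex a tar stream
// instead of duplicating it.
func buildFanoutWriter(writers []ctrcontent.Writer) ctrcontent.Writer {
	fanout := func(r io.Reader, ws []io.Writer, done chan<- error) {
		_, err := io.Copy(io.MultiWriter(ws...), r)
		// Commit blocks on this channel, so send exactly one result
		done <- err
	}
	return content.NewPassthroughMultiWriter(writers, fanout)
}
```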
diff --git a/pkg/content/untar.go b/pkg/content/untar.go
index 7e6d213e3..729a318af 100644
--- a/pkg/content/untar.go
+++ b/pkg/content/untar.go
@@ -70,3 +70,85 @@ func NewUntarWriter(writer content.Writer, opts ...WriterOpt) content.Writer {
 		done <- err
 	}, opts...)
 }
+
+// NewUntarWriterByName wraps multiple writers with an untar writer, so that the stream is untarred and
+// each file is passed to the writer registered for its filename. If a filename is not registered, that
+// file is not passed to any writer. The filename "" will handle any stream that does not have a specific
+// filename; use it as the default for a single file in a tar stream.
+func NewUntarWriterByName(writers map[string]content.Writer, opts ...WriterOpt) content.Writer {
+	// process opts for default
+	wOpts := DefaultWriterOpts()
+	for _, opt := range opts {
+		if err := opt(&wOpts); err != nil {
+			return nil
+		}
+	}
+
+	// construct an array of content.Writer
+	nameToIndex := map[string]int{}
+	var writerSlice []content.Writer
+	for name, writer := range writers {
+		writerSlice = append(writerSlice, writer)
+		nameToIndex[name] = len(writerSlice) - 1
+	}
+	// need a PassthroughMultiWriter here
+	return NewPassthroughMultiWriter(writerSlice, func(r io.Reader, ws []io.Writer, done chan<- error) {
+		tr := tar.NewReader(r)
+		var err error
+		for {
+			var header *tar.Header
+			// assign with =, not :=, so that errors are not shadowed inside the loop
+			header, err = tr.Next()
+			if err == io.EOF {
+				// clear the error, since we do not pass an io.EOF
+				err = nil
+				break // End of archive
+			}
+			if err != nil {
+				// pass the error on
+				err = fmt.Errorf("UntarWriter tar file header read error: %v", err)
+				break
+			}
+			// get the filename
+			filename := header.Name
+			index, ok := nameToIndex[filename]
+			if !ok {
+				index, ok = nameToIndex[""]
+				if !ok {
+					// we did not find this file or the wildcard, so do not process this file
+					continue
+				}
+			}
+
+			// write out the untarred data
+			// we can handle io.EOF, just go to the next file
+			// any other errors should stop and get reported
+			b := make([]byte, wOpts.Blocksize)
+			for {
+				var n int
+				n, err = tr.Read(b)
+				if err != nil && err != io.EOF {
+					err = fmt.Errorf("UntarWriter file data read error: %v", err)
+					break
+				}
+				l := n
+				if n > len(b) {
+					l = len(b)
+				}
+				if _, err2 := ws[index].Write(b[:l]); err2 != nil {
+					err = fmt.Errorf("UntarWriter error writing to underlying writer at index %d for name '%s': %v", index, filename, err2)
+					break
+				}
+				if err == io.EOF {
+					// go to the next file
+					break
+				}
+			}
+			// did we break with a non-nil and non-EOF error?
+			if err != nil && err != io.EOF {
+				break
+			}
+		}
+		done <- err
+	}, opts...)
+}
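Finally, a hedged end-to-end sketch of NewUntarWriterByName: route two known paths from a tar layer to separate writers and drop everything else (no "" catch-all is registered). The `untarTwoFiles` helper, the two writers, the stream, and its size/digest are assumptions, not part of the diff:

```go
package main

import (
	"context"
	"io"

	ctrcontent "github.com/containerd/containerd/content"
	"github.com/deislabs/oras/pkg/content"
	digest "github.com/opencontainers/go-digest"
)

// untarTwoFiles demultiplexes a tar stream into two pre-created writers,
// keyed by path inside the archive; entries with any other name are skipped.
func untarTwoFiles(ctx context.Context, tarStream io.Reader, size int64, dgst digest.Digest, manifestWriter, binaryWriter ctrcontent.Writer) error {
	w := content.NewUntarWriterByName(map[string]ctrcontent.Writer{
		"manifest.yaml": manifestWriter,
		"app/binary":    binaryWriter,
	})
	if _, err := io.Copy(w, tarStream); err != nil {
		w.Close()
		return err
	}
	// size and digest describe the tar stream that was written in
	return w.Commit(ctx, size, dgst)
}
```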