multiwriteringester support
Signed-off-by: Avi Deitcher <[email protected]>
deitch committed Jan 28, 2021
1 parent 96cd904 commit 402a5c3
Showing 6 changed files with 295 additions and 18 deletions.
75 changes: 61 additions & 14 deletions pkg/content/decompressstore.go
@@ -2,12 +2,13 @@ package content

import (
    "context"
+   "errors"
    "strings"

    ctrcontent "github.com/containerd/containerd/content"
)

-// DecompressWriter store to decompress content and extract from tar, if needed, wrapping
+// DecompressStore store to decompress content and extract from tar, if needed, wrapping
// another store. By default, a FileStore will simply take each artifact and write it to
// a file, as a MemoryStore will do into memory. If the artifact is gzipped or tarred,
// you might want to store the actual object inside tar or gzip. Wrap your Store
@@ -17,15 +18,34 @@ import (
// For example:
//
//      fileStore := NewFileStore(rootPath)
-//      decompressStore := store.NewDecompressStore(fileStore, blocksize)
+//      decompressStore := store.NewDecompressStore(fileStore, WithBlocksize(blocksize))
//
// The above example works if there is no tar, i.e. each artifact is just a single file, perhaps gzipped,
// or if there is only one file in each tar archive. In other words, when each content.Writer has only one target output stream.
// However, if you have multiple files in each tar archive, each archive of which is an artifact layer, then
// you need a way to select how to handle each file in the tar archive. In other words, when each content.Writer has more than one
// target output stream. In that case, use the following example:
//
//      fileStore := NewFileStore(rootPath) // must also implement MultiWriterIngester
//      decompressStore := store.NewDecompressStore(fileStore, WithBlocksize(blocksize), WithMultiWriterIngester())
//
type DecompressStore struct {
-   ingester  ctrcontent.Ingester
-   blocksize int
+   ingester            ctrcontent.Ingester
+   blocksize           int
+   multiWriterIngester bool
}

-func NewDecompressStore(ingester ctrcontent.Ingester, blocksize int) DecompressStore {
-   return DecompressStore{ingester, blocksize}
+func NewDecompressStore(ingester ctrcontent.Ingester, opts ...WriterOpt) DecompressStore {
+   // we have to reprocess the opts to find the blocksize
+   var wOpts WriterOpts
+   for _, opt := range opts {
+       if err := opt(&wOpts); err != nil {
+           // TODO: we probably should handle errors here
+           continue
+       }
+   }
+
+   return DecompressStore{ingester, wOpts.Blocksize, wOpts.MultiWriterIngester}
}

// Writer gets a writer
@@ -34,10 +54,20 @@ func (d DecompressStore) Writer(ctx context.Context, opts ...ctrcontent.WriterOp
    // - if there is a desc in the opts, and the mediatype is tar or tar+gzip, then pass the correct decompress writer
    // - else, pass the regular writer
    var (
-       writer ctrcontent.Writer
-       err    error
+       writer        ctrcontent.Writer
+       err           error
+       multiIngester MultiWriterIngester
+       ok            bool
    )

+   // check to see if we are supposed to use a MultiWriterIngester
+   if d.multiWriterIngester {
+       multiIngester, ok = d.ingester.(MultiWriterIngester)
+       if !ok {
+           return nil, errors.New("configured to use multiwriter ingester, but ingester does not implement multiwriter")
+       }
+   }


    // we have to reprocess the opts to find the desc
    var wOpts ctrcontent.WriterOpts
    for _, opt := range opts {
@@ -52,20 +82,37 @@ func (d DecompressStore) Writer(ctx context.Context, opts ...ctrcontent.WriterOp
    hasGzip, hasTar, modifiedMediaType := checkCompression(desc.MediaType)
    wOpts.Desc.MediaType = modifiedMediaType
    opts = append(opts, ctrcontent.WithDescriptor(wOpts.Desc))
-   writer, err = d.ingester.Writer(ctx, opts...)
-   if err != nil {
-       return nil, err
-   }
    // determine if we pass it blocksize, only if positive
    writerOpts := []WriterOpt{}
    if d.blocksize > 0 {
        writerOpts = append(writerOpts, WithBlocksize(d.blocksize))
    }
+   // figure out which writer we need

+   writer, err = d.ingester.Writer(ctx, opts...)
+   if err != nil {
+       return nil, err
+   }

    // do we need to wrap with an untar writer?
    if hasTar {
-       writer = NewUntarWriter(writer, writerOpts...)
+       // if not multiingester, get a regular writer
+       if multiIngester == nil {
+           writer = NewUntarWriter(writer, writerOpts...)
+       } else {
+           writers, err := multiIngester.Writers(ctx, opts...)
+           if err != nil {
+               return nil, err
+           }
+           writer = NewUntarWriterByName(writers, writerOpts...)
+       }
    }
    if hasGzip {
+       if writer == nil {
+           writer, err = d.ingester.Writer(ctx, opts...)
+           if err != nil {
+               return nil, err
+           }
+       }
        writer = NewGunzipWriter(writer, writerOpts...)
    }
    return writer, nil
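To make the media-type dispatch concrete, here is a rough usage sketch, not from this commit: ctx, dgst, size, and decompressStore are assumed to exist, and ocispec is github.com/opencontainers/image-spec/specs-go/v1.

desc := ocispec.Descriptor{
    // "application/vnd.oci.image.layer.v1.tar+gzip" carries both tar and gzip
    MediaType: ocispec.MediaTypeImageLayerGzip,
    Digest:    dgst,
    Size:      size,
}
// checkCompression sees both, so the returned writer gunzips the stream,
// then untars it into the wrapped ingester's writer(s)
w, err := decompressStore.Writer(ctx, ctrcontent.WithDescriptor(desc))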
2 changes: 1 addition & 1 deletion pkg/content/decompressstore_test.go
@@ -32,7 +32,7 @@ func TestDecompressStore(t *testing.T) {
    }

    memStore := content.NewMemoryStore()
-   decompressStore := content.NewDecompressStore(memStore, 0)
+   decompressStore := content.NewDecompressStore(memStore, content.WithBlocksize(0))
    ctx := context.Background()
    decompressWriter, err := decompressStore.Writer(ctx, ctrcontent.WithDescriptor(gzipDescriptor))
    if err != nil {
16 changes: 16 additions & 0 deletions pkg/content/multiwriter.go
@@ -0,0 +1,16 @@
package content

import (
    "context"

    ctrcontent "github.com/containerd/containerd/content"
)

// MultiWriterIngester is an ingester that can provide a single writer or multiple writers for a single
// descriptor. It is useful when the target of a descriptor can have multiple items within it, e.g. a layer
// that is a tar file with multiple files, each of which should go to a different stream, some of which
// should not be handled at all.
type MultiWriterIngester interface {
    ctrcontent.Ingester
    Writers(ctx context.Context, opts ...ctrcontent.WriterOpt) (map[string]ctrcontent.Writer, error)
}
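To illustrate the contract, a minimal sketch of an implementer; the routedStore type and its targets field are hypothetical and not part of this commit:

// routedStore is a hypothetical ingester that keeps one underlying ingester
// per filename, satisfying MultiWriterIngester.
type routedStore struct {
    ctrcontent.Ingester // single-writer path for ordinary content
    targets map[string]ctrcontent.Ingester
}

// Writers returns one content.Writer per filename this store can route.
func (s *routedStore) Writers(ctx context.Context, opts ...ctrcontent.WriterOpt) (map[string]ctrcontent.Writer, error) {
    out := map[string]ctrcontent.Writer{}
    for name, ing := range s.targets {
        w, err := ing.Writer(ctx, opts...)
        if err != nil {
            return nil, err
        }
        out[name] = w
    }
    return out, nil
}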
17 changes: 14 additions & 3 deletions pkg/content/opts.go
@@ -7,9 +7,10 @@ import (
)

type WriterOpts struct {
-   InputHash  *digest.Digest
-   OutputHash *digest.Digest
-   Blocksize  int
+   InputHash           *digest.Digest
+   OutputHash          *digest.Digest
+   Blocksize           int
+   MultiWriterIngester bool
}

type WriterOpt func(*WriterOpts) error
@@ -60,3 +61,13 @@ func WithBlocksize(blocksize int) WriterOpt {
        return nil
    }
}

// WithMultiWriterIngester declares that the passed ingester also implements MultiWriterIngester
// and should be used as such. If this is set to true, but the ingester does not
// implement MultiWriterIngester, calling Writer should return an error.
func WithMultiWriterIngester() WriterOpt {
    return func(w *WriterOpts) error {
        w.MultiWriterIngester = true
        return nil
    }
}
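Putting the two new pieces together, construction might look like this sketch, where myStore is a placeholder for an ingester that also implements MultiWriterIngester:

decompressStore := content.NewDecompressStore(
    myStore, // must also implement MultiWriterIngester, or Writer will error
    content.WithBlocksize(4096),
    content.WithMultiWriterIngester(),
)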
123 changes: 123 additions & 0 deletions pkg/content/passthrough.go
@@ -2,7 +2,9 @@ package content

import (
    "context"
+   "errors"
    "io"
+   "time"

    "github.com/containerd/containerd/content"
    "github.com/opencontainers/go-digest"
@@ -137,3 +139,124 @@ func (u *underlyingWriter) Digest() digest.Digest {
    }
    return u.digester.Digest()
}

// PassthroughMultiWriter is a single writer that passes through to multiple writers, allowing the passthrough
// function to select which writer to use.
type PassthroughMultiWriter struct {
    writers   []*PassthroughWriter
    pipew     *io.PipeWriter
    digester  digest.Digester
    size      int64
    reader    *io.PipeReader
    hash      *digest.Digest
    done      chan error
    startedAt time.Time
    updatedAt time.Time
    ref       string
}

func NewPassthroughMultiWriter(writers []content.Writer, f func(r io.Reader, w []io.Writer, done chan<- error), opts ...WriterOpt) content.Writer {
    // process opts for default
    wOpts := DefaultWriterOpts()
    for _, opt := range opts {
        if err := opt(&wOpts); err != nil {
            return nil
        }
    }

    var pws []*PassthroughWriter
    r, w := io.Pipe()
    for _, writer := range writers {
        pws = append(pws, &PassthroughWriter{
            writer:   writer,
            pipew:    w,
            digester: digest.Canonical.Digester(),
            underlyingWriter: &underlyingWriter{
                writer:   writer,
                digester: digest.Canonical.Digester(),
                hash:     wOpts.OutputHash,
            },
            reader: r,
            hash:   wOpts.InputHash,
            done:   make(chan error, 1),
        })
    }

    // the multiwriter itself feeds the shared pipe and tracks the input digest,
    // so it needs the pipe ends and its own digester as well
    pmw := &PassthroughMultiWriter{
        writers:   pws,
        pipew:     w,
        reader:    r,
        digester:  digest.Canonical.Digester(),
        hash:      wOpts.InputHash,
        startedAt: time.Now(),
        updatedAt: time.Now(),
        done:      make(chan error, 1),
    }
    // get our output writers
    var uws []io.Writer
    for _, uw := range pws {
        uws = append(uws, uw.underlyingWriter)
    }
    go f(r, uws, pmw.done)
    return pmw
}

func (pmw *PassthroughMultiWriter) Write(p []byte) (n int, err error) {
    n, err = pmw.pipew.Write(p)
    if pmw.hash == nil {
        pmw.digester.Hash().Write(p[:n])
    }
    pmw.size += int64(n)
    pmw.updatedAt = time.Now()
    return
}

func (pmw *PassthroughMultiWriter) Close() error {
    pmw.pipew.Close()
    for _, w := range pmw.writers {
        w.Close()
    }
    return nil
}

// Digest may return an empty digest or panic until committed.
func (pmw *PassthroughMultiWriter) Digest() digest.Digest {
    if pmw.hash != nil {
        return *pmw.hash
    }
    return pmw.digester.Digest()
}

// Commit commits the blob (but no roll-back is guaranteed on an error).
// size and expected can be zero-value when unknown.
// Commit always closes the writer, even on error.
// ErrAlreadyExists aborts the writer.
func (pmw *PassthroughMultiWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
    pmw.pipew.Close()
    err := <-pmw.done
    pmw.reader.Close()
    if err != nil && err != io.EOF {
        return err
    }

    // Some underlying writers will validate an expected digest, so we need the option to pass it
    // that digest. That is why we calculate the digest of the underlying writer throughout the write process.
    for _, w := range pmw.writers {
        // maybe this should be Commit(ctx, w.underlyingWriter.size, w.underlyingWriter.Digest(), opts...)
        w.done <- err
        if err := w.Commit(ctx, size, expected, opts...); err != nil {
            return err
        }
    }
    return nil
}

// Status returns the current state of the write
func (pmw *PassthroughMultiWriter) Status() (content.Status, error) {
    return content.Status{
        StartedAt: pmw.startedAt,
        UpdatedAt: pmw.updatedAt,
        Total:     pmw.size,
    }, nil
}

// Truncate updates the size of the target blob, but cannot do anything with a multiwriter
func (pmw *PassthroughMultiWriter) Truncate(size int64) error {
    return errors.New("truncate unavailable on multiwriter")
}
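The f argument is the heart of the passthrough: it consumes the single input stream, writes whatever it chooses to the writers in ws, and reports completion on done. As a trivial sketch, assumed rather than taken from this commit, a function that duplicates the stream to every writer:

fanout := func(r io.Reader, ws []io.Writer, done chan<- error) {
    // copy the whole input stream to every underlying writer;
    // io.Copy returns once Commit closes the write side of the pipe
    _, err := io.Copy(io.MultiWriter(ws...), r)
    done <- err
}
w := NewPassthroughMultiWriter(myWriters, fanout) // myWriters is a []content.Writer placeholder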
80 changes: 80 additions & 0 deletions pkg/content/untar.go
@@ -70,3 +70,83 @@ func NewUntarWriter(writer content.Writer, opts ...WriterOpt) content.Writer {
        done <- err
    }, opts...)
}

// NewUntarWriterByName wraps multiple writers with an untar, so that the stream is untarred and passed
// to the appropriate writer, based on the filename. If a filename is not found, it will not pass it
// to any writer. The filename "" will handle any stream that does not have a specific filename; use
// it for the default of a single file in a tar stream.
func NewUntarWriterByName(writers map[string]content.Writer, opts ...WriterOpt) content.Writer {
    // process opts for default
    wOpts := DefaultWriterOpts()
    for _, opt := range opts {
        if err := opt(&wOpts); err != nil {
            return nil
        }
    }

    // construct an array of content.Writer
    nameToIndex := map[string]int{}
    var writerSlice []content.Writer
    for name, writer := range writers {
        writerSlice = append(writerSlice, writer)
        nameToIndex[name] = len(writerSlice) - 1
    }
    // need a PassthroughMultiWriter here
    return NewPassthroughMultiWriter(writerSlice, func(r io.Reader, ws []io.Writer, done chan<- error) {
        tr := tar.NewReader(r)
        var (
            err    error
            header *tar.Header
        )
        for {
            // assign to the outer err, rather than shadowing it with :=,
            // so the error actually reaches "done <- err" below
            header, err = tr.Next()
            if err == io.EOF {
                // clear the error, since we do not pass an io.EOF
                err = nil
                break // End of archive
            }
            if err != nil {
                // pass the error on
                err = fmt.Errorf("UntarWriter tar file header read error: %v", err)
                break
            }
            // get the filename
            filename := header.Name
            index, ok := nameToIndex[filename]
            if !ok {
                index, ok = nameToIndex[""]
                if !ok {
                    // we did not find this file or the wildcard, so do not process this file
                    continue
                }
            }

            // write out the untarred data
            // we can handle io.EOF, just go to the next file
            // any other errors should stop and get reported
            b := make([]byte, wOpts.Blocksize)
            for {
                var n int
                n, err = tr.Read(b)
                if err != nil && err != io.EOF {
                    err = fmt.Errorf("UntarWriter file data read error: %v", err)
                    break
                }
                // Read never returns n > len(b), so b[:n] is always valid
                if _, err2 := ws[index].Write(b[:n]); err2 != nil {
                    err = fmt.Errorf("UntarWriter error writing to underlying writer at index %d for name '%s': %v", index, filename, err2)
                    break
                }
                if err == io.EOF {
                    // go to the next file
                    break
                }
            }
            // did we break with a non-nil and non-EOF error?
            if err != nil && err != io.EOF {
                break
            }
        }
        done <- err
    }, opts...)
}
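Calling it might look like the following sketch; manifestWriter and defaultWriter stand in for whatever content.Writer values the caller obtains from its store:

writers := map[string]content.Writer{
    "manifest.json": manifestWriter, // receives only manifest.json
    "":              defaultWriter,  // catch-all for every other filename
}
w := NewUntarWriterByName(writers, WithBlocksize(4096))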
