multiwriteringester support #214

Merged (1 commit, Feb 4, 2021)
75 changes: 61 additions & 14 deletions pkg/content/decompressstore.go
@@ -2,12 +2,13 @@ package content

import (
"context"
"errors"
"strings"

ctrcontent "github.com/containerd/containerd/content"
)

// DecompressStore is a store that decompresses content and extracts from tar, if needed, wrapping
// another store. By default, a FileStore will simply take each artifact and write it to
// a file, as a MemoryStore will do into memory. If the artifact is gzipped or tarred,
// you might want to store the actual object inside tar or gzip. Wrap your Store
@@ -17,15 +18,34 @@ import (
// For example:
//
// fileStore := NewFileStore(rootPath)
// decompressStore := store.NewDecompressStore(fileStore, WithBlocksize(blocksize))
//
// The above example works if there is no tar, i.e. each artifact is just a single file, perhaps gzipped,
// or if there is only one file in each tar archive. In other words, when each content.Writer has only one target output stream.
// However, if you have multiple files in each tar archive, where each archive is an artifact layer, then
// you need a way to select how to handle each file in the tar archive. In other words, when each content.Writer has more than one
// target output stream. In that case, use the following example:
//
// multiStore := NewMultiStore(rootPath) // some store that can handle different filenames
// decompressStore := store.NewDecompressStore(multiStore, WithBlocksize(blocksize), WithMultiWriterIngester())
//
type DecompressStore struct {
ingester ctrcontent.Ingester
blocksize int
multiWriterIngester bool
}

func NewDecompressStore(ingester ctrcontent.Ingester, opts ...WriterOpt) DecompressStore {
// we have to reprocess the opts to find the blocksize
var wOpts WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
// TODO: we probably should handle errors here
continue
}
}

return DecompressStore{ingester, wOpts.Blocksize, wOpts.MultiWriterIngester}
}
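One consequence worth noting: passing WithMultiWriterIngester over a store that cannot honor it is only caught when Writer is called, not here in the constructor. A minimal sketch of that failure mode, written caller-side in the same style as the test file below; ctx and desc are placeholders for a real context and OCI descriptor:

plain := content.NewMemoryStore()
ds := content.NewDecompressStore(plain, content.WithMultiWriterIngester())

// ctx and desc are placeholders; the memory store does not implement
// MultiWriterIngester, so this fails with "configured to use multiwriter
// ingester, but ingester does not implement multiwriter".
_, err := ds.Writer(ctx, ctrcontent.WithDescriptor(desc))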

// Writer gets a writer
@@ -34,10 +54,20 @@ func (d DecompressStore) Writer(ctx context.Context, opts ...ctrcontent.WriterOp
// - if there is a desc in the opts, and the mediatype is tar or tar+gzip, then pass the correct decompress writer
// - else, pass the regular writer
var (
writer ctrcontent.Writer
err error
multiIngester MultiWriterIngester
ok bool
)

// check to see if we are supposed to use a MultiWriterIngester
if d.multiWriterIngester {
multiIngester, ok = d.ingester.(MultiWriterIngester)
if !ok {
return nil, errors.New("configured to use multiwriter ingester, but ingester does not implement multiwriter")
}
}

// we have to reprocess the opts to find the desc
var wOpts ctrcontent.WriterOpts
for _, opt := range opts {
@@ -52,20 +82,37 @@ func (d DecompressStore) Writer(ctx context.Context, opts ...ctrcontent.WriterOp
hasGzip, hasTar, modifiedMediaType := checkCompression(desc.MediaType)
wOpts.Desc.MediaType = modifiedMediaType
opts = append(opts, ctrcontent.WithDescriptor(wOpts.Desc))
// determine if we pass it blocksize, only if positive
writerOpts := []WriterOpt{}
if d.blocksize > 0 {
writerOpts = append(writerOpts, WithBlocksize(d.blocksize))
}

writer, err = d.ingester.Writer(ctx, opts...)
if err != nil {
return nil, err
}

// do we need to wrap with an untar writer?
if hasTar {
// if not multiingester, get a regular writer
if multiIngester == nil {
writer = NewUntarWriter(writer, writerOpts...)
} else {
writers, err := multiIngester.Writers(ctx, opts...)
if err != nil {
return nil, err
}
writer = NewUntarWriterByName(writers, writerOpts...)
}
}
if hasGzip {
if writer == nil {
writer, err = d.ingester.Writer(ctx, opts...)
if err != nil {
return nil, err
}
}
writer = NewGunzipWriter(writer, writerOpts...)
}
return writer, nil
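Writer's branching above hinges on checkCompression, which lives elsewhere in this package and is not part of this diff. A plausible sketch of its contract, inferred only from the call site (strip a trailing "+gzip" and/or ".tar" from the media type and report what was found); the real implementation may differ:

func checkCompression(mediaType string) (hasGzip, hasTar bool, modified string) {
    modified = mediaType
    if strings.HasSuffix(modified, "+gzip") {
        hasGzip = true
        modified = strings.TrimSuffix(modified, "+gzip")
    }
    if strings.HasSuffix(modified, ".tar") {
        hasTar = true
        modified = strings.TrimSuffix(modified, ".tar")
    }
    return hasGzip, hasTar, modified
}

With that contract, "application/vnd.oci.image.layer.v1.tar+gzip" yields hasGzip and hasTar both true, which is consistent with the existing strings import at the top of this file.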
2 changes: 1 addition & 1 deletion pkg/content/decompressstore_test.go
@@ -32,7 +32,7 @@ func TestDecompressStore(t *testing.T) {
}

memStore := content.NewMemoryStore()
decompressStore := content.NewDecompressStore(memStore, content.WithBlocksize(0))
ctx := context.Background()
decompressWriter, err := decompressStore.Writer(ctx, ctrcontent.WithDescriptor(gzipDescriptor))
if err != nil {
16 changes: 16 additions & 0 deletions pkg/content/multiwriter.go
@@ -0,0 +1,16 @@
package content

import (
"context"

ctrcontent "github.com/containerd/containerd/content"
)

// MultiWriterIngester is an ingester that can provide a single writer or multiple writers for a single
// descriptor. Useful when the target of a descriptor can have multiple items within it, e.g. a layer
// that is a tar file with multiple files, each of which should go to a different stream, some of which
// should not be handled at all.
type MultiWriterIngester interface {
ctrcontent.Ingester
Writers(ctx context.Context, opts ...ctrcontent.WriterOpt) (map[string]ctrcontent.Writer, error)
}
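The interface says nothing about how an implementation decides which names exist; that policy belongs to the store. A minimal conforming sketch, not part of this PR: it fans a fixed list of names out to a wrapped containerd ingester, distinguishing the writers with containerd's WithRef option. The type and field names are illustrative only:

type fanoutIngester struct {
    ctrcontent.Ingester // provides the plain single-writer path
    names []string      // tar entry names this store knows how to route
}

// Writers satisfies MultiWriterIngester by opening one underlying writer per
// known name; an empty-string key could be added as a catch-all.
func (f *fanoutIngester) Writers(ctx context.Context, opts ...ctrcontent.WriterOpt) (map[string]ctrcontent.Writer, error) {
    writers := map[string]ctrcontent.Writer{}
    for _, name := range f.names {
        w, err := f.Ingester.Writer(ctx, append(opts, ctrcontent.WithRef(name))...)
        if err != nil {
            return nil, err
        }
        writers[name] = w
    }
    return writers, nil
}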
17 changes: 14 additions & 3 deletions pkg/content/opts.go
@@ -7,9 +7,10 @@ import (
)

type WriterOpts struct {
InputHash *digest.Digest
OutputHash *digest.Digest
Blocksize int
MultiWriterIngester bool
}

type WriterOpt func(*WriterOpts) error
@@ -60,3 +61,13 @@ func WithBlocksize(blocksize int) WriterOpt {
return nil
}
}

// WithMultiWriterIngester declares that the passed ingester also implements MultiWriterIngester
// and should be used as such. If this is set to true, but the ingester does not
// implement MultiWriterIngester, calling Writer should return an error.
func WithMultiWriterIngester() WriterOpt {
return func(w *WriterOpts) error {
w.MultiWriterIngester = true
return nil
}
}
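Because the options are plain functional setters over WriterOpts, they compose in any order, and a store reprocesses them exactly the way NewDecompressStore does above. A small in-package sketch; note that both setters shown here always return nil today:

wOpts := DefaultWriterOpts()
for _, opt := range []WriterOpt{WithBlocksize(4096), WithMultiWriterIngester()} {
    if err := opt(&wOpts); err != nil {
        // unreachable with the setters above; kept for forward compatibility
        panic(err)
    }
}
// wOpts.Blocksize == 4096 and wOpts.MultiWriterIngester == true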
123 changes: 123 additions & 0 deletions pkg/content/passthrough.go
@@ -2,7 +2,9 @@ package content

import (
"context"
"errors"
"io"
"time"

"github.com/containerd/containerd/content"
"github.com/opencontainers/go-digest"
@@ -137,3 +139,124 @@ func (u *underlyingWriter) Digest() digest.Digest {
}
return u.digester.Digest()
}

// PassthroughMultiWriter is a single writer that passes through to multiple writers, allowing the
// passthrough function to select which writer to use.
type PassthroughMultiWriter struct {
writers []*PassthroughWriter
pipew *io.PipeWriter
digester digest.Digester
size int64
reader *io.PipeReader
hash *digest.Digest
done chan error
startedAt time.Time
updatedAt time.Time
ref string
}

func NewPassthroughMultiWriter(writers []content.Writer, f func(r io.Reader, w []io.Writer, done chan<- error), opts ...WriterOpt) content.Writer {
// process opts for default
wOpts := DefaultWriterOpts()
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil
}
}

var pws []*PassthroughWriter
r, w := io.Pipe()
for _, writer := range writers {
pws = append(pws, &PassthroughWriter{
writer: writer,
pipew: w,
digester: digest.Canonical.Digester(),
underlyingWriter: &underlyingWriter{
writer: writer,
digester: digest.Canonical.Digester(),
hash: wOpts.OutputHash,
},
reader: r,
hash: wOpts.InputHash,
done: make(chan error, 1),
})
}

// wire the pipe, reader and digester into the multiwriter itself;
// Write, Digest and Commit below rely on these being non-nil
pmw := &PassthroughMultiWriter{
writers: pws,
pipew: w,
reader: r,
digester: digest.Canonical.Digester(),
hash: wOpts.InputHash,
done: make(chan error, 1),
startedAt: time.Now(),
updatedAt: time.Now(),
}
// get our output writers
var uws []io.Writer
for _, uw := range pws {
uws = append(uws, uw.underlyingWriter)
}
go f(r, uws, pmw.done)
return pmw
}

func (pmw *PassthroughMultiWriter) Write(p []byte) (n int, err error) {
n, err = pmw.pipew.Write(p)
if pmw.hash == nil {
pmw.digester.Hash().Write(p[:n])
}
pmw.size += int64(n)
pmw.updatedAt = time.Now()
return
}

func (pmw *PassthroughMultiWriter) Close() error {
pmw.pipew.Close()
for _, w := range pmw.writers {
w.Close()
}
return nil
}

// Digest may return an empty digest or panic until committed.
func (pmw *PassthroughMultiWriter) Digest() digest.Digest {
if pmw.hash != nil {
return *pmw.hash
}
return pmw.digester.Digest()
}

// Commit commits the blob (but no roll-back is guaranteed on an error).
// size and expected can be zero-value when unknown.
// Commit always closes the writer, even on error.
// ErrAlreadyExists aborts the writer.
func (pmw *PassthroughMultiWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
pmw.pipew.Close()
err := <-pmw.done
pmw.reader.Close()
if err != nil && err != io.EOF {
return err
}

// Some underlying writers will validate an expected digest, so we need the option to pass it
// that digest. That is why we calculate the digest of the underlying writer throughout the write process.
for _, w := range pmw.writers {
// maybe this should be Commit(ctx, pw.underlyingWriter.size, pw.underlyingWriter.Digest(), opts...)
w.done <- err
if err := w.Commit(ctx, size, expected, opts...); err != nil {
return err
}
}
return nil
}

// Status returns the current state of write
func (pmw *PassthroughMultiWriter) Status() (content.Status, error) {
return content.Status{
StartedAt: pmw.startedAt,
UpdatedAt: pmw.updatedAt,
Total: pmw.size,
}, nil
}

// Truncate updates the size of the target blob, but cannot do anything with a multiwriter
func (pmw *PassthroughMultiWriter) Truncate(size int64) error {
return errors.New("truncate unavailable on multiwriter")
}
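Stripped of the content.Writer bookkeeping, PassthroughMultiWriter is the classic io.Pipe fan-out: Write feeds one end of a pipe, a goroutine drains the other end into the underlying writers, and the goroutine's error comes back over done when Commit closes the pipe. A stdlib-only sketch of that flow; unlike NewUntarWriterByName below, this one simply copies every byte to every writer:

r, w := io.Pipe()
done := make(chan error, 1)
var out1, out2 bytes.Buffer

go func(r io.Reader, ws []io.Writer, done chan<- error) {
    // route the stream; PassthroughMultiWriter delegates this choice to f
    _, err := io.Copy(io.MultiWriter(ws...), r)
    done <- err
}(r, []io.Writer{&out1, &out2}, done)

w.Write([]byte("payload")) // what PassthroughMultiWriter.Write does via pipew
w.Close()                  // what Commit does before waiting on done
if err := <-done; err != nil {
    panic(err)
}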
80 changes: 80 additions & 0 deletions pkg/content/untar.go
@@ -70,3 +70,83 @@ func NewUntarWriter(writer content.Writer, opts ...WriterOpt) content.Writer {
done <- err
}, opts...)
}

// NewUntarWriterByName wraps multiple writers with an untar, so that the stream is untarred and passed
// to the appropriate writer, based on the filename. If a filename has no matching writer, that file is
// discarded. The writer keyed by "" catches any file without a dedicated writer; use it as the
// default, e.g. when a tar stream holds a single file.
func NewUntarWriterByName(writers map[string]content.Writer, opts ...WriterOpt) content.Writer {
// process opts for default
wOpts := DefaultWriterOpts()
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil
}
}

// construct an array of content.Writer
nameToIndex := map[string]int{}
var writerSlice []content.Writer
for name, writer := range writers {
writerSlice = append(writerSlice, writer)
nameToIndex[name] = len(writerSlice) - 1
}
// need a PassthroughMultiWriter here
return NewPassthroughMultiWriter(writerSlice, func(r io.Reader, ws []io.Writer, done chan<- error) {
tr := tar.NewReader(r)
var err error
for {
// assign to the enclosing err (no :=) so the final done <- err below reports header errors
var header *tar.Header
header, err = tr.Next()
if err == io.EOF {
// clear the error, since we do not pass an io.EOF
err = nil
break // End of archive
}
if err != nil {
// pass the error on
err = fmt.Errorf("UntarWriter tar file header read error: %v", err)
break
}
// get the filename
filename := header.Name
index, ok := nameToIndex[filename]
if !ok {
index, ok = nameToIndex[""]
if !ok {
// we did not find this file or the wildcard, so do not process this file
continue
}
}

// write out the untarred data
// we can handle io.EOF, just go to the next file
// any other errors should stop and get reported
b := make([]byte, wOpts.Blocksize)
for {
var n int
n, err = tr.Read(b)
if err != nil && err != io.EOF {
err = fmt.Errorf("UntarWriter file data read error: %v\n", err)
break
}
l := n
if n > len(b) {
l = len(b)
}
if _, err2 := ws[index].Write(b[:l]); err2 != nil {
err = fmt.Errorf("UntarWriter error writing to underlying writer at index %d for name '%s': %v", index, filename, err2)
break
}
if err == io.EOF {
// go to the next file
break
}
}
// did we break with a non-nil and non-EOF error?
if err != nil && err != io.EOF {
break
}
}
done <- err
}, opts...)
}
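End to end, a caller of the new API hands NewUntarWriterByName one writer per expected filename plus an optional "" catch-all. The three writer variables below are placeholders for writers obtained from whatever target store is in play; the rest is the API introduced in this PR:

var manifestWriter, configWriter, defaultWriter content.Writer // from your target store

writers := map[string]content.Writer{
    "manifest.json": manifestWriter,
    "config.json":   configWriter,
    "":              defaultWriter, // any other filename lands here
}
w := NewUntarWriterByName(writers, WithBlocksize(4096))
// w accepts a raw tar stream; each entry is dispatched to its writer by name.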