Skip to content

Commit

Permalink
options to content writers, including pre-providing the input and out…
Browse files Browse the repository at this point in the history
…put hashes (#192)

Signed-off-by: Avi Deitcher <[email protected]>
  • Loading branch information
deitch authored Nov 9, 2020
1 parent 4cae1db commit 2ca9464
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 85 deletions.
10 changes: 9 additions & 1 deletion pkg/content/consts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package content

import ocispec "github.com/opencontainers/image-spec/specs-go/v1"
import (
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

const (
// DefaultBlobMediaType specifies the default blob media type
Expand Down Expand Up @@ -32,3 +35,8 @@ const (
// Simply uses the same size as io.Copy()
DefaultBlocksize = 32768
)

const (
	// BlankHash is the digest of blank (zero-length) content: the
	// canonical sha256 digest of the empty input. Useful as the value
	// for WithInputHash/WithOutputHash when a stream is known to be empty.
	BlankHash = digest.Digest("sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
)
19 changes: 14 additions & 5 deletions pkg/content/gunzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,18 @@ import (
)

// NewGunzipWriter wraps a writer with a gunzip, so that the stream is gunzipped
func NewGunzipWriter(writer content.Writer, blocksize int) content.Writer {
if blocksize == 0 {
blocksize = DefaultBlocksize
//
// By default, it calculates the hash when writing. If the option `skipHash` is true,
// it will skip doing the hash. Skipping the hash is intended to be used only
// if you are confident about the validity of the data being passed to the writer,
// and wish to save on the hashing time.
func NewGunzipWriter(writer content.Writer, opts ...WriterOpt) content.Writer {
// process opts for default
wOpts := DefaultWriterOpts()
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil
}
}
return NewPassthroughWriter(writer, func(r io.Reader, w io.Writer, done chan<- error) {
gr, err := gzip.NewReader(r)
Expand All @@ -20,7 +29,7 @@ func NewGunzipWriter(writer content.Writer, blocksize int) content.Writer {
return
}
// write out the uncompressed data
b := make([]byte, blocksize, blocksize)
b := make([]byte, wOpts.Blocksize, wOpts.Blocksize)
for {
var n int
n, err = gr.Read(b)
Expand All @@ -44,5 +53,5 @@ func NewGunzipWriter(writer content.Writer, blocksize int) content.Writer {
}
gr.Close()
done <- err
})
}, opts...)
}
39 changes: 27 additions & 12 deletions pkg/content/iowriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,57 @@ type IoContentWriter struct {
writer io.Writer
digester digest.Digester
size int64
hash *digest.Digest
}

// NewIoContentWriter create a new IoContentWriter. blocksize is the size of the block to copy,
// in bytes, between the parent and child. The default, when 0, is to simply use
// whatever golang defaults to with io.Copy
func NewIoContentWriter(writer io.Writer, blocksize int) content.Writer {
// NewIoContentWriter creates a new IoContentWriter.
//
// By default, it calculates the hash when writing. If the option `skipHash` is true,
// it will skip doing the hash. Skipping the hash is intended to be used only
// if you are confident about the validity of the data being passed to the writer,
// and wish to save on the hashing time.
func NewIoContentWriter(writer io.Writer, opts ...WriterOpt) content.Writer {
w := writer
if w == nil {
w = ioutil.Discard
}
// process opts for default
wOpts := DefaultWriterOpts()
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil
}
}
ioc := &IoContentWriter{
writer: w,
digester: digest.Canonical.Digester(),
// we take the OutputHash, since the InputHash goes to the passthrough writer,
// which then passes the processed output to us
hash: wOpts.OutputHash,
}
return NewPassthroughWriter(ioc, func(r io.Reader, w io.Writer, done chan<- error) {
// write out the data to the io writer
var (
err error
)
if blocksize == 0 {
_, err = io.Copy(w, r)
} else {
b := make([]byte, blocksize, blocksize)
_, err = io.CopyBuffer(w, r, b)
}
// we could use io.Copy, but calling it with the default blocksize is identical to
// io.CopyBuffer. Otherwise, we would need some way to let the user flag "I want to use
// io.Copy", when it should not matter to them
b := make([]byte, wOpts.Blocksize, wOpts.Blocksize)
_, err = io.CopyBuffer(w, r, b)
done <- err
})
}, opts...)
}

func (w *IoContentWriter) Write(p []byte) (n int, err error) {
n, err = w.writer.Write(p)
if err != nil {
return 0, err
}
w.digester.Hash().Write(p[:n])
w.size += int64(n)
if w.hash == nil {
w.digester.Hash().Write(p[:n])
}
return
}

Expand Down
56 changes: 56 additions & 0 deletions pkg/content/opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package content

import (
"github.com/opencontainers/go-digest"
)

// WriterOpts holds the configurable behaviors of a content writer.
// Use DefaultWriterOpts for the defaults, and WriterOpt functions
// (WithInputHash, WithOutputHash, WithBlocksize) to modify them.
type WriterOpts struct {
	// InputHash, when non-nil, is a caller-provided digest of the input
	// stream; writers may use it instead of calculating their own.
	InputHash *digest.Digest
	// OutputHash, when non-nil, is a caller-provided digest of the
	// processed output stream; writers may use it instead of calculating
	// their own.
	OutputHash *digest.Digest
	// Blocksize is the size, in bytes, of the buffer used when copying
	// data between streams.
	Blocksize int
}

// WriterOpt is a functional option that modifies a WriterOpts in place,
// returning an error if the option cannot be applied.
type WriterOpt func(*WriterOpts) error

// DefaultWriterOpts returns the default writer options: no pre-provided
// input or output hash (so writers calculate their own), and a copy
// buffer of DefaultBlocksize bytes.
func DefaultWriterOpts() WriterOpts {
	var defaults WriterOpts
	defaults.Blocksize = DefaultBlocksize
	return defaults
}

// WithInputHash provides the expected input hash to a writer. Writers
// may suppress their own calculation of a hash on the stream, taking this
// hash instead. If the Writer processes the data before passing it on to
// another Writer layer, this is the hash of the *input* stream.
//
// To have a blank hash, use WithInputHash(BlankHash).
func WithInputHash(hash digest.Digest) WriterOpt {
	// copy once so every application of this option shares one pointer,
	// exactly as taking the address of the parameter would
	h := hash
	return func(w *WriterOpts) error {
		w.InputHash = &h
		return nil
	}
}

// WithOutputHash provide the expected output hash to a writer. Writers
// may suppress their own calculation of a hash on the stream, taking this
// hash instead. If the Writer processes the data before passing it on to another
// Writer layer, this is the hash of the *output* stream.
//
// To have a blank hash, use WithOutputHash(BlankHash).
func WithOutputHash(hash digest.Digest) WriterOpt {
	return func(w *WriterOpts) error {
		w.OutputHash = &hash
		return nil
	}
}

// WithBlocksize sets the block size, in bytes, used by the processor of
// data when copying between streams. The default is DefaultBlocksize,
// the same buffer size used by io.Copy.
func WithBlocksize(blocksize int) WriterOpt {
	return func(o *WriterOpts) error {
		o.Blocksize = blocksize
		return nil
	}
}
86 changes: 61 additions & 25 deletions pkg/content/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,52 @@ import (
// PassthroughWriter takes an input stream and passes it through to an underlying writer,
// while providing the ability to manipulate the stream before it gets passed through
type PassthroughWriter struct {
writer content.Writer
pipew *io.PipeWriter
digester digest.Digester
size int64
underlyingDigester digest.Digester
underlyingSize int64
reader *io.PipeReader
done chan error
writer content.Writer
pipew *io.PipeWriter
digester digest.Digester
size int64
underlyingWriter *underlyingWriter
reader *io.PipeReader
hash *digest.Digest
done chan error
}

// NewPassthroughWriter creates a pass-through writer that allows for processing
// the content via an arbitrary function. The function should do whatever processing it
// wants, reading from the Reader to the Writer. When done, it must indicate via
// sending an error or nil to the Done
func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error)) content.Writer {
func NewPassthroughWriter(writer content.Writer, f func(r io.Reader, w io.Writer, done chan<- error), opts ...WriterOpt) content.Writer {
// process opts for default
wOpts := DefaultWriterOpts()
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil
}
}

r, w := io.Pipe()
pw := &PassthroughWriter{
writer: writer,
pipew: w,
digester: digest.Canonical.Digester(),
underlyingDigester: digest.Canonical.Digester(),
reader: r,
done: make(chan error, 1),
writer: writer,
pipew: w,
digester: digest.Canonical.Digester(),
underlyingWriter: &underlyingWriter{
writer: writer,
digester: digest.Canonical.Digester(),
hash: wOpts.OutputHash,
},
reader: r,
hash: wOpts.InputHash,
done: make(chan error, 1),
}
uw := &underlyingWriter{
pw: pw,
}
go f(r, uw, pw.done)
go f(r, pw.underlyingWriter, pw.done)
return pw
}

func (pw *PassthroughWriter) Write(p []byte) (n int, err error) {
n, err = pw.pipew.Write(p)
pw.digester.Hash().Write(p[:n])
if pw.hash == nil {
pw.digester.Hash().Write(p[:n])
}
pw.size += int64(n)
return
}
Expand All @@ -57,6 +69,9 @@ func (pw *PassthroughWriter) Close() error {

// Digest may return empty digest or panics until committed.
// If a pre-provided input hash was supplied (via WithInputHash), it is
// returned as-is; otherwise the digest accumulated during Write is used.
func (pw *PassthroughWriter) Digest() digest.Digest {
	if pw.hash != nil {
		return *pw.hash
	}
	return pw.digester.Digest()
}

Expand All @@ -71,7 +86,10 @@ func (pw *PassthroughWriter) Commit(ctx context.Context, size int64, expected di
if err != nil && err != io.EOF {
return err
}
return pw.writer.Commit(ctx, pw.underlyingSize, pw.underlyingDigester.Digest(), opts...)

// Some underlying writers will validate an expected digest, so we need the option to pass it
// that digest. That is why we calculate the digest of the underlying writer throughout the write process.
return pw.writer.Commit(ctx, pw.underlyingWriter.size, pw.underlyingWriter.Digest(), opts...)
}

// Status returns the current state of write
Expand All @@ -87,17 +105,35 @@ func (pw *PassthroughWriter) Truncate(size int64) error {
// underlyingWriter implementation of io.Writer to write to the underlying
// io.Writer
type underlyingWriter struct {
pw *PassthroughWriter
writer content.Writer
digester digest.Digester
size int64
hash *digest.Digest
}

// Write writes to the underlying writer.
func (u *underlyingWriter) Write(p []byte) (int, error) {
n, err := u.pw.writer.Write(p)
n, err := u.writer.Write(p)
if err != nil {
return 0, err
}

u.pw.underlyingSize += int64(len(p))
u.pw.underlyingDigester.Hash().Write(p)
if u.hash == nil {
u.digester.Hash().Write(p)
}
u.size += int64(len(p))
return n, nil
}

// Size returns the total number of bytes written so far.
func (u *underlyingWriter) Size() int64 {
	return u.size
}

// Digest may return empty digest or panics until committed.
// A pre-provided output hash (via WithOutputHash) takes precedence over
// the digest accumulated while writing.
func (u *underlyingWriter) Digest() digest.Digest {
	if u.hash != nil {
		return *u.hash
	}
	return u.digester.Digest()
}
Loading

0 comments on commit 2ca9464

Please sign in to comment.