Skip to content

Commit

Permalink
pass through on write (#230)
Browse files Browse the repository at this point in the history
Signed-off-by: Avi Deitcher <[email protected]>
  • Loading branch information
deitch authored Mar 12, 2021
1 parent 0380ecb commit 75801fe
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pkg/oras/oras_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (suite *ORASTestSuite) Test_4_GHSA_g5v4_5x39_vwhx() {
suite.FailNow("error creating temp directory", err)
}
defer os.RemoveAll(tempDir)
store := orascontent.NewFileStore(tempDir)
store := orascontent.NewFileStore(tempDir, orascontent.WithIgnoreNoName())
defer store.Close()
ref = fmt.Sprintf("%s/evil:%s", suite.DockerRegistryHost, tag)
_, _, err = Pull(newContext(), newResolver(), ref, store)
Expand Down
104 changes: 103 additions & 1 deletion pkg/oras/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package oras
import (
"context"
"errors"
"io"
"time"

orascontent "github.com/deislabs/oras/pkg/content"

"github.com/containerd/containerd/content"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
)

// ensure interface
Expand Down Expand Up @@ -62,7 +65,15 @@ func (s *hybridStore) Writer(ctx context.Context, opts ...content.WriterOpt) (co
}

if isAllowedMediaType(wOpts.Desc.MediaType, ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex) || s.ingester == nil {
return s.cache.Writer(ctx, opts...)
cacheWriter, err := s.cache.Writer(ctx, opts...)
if err != nil {
return nil, err
}
ingesterWriter, err := s.ingester.Writer(ctx, opts...)
if err != nil {
return nil, err
}
return newTeeWriter(wOpts.Desc, cacheWriter, ingesterWriter), nil
}
return s.ingester.Writer(ctx, opts...)
}
Expand Down Expand Up @@ -117,3 +128,94 @@ func (s *hybridStore) ListStatuses(ctx context.Context, filters ...string) ([]co
func (s *hybridStore) Abort(ctx context.Context, ref string) error {
return errors.New("not yet implemented: Abort (content.Store interface)")
}

// teeWriter tees the content to one or more content.Writer
type teeWriter struct {
writers []content.Writer
digester digest.Digester
status content.Status
}

func newTeeWriter(desc ocispec.Descriptor, writers ...content.Writer) *teeWriter {
now := time.Now()
return &teeWriter{
writers: writers,
digester: digest.Canonical.Digester(),
status: content.Status{
Total: desc.Size,
StartedAt: now,
UpdatedAt: now,
},
}
}

func (t *teeWriter) Close() error {
g := new(errgroup.Group)
for _, w := range t.writers {
w := w // closure issues, see https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
return w.Close()
})
}
return g.Wait()
}

func (t *teeWriter) Write(p []byte) (n int, err error) {
g := new(errgroup.Group)
for _, w := range t.writers {
w := w // closure issues, see https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
n, err := w.Write(p[:])
if err != nil {
return err
}
if n != len(p) {
return io.ErrShortWrite
}
return nil
})
}
err = g.Wait()
n = len(p)
if err != nil {
return n, err
}
_, _ = t.digester.Hash().Write(p[:n])
t.status.Offset += int64(len(p))
t.status.UpdatedAt = time.Now()

return n, nil
}

// Digest may return empty digest or panics until committed.
func (t *teeWriter) Digest() digest.Digest {
return t.digester.Digest()
}

func (t *teeWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
g := new(errgroup.Group)
for _, w := range t.writers {
w := w // closure issues, see https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
return w.Commit(ctx, size, expected, opts...)
})
}
return g.Wait()
}

// Status returns the current state of write
func (t *teeWriter) Status() (content.Status, error) {
return t.status, nil
}

// Truncate updates the size of the target blob
func (t *teeWriter) Truncate(size int64) error {
g := new(errgroup.Group)
for _, w := range t.writers {
w := w // closure issues, see https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
return w.Truncate(size)
})
}
return g.Wait()
}

0 comments on commit 75801fe

Please sign in to comment.