From 75801fef923e9d9945f512494f4790cdf1c022cd Mon Sep 17 00:00:00 2001 From: Avi Deitcher Date: Fri, 12 Mar 2021 02:26:36 +0200 Subject: [PATCH] pass through on write (#230) Signed-off-by: Avi Deitcher --- pkg/oras/oras_test.go | 2 +- pkg/oras/store.go | 104 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/pkg/oras/oras_test.go b/pkg/oras/oras_test.go index a611ab67e..bd7bc9234 100644 --- a/pkg/oras/oras_test.go +++ b/pkg/oras/oras_test.go @@ -362,7 +362,7 @@ func (suite *ORASTestSuite) Test_4_GHSA_g5v4_5x39_vwhx() { suite.FailNow("error creating temp directory", err) } defer os.RemoveAll(tempDir) - store := orascontent.NewFileStore(tempDir) + store := orascontent.NewFileStore(tempDir, orascontent.WithIgnoreNoName()) defer store.Close() ref = fmt.Sprintf("%s/evil:%s", suite.DockerRegistryHost, tag) _, _, err = Pull(newContext(), newResolver(), ref, store) diff --git a/pkg/oras/store.go b/pkg/oras/store.go index d26c53c7b..84348d52a 100644 --- a/pkg/oras/store.go +++ b/pkg/oras/store.go @@ -3,12 +3,15 @@ package oras import ( "context" "errors" + "io" + "time" orascontent "github.com/deislabs/oras/pkg/content" "github.com/containerd/containerd/content" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/sync/errgroup" ) // ensure interface @@ -62,7 +65,15 @@ func (s *hybridStore) Writer(ctx context.Context, opts ...content.WriterOpt) (co } if isAllowedMediaType(wOpts.Desc.MediaType, ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex) || s.ingester == nil { - return s.cache.Writer(ctx, opts...) + cacheWriter, err := s.cache.Writer(ctx, opts...) + if err != nil { + return nil, err + } + ingesterWriter, err := s.ingester.Writer(ctx, opts...) + if err != nil { + return nil, err + } + return newTeeWriter(wOpts.Desc, cacheWriter, ingesterWriter), nil } return s.ingester.Writer(ctx, opts...) } @@ -117,3 +128,94 @@ func (s *hybridStore) ListStatuses(ctx context.Context, filters ...string) ([]co func (s *hybridStore) Abort(ctx context.Context, ref string) error { return errors.New("not yet implemented: Abort (content.Store interface)") } + +// teeWriter tees the content to one or more content.Writer +type teeWriter struct { + writers []content.Writer + digester digest.Digester + status content.Status +} + +func newTeeWriter(desc ocispec.Descriptor, writers ...content.Writer) *teeWriter { + now := time.Now() + return &teeWriter{ + writers: writers, + digester: digest.Canonical.Digester(), + status: content.Status{ + Total: desc.Size, + StartedAt: now, + UpdatedAt: now, + }, + } +} + +func (t *teeWriter) Close() error { + g := new(errgroup.Group) + for _, w := range t.writers { + w := w // closure issues, see https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + return w.Close() + }) + } + return g.Wait() +} + +func (t *teeWriter) Write(p []byte) (n int, err error) { + g := new(errgroup.Group) + for _, w := range t.writers { + w := w // closure issues, see https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + n, err := w.Write(p[:]) + if err != nil { + return err + } + if n != len(p) { + return io.ErrShortWrite + } + return nil + }) + } + err = g.Wait() + n = len(p) + if err != nil { + return n, err + } + _, _ = t.digester.Hash().Write(p[:n]) + t.status.Offset += int64(len(p)) + t.status.UpdatedAt = time.Now() + + return n, nil +} + +// Digest may return empty digest or panics until committed. +func (t *teeWriter) Digest() digest.Digest { + return t.digester.Digest() +} + +func (t *teeWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + g := new(errgroup.Group) + for _, w := range t.writers { + w := w // closure issues, see https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + return w.Commit(ctx, size, expected, opts...) + }) + } + return g.Wait() +} + +// Status returns the current state of write +func (t *teeWriter) Status() (content.Status, error) { + return t.status, nil +} + +// Truncate updates the size of the target blob +func (t *teeWriter) Truncate(size int64) error { + g := new(errgroup.Group) + for _, w := range t.writers { + w := w // closure issues, see https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + return w.Truncate(size) + }) + } + return g.Wait() +}