Skip to content

Commit

Permalink
perf: add FetchReference support in the caching proxy (oras-project#464)
Browse files Browse the repository at this point in the history
Signed-off-by: Billy Zha <[email protected]>
  • Loading branch information
qweeah authored and Terry Howe committed Feb 2, 2023
1 parent d5c80de commit 9f6376e
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 25 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.5.0
github.com/spf13/pflag v1.0.5
oras.land/oras-go/v2 v2.0.0-20220726123138-064752f3088e
oras.land/oras-go/v2 v2.0.0-20220804053649-e135557babfa
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.0.2 h1:kG1BFyqVHuQoVQiR1bWGnfz/fmHvvuiSPIV7rvl360E=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
oras.land/oras-go/v2 v2.0.0-20220726123138-064752f3088e h1:0j9CmQd4yoAwwAeykCWKf1i/X45yvOgER9XZNtDGI7w=
oras.land/oras-go/v2 v2.0.0-20220726123138-064752f3088e/go.mod h1:IZRIoIJqkAH6x0pL3tVnpyPUyZgthjSyPcH2kgJvBMo=
oras.land/oras-go/v2 v2.0.0-20220804053649-e135557babfa h1:/K4LND5cuAwZTGZ793qPQOppK8+vNX1JvSFNIEcUDKM=
oras.land/oras-go/v2 v2.0.0-20220804053649-e135557babfa/go.mod h1:IZRIoIJqkAH6x0pL3tVnpyPUyZgthjSyPcH2kgJvBMo=
88 changes: 66 additions & 22 deletions internal/cache/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/registry"
)

type closer func() error
Expand All @@ -39,57 +40,100 @@ type target struct {

// New generates a new target storage with caching.
func New(source oras.Target, cache content.Storage) oras.Target {
return &target{
t := &target{
Target: source,
cache: cache,
}
if refFetcher, ok := source.(registry.ReferenceFetcher); ok {
return &referenceTarget{
target: t,
ReferenceFetcher: refFetcher,
}
}
return t
}

// Fetch fetches the content identified by the descriptor.
func (p *target) Fetch(ctx context.Context, target ocispec.Descriptor) (io.ReadCloser, error) {
rc, err := p.cache.Fetch(ctx, target)
func (t *target) Fetch(ctx context.Context, target ocispec.Descriptor) (io.ReadCloser, error) {
rc, err := t.cache.Fetch(ctx, target)
if err == nil {
// Fetch from cache
return rc, nil
}

rc, err = p.Target.Fetch(ctx, target)
if err != nil {
if rc, err = t.Target.Fetch(ctx, target); err != nil {
return nil, err
}

// Fetch from origin with caching
return t.cacheReadCloser(ctx, rc, target), nil
}

func (t *target) cacheReadCloser(ctx context.Context, rc io.ReadCloser, target ocispec.Descriptor) io.ReadCloser {
pr, pw := io.Pipe()
var wg sync.WaitGroup

wg.Add(1)
var pushErr error
go func() {
defer wg.Done()
pushErr = p.cache.Push(ctx, target, pr)
pushErr = t.cache.Push(ctx, target, pr)
}()
c := closer(func() error {
rcErr := rc.Close()
if err := pw.Close(); err != nil {
return err
}
wg.Wait()
if pushErr != nil {
return pushErr
}
return rcErr
})

return struct {
io.Reader
io.Closer
}{
Reader: io.TeeReader(rc, pw),
Closer: c,
}, nil
Closer: closer(func() error {
rcErr := rc.Close()
if err := pw.Close(); err != nil {
return err
}
wg.Wait()
if pushErr != nil {
return pushErr
}
return rcErr
}),
}
}

// Exists returns true if the described content exists.
func (p *target) Exists(ctx context.Context, desc ocispec.Descriptor) (bool, error) {
exists, err := p.cache.Exists(ctx, desc)
func (t *target) Exists(ctx context.Context, desc ocispec.Descriptor) (bool, error) {
exists, err := t.cache.Exists(ctx, desc)
if err == nil && exists {
return true, nil
}
return p.Target.Exists(ctx, desc)
return t.Target.Exists(ctx, desc)
}

// Cache referenceTarget struct.
type referenceTarget struct {
*target
registry.ReferenceFetcher
}

// FetchReference fetches the content identified by the reference from the
// remote and cache the fetched content.
// Cached content will only be read via Fetch, FetchReference will always fetch
// From origin.
func (t *referenceTarget) FetchReference(ctx context.Context, reference string) (ocispec.Descriptor, io.ReadCloser, error) {
target, rc, err := t.ReferenceFetcher.FetchReference(ctx, reference)
if err != nil {
return ocispec.Descriptor{}, nil, err
}

// skip caching if the content already exists in cache
exists, err := t.cache.Exists(ctx, target)
if err != nil {
return ocispec.Descriptor{}, nil, err
}
if exists {
// no need to do tee'd push
return target, rc, nil
}

// Fetch from origin with caching
return target, t.cacheReadCloser(ctx, rc, target), nil
}
223 changes: 223 additions & 0 deletions internal/cache/target_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
Copyright The ORAS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"bytes"
"context"
_ "crypto/sha256"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strconv"
"sync/atomic"
"testing"

"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/content/memory"
"oras.land/oras-go/v2/registry"
"oras.land/oras-go/v2/registry/remote"
)

func TestProxy_fetchCache(t *testing.T) {
blob := []byte("hello world")
desc := ocispec.Descriptor{
MediaType: "test",
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
}

p := New(memory.New(), memory.New())
ctx := context.Background()

err := p.Push(ctx, desc, bytes.NewReader(blob))
if err != nil {
t.Fatal("Proxy.Push() error =", err)
}

// first fetch
exists, err := p.Exists(ctx, desc)
if err != nil {
t.Fatal("Proxy.Exists() error =", err)
}
if !exists {
t.Errorf("Proxy.Exists() = %v, want %v", exists, true)
}
got, err := content.FetchAll(ctx, p, desc)
if err != nil {
t.Fatal("Proxy.Fetch() error =", err)
}
if !bytes.Equal(got, blob) {
t.Errorf("Proxy.Fetch() = %v, want %v", got, blob)
}

// repeated fetch should not touch base CAS
// nil base will generate panic if the base CAS is touched
p.(*target).Target = nil

exists, err = p.Exists(ctx, desc)
if err != nil {
t.Fatal("Proxy.Exists() error =", err)
}
if !exists {
t.Errorf("Proxy.Exists() = %v, want %v", exists, true)
}
got, err = content.FetchAll(ctx, p, desc)
if err != nil {
t.Fatal("Proxy.Fetch() error =", err)
}
if !bytes.Equal(got, blob) {
t.Errorf("Proxy.Fetch() = %v, want %v", got, blob)
}
}

func TestProxy_pushPassThrough(t *testing.T) {
blob := []byte("hello world")
desc := ocispec.Descriptor{
MediaType: "test",
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
}

p := New(memory.New(), memory.New())
ctx := context.Background()

// before push
exists, err := p.Exists(ctx, desc)
if err != nil {
t.Fatal("Proxy.Exists() error =", err)
}
if exists {
t.Errorf("Proxy.Exists() = %v, want %v", exists, false)
}

// push
err = p.Push(ctx, desc, bytes.NewReader(blob))
if err != nil {
t.Fatal("Proxy.Push() error =", err)
}

// after push
exists, err = p.Exists(ctx, desc)
if err != nil {
t.Fatal("Proxy.Exists() error =", err)
}
if !exists {
t.Errorf("Proxy.Exists() = %v, want %v", exists, true)
}
got, err := content.FetchAll(ctx, p, desc)
if err != nil {
t.Fatal("Proxy.Fetch() error =", err)
}
if !bytes.Equal(got, blob) {
t.Errorf("Proxy.Fetch() = %v, want %v", got, blob)
}
}

func TestProxy_fetchReference(t *testing.T) {
// mocked variables
blob := []byte("{}")
repoName := "test/repo"
tagName := "test-tag"
mediaType := ocispec.MediaTypeImageManifest
digest := digest.FromBytes(blob)
desc := ocispec.Descriptor{
MediaType: mediaType,
Digest: digest,
Size: int64(len(blob)),
}

// mocked remote registry
var requestCount, wantRequestCount int64
var successCount, wantSuccessCount int64
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt64(&requestCount, 1)

if r.Method == http.MethodGet &&
(r.URL.Path == fmt.Sprintf("/v2/%s/manifests/%s", repoName, tagName) ||
r.URL.Path == fmt.Sprintf("/v2/%s/manifests/%s", repoName, digest)) {
w.Header().Set("Content-Type", mediaType)
w.Header().Set("Docker-Content-Digest", digest.String())
w.Header().Set("Content-Length", strconv.Itoa(len([]byte(blob))))
w.WriteHeader(http.StatusOK)
w.Write(blob)
atomic.AddInt64(&successCount, 1)
return
}
t.Errorf("unexpected access: %s %s", r.Method, r.URL)
w.WriteHeader(http.StatusBadRequest)
}))
defer ts.Close()
uri, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("invalid test http server: %v", err)
}
repo, err := remote.NewRepository(fmt.Sprintf("%s/%s:%s", uri.Host, repoName, tagName))
if err != nil {
t.Fatalf("NewRepository() error = %v", err)
}
repo.PlainHTTP = true
p := New(repo, memory.New())
ctx := context.Background()

// first fetch reference
gotDesc, rc, err := p.(registry.ReferenceFetcher).FetchReference(ctx, repo.Reference.Reference)
if err != nil {
t.Fatal("ReferenceTarget.FetchReference() error =", err)
}
if !reflect.DeepEqual(gotDesc, desc) {
t.Fatalf("ReferenceTarget.FetchReference() got %v, want %v", gotDesc, desc)
}
got, err := io.ReadAll(rc)
if err != nil {
t.Fatal("io.ReadAll() error =", err)
}
err = rc.Close()
if err != nil {
t.Error("ReferenceTarget.FetchReference().Close() error =", err)
}

if !bytes.Equal(got, blob) {
t.Errorf("ReferenceTarget.Fetch() = %v, want %v", got, blob)
}
if wantRequestCount++; requestCount != wantRequestCount {
t.Errorf("unexpected number of requests: %d, want %d", requestCount, wantRequestCount)
}
if wantSuccessCount++; successCount != wantSuccessCount {
t.Errorf("unexpected number of successful requests: %d, want %d", successCount, wantSuccessCount)
}

// repeated fetch should not touch base CAS
p.(*referenceTarget).Target = nil
got, err = content.FetchAll(ctx, p, desc)
if err != nil {
t.Fatal("ReferenceTarget.Fetch() error =", err)
}
if !bytes.Equal(got, blob) {
t.Errorf("ReferenceTarget.Fetch() = %v, want %v", got, blob)
}
if requestCount != wantRequestCount {
t.Errorf("unexpected number of requests: %d, want %d", requestCount, wantRequestCount)
}
if successCount != wantSuccessCount {
t.Errorf("unexpected number of successful requests: %d, want %d", successCount, wantSuccessCount)
}
}

0 comments on commit 9f6376e

Please sign in to comment.