From 8c74744e146e5842c3b4d53dccc645bb79a24385 Mon Sep 17 00:00:00 2001 From: Christian Weichel Date: Wed, 30 Mar 2022 15:48:14 +0000 Subject: [PATCH] [registry-facade] Support blob caching in Redis --- .../blobserve/pkg/blobserve/refstore.go | 2 +- .../registry-facade/.vscode/launch.json | 19 ++ components/registry-facade/example-spec.json | 15 +- components/registry-facade/go.mod | 1 + components/registry-facade/go.sum | 9 + .../registry-facade/pkg/registry/blob.go | 8 +- .../registry-facade/pkg/registry/cache.go | 269 ++++++++++++++++++ .../pkg/registry/cache_test.go | 115 ++++++++ .../registry-facade/pkg/registry/imagecfg.go | 10 +- .../registry-facade/pkg/registry/ipfs.go | 89 ------ .../pkg/registry/layersource.go | 4 +- .../pkg/registry/layersource_test.go | 4 +- .../registry-facade/pkg/registry/manifest.go | 166 ++++++++--- .../registry-facade/pkg/registry/registry.go | 57 +++- .../registry-facade/pkg/registry/resolver.go | 93 ++++++ 15 files changed, 709 insertions(+), 152 deletions(-) create mode 100644 components/registry-facade/.vscode/launch.json create mode 100644 components/registry-facade/pkg/registry/cache.go create mode 100644 components/registry-facade/pkg/registry/cache_test.go delete mode 100644 components/registry-facade/pkg/registry/ipfs.go create mode 100644 components/registry-facade/pkg/registry/resolver.go diff --git a/components/blobserve/pkg/blobserve/refstore.go b/components/blobserve/pkg/blobserve/refstore.go index 0b04faaccf4e3a..598fbe56af443e 100644 --- a/components/blobserve/pkg/blobserve/refstore.go +++ b/components/blobserve/pkg/blobserve/refstore.go @@ -304,7 +304,7 @@ func resolveRef(ctx context.Context, ref string, resolver remotes.Resolver) (*oc if err != nil { return nil, err } - manifest, _, err := registry.DownloadManifest(ctx, fetcher, desc) + manifest, _, err := registry.DownloadManifest(ctx, registry.AsFetcherFunc(fetcher), desc) if err != nil { return nil, err } diff --git a/components/registry-facade/.vscode/launch.json b/components/registry-facade/.vscode/launch.json new file mode 100644 index 00000000000000..6c92c1de443726 --- /dev/null +++ b/components/registry-facade/.vscode/launch.json @@ -0,0 +1,19 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [ "-v", "run", "example-config.json"], + "showLog": false + } + ] +} \ No newline at end of file diff --git a/components/registry-facade/example-spec.json b/components/registry-facade/example-spec.json index 33710dedcfef63..f47a65dc1b85b5 100644 --- a/components/registry-facade/example-spec.json +++ b/components/registry-facade/example-spec.json @@ -1,5 +1,12 @@ { - "baseRef": "docker.io/library/ubuntu:latest", - "ideRef": "eu.gcr.io/gitpod-core-dev/build/ide/code:commit-8dd2ddd844f30a4ff66d2704f4714e9da875c7d5", - "supervisorRef": "eu.gcr.io/gitpod-core-dev/build/supervisor:main.2733" -} + "foo": { + "baseRef": "docker.io/library/ubuntu:latest", + "ideRef": "eu.gcr.io/gitpod-core-dev/build/ide/code:commit-8dd2ddd844f30a4ff66d2704f4714e9da875c7d5", + "supervisorRef": "eu.gcr.io/gitpod-core-dev/build/supervisor:main.2733" + }, + "bar": { + "baseRef": "docker.io/library/ubuntu:latest", + "ideRef": "eu.gcr.io/gitpod-core-dev/build/ide/code:commit-8dd2ddd844f30a4ff66d2704f4714e9da875c7d5", + "supervisorRef": "eu.gcr.io/gitpod-core-dev/build/supervisor:main.2762" + } +} \ No newline at end of file diff --git a/components/registry-facade/go.mod b/components/registry-facade/go.mod index b7c058df96fcbd..d6a7c9d4cef7fe 100644 --- a/components/registry-facade/go.mod +++ b/components/registry-facade/go.mod @@ -42,6 +42,7 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/felixge/httpsnoop v1.0.1 // indirect github.com/go-logr/logr v1.2.2 // indirect + github.com/go-redis/redismock/v8 v8.0.6 // indirect github.com/go-test/deep v1.0.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/go-cmp v0.5.6 // indirect diff --git a/components/registry-facade/go.sum b/components/registry-facade/go.sum index a9d3d74aafcf6a..ee0cc296789b44 100644 --- a/components/registry-facade/go.sum +++ b/components/registry-facade/go.sum @@ -429,8 +429,11 @@ github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL9 github.com/go-openapi/jsonreference v0.19.5/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE69AqPYEJeo/TWfEeg= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= +github.com/go-redis/redis/v8 v8.8.0/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqWMnCV1iP5Y= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redis/redismock/v8 v8.0.6 h1:rtuijPgGynsRB2Y7KDACm09WvjHWS4RaG44Nm7rcj4Y= +github.com/go-redis/redismock/v8 v8.0.6/go.mod h1:sDIF73OVsmaKzYe/1FJXGiCQ4+oHYbzjpaL9Vor0sS4= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-test/deep v1.0.5 h1:AKODKU3pDH1RzZzm6YZu77YWtEAq6uh1rLIAQlay2qc= @@ -1188,6 +1191,7 @@ github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0 github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= @@ -1202,6 +1206,7 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= +github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= @@ -1499,6 +1504,7 @@ go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUz go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0/go.mod h1:oVGt1LRbBOBq1A5BQLlUg9UaU/54aiHw8cgjV3aWZ/E= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0/go.mod h1:vEhqr0m4eTc+DWxfsXoXue2GBgV2uUwVznkGIHW/e5w= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4= +go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs= go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM= @@ -1506,12 +1512,15 @@ go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.3.0/go.mod h1:VpP4/RMn go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.3.0/go.mod h1:hO1KLR7jcKaDDKDkvI9dP/FIhpmna5lkqPUQdEjFAM8= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.3.0/go.mod h1:keUU7UfnwWTWpJ+FWnyqmogPa82nuU5VUANFq49hlMY= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.3.0/go.mod h1:QNX1aly8ehqqX1LEa6YniTU7VY9I6R3X/oPxhGdTceE= +go.opentelemetry.io/otel/metric v0.19.0/go.mod h1:8f9fglJPRnXuskQmKpnad31lcLJ2VmNNqIsx/uIwBSc= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= +go.opentelemetry.io/otel/oteltest v0.19.0/go.mod h1:tI4yxwh8U21v7JD6R3BcA/2+RBoTKFexE/PJ/nSO7IA= go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m7hBWC279Yc= go.opentelemetry.io/otel/sdk v1.3.0/go.mod h1:rIo4suHNhQwBIPg9axF8V9CA72Wz2mKF1teNrup8yzs= go.opentelemetry.io/otel/sdk/export/metric v0.20.0/go.mod h1:h7RBNMsDJ5pmI1zExLi+bJK+Dr8NQCh0qGhm1KDnNlE= go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4/0TjTXukfxjzSTpHE= +go.opentelemetry.io/otel/trace v0.19.0/go.mod h1:4IXiNextNOpPnRlI4ryK69mn5iC84bjBWZQA5DXz/qg= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= diff --git a/components/registry-facade/pkg/registry/blob.go b/components/registry-facade/pkg/registry/blob.go index 5e4d90bf25b528..539ec5df6c9a59 100644 --- a/components/registry-facade/pkg/registry/blob.go +++ b/components/registry-facade/pkg/registry/blob.go @@ -84,7 +84,7 @@ type blobHandler struct { Spec *api.ImageSpec Resolver remotes.Resolver - Store content.Store + Store BlobStore IPFS *IPFSBlobCache AdditionalSources []BlobSource ConfigModifier ConfigModifier @@ -188,7 +188,7 @@ func (bh *blobHandler) downloadManifest(ctx context.Context, ref string) (res *o log.WithError(err).WithField("ref", ref).WithField("instanceId", bh.Name).Error("cannot get fetcher") return nil, nil, err } - res, _, err = DownloadManifest(ctx, fetcher, desc, WithStore(bh.Store)) + res, _, err = DownloadManifest(ctx, AsFetcherFunc(fetcher), desc, WithStore(bh.Store)) return } @@ -214,7 +214,7 @@ type BlobSource interface { } type storeBlobSource struct { - Store content.Store + Store BlobStore } func (sbs storeBlobSource) HasBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) bool { @@ -305,7 +305,7 @@ func (pbs *configBlobSource) GetBlob(ctx context.Context, spec *api.ImageSpec, d func (pbs *configBlobSource) getConfig(ctx context.Context) (rawCfg []byte, err error) { manifest := *pbs.Manifest - cfg, err := DownloadConfig(ctx, pbs.Fetcher, manifest.Config) + cfg, err := DownloadConfig(ctx, AsFetcherFunc(pbs.Fetcher), "", manifest.Config) if err != nil { return } diff --git a/components/registry-facade/pkg/registry/cache.go b/components/registry-facade/pkg/registry/cache.go new file mode 100644 index 00000000000000..87798e71f11598 --- /dev/null +++ b/components/registry-facade/pkg/registry/cache.go @@ -0,0 +1,269 @@ +// Copyright (c) 2022 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License-AGPL.txt in the project root for license information. + +package registry + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "sync" + "time" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/gitpod-io/gitpod/common-go/log" + redis "github.com/go-redis/redis/v8" + files "github.com/ipfs/go-ipfs-files" + ipfs "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipfs/interface-go-ipfs-core/options" + "github.com/opencontainers/go-digest" + ociv1 "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/xerrors" +) + +// ipfsManifestModifier modifies a manifest and adds IPFS URLs to the layers +func (reg *Registry) ipfsManifestModifier(mf *ociv1.Manifest) error { + if reg.IPFS == nil { + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var wg sync.WaitGroup + for i, l := range mf.Layers { + wg.Add(1) + go func(i int, dgst digest.Digest) { + defer wg.Done() + + url, _ := reg.IPFS.Get(ctx, dgst) + if url == "" { + return + } + mf.Layers[i].URLs = append(mf.Layers[i].URLs, url) + }(i, l.Digest) + } + wg.Wait() + + return nil +} + +// IPFSBlobCache can cache blobs in IPFS +type IPFSBlobCache struct { + Redis *redis.Client + IPFS ipfs.CoreAPI +} + +// Get retrieves the IPFS URL for a previously stored blob. +// Returns an error if the blob is not stored in IPFS yet. +func (store *IPFSBlobCache) Get(ctx context.Context, dgst digest.Digest) (ipfsURL string, err error) { + if store == nil || store.IPFS == nil || store.Redis == nil { + return "", nil + } + + res, err := store.Redis.Get(ctx, dgst.String()).Result() + if err != nil { + return "", err + } + + return "ipfs://" + res, nil +} + +// Store stores a blob in IPFS. Will happily overwrite/re-upload a blob. +func (store *IPFSBlobCache) Store(ctx context.Context, dgst digest.Digest, content io.Reader) (err error) { + if store == nil || store.IPFS == nil || store.Redis == nil { + return nil + } + + p, err := store.IPFS.Unixfs().Add(ctx, files.NewReaderFile(content), options.Unixfs.Pin(true), options.Unixfs.CidVersion(1)) + if err != nil { + return err + } + + res := store.Redis.Set(ctx, dgst.String(), p.Cid().String(), 0) + if err := res.Err(); err != nil { + return err + } + + log.WithField("digest", dgst.String()).WithField("cid", p.Cid().String()).Debug("pushed to IPFS") + + return nil +} + +type RedisBlobStore struct { + Client *redis.Client +} + +var _ BlobStore = &RedisBlobStore{} + +// Info will return metadata about content available in the content store. +// +// If the content is not present, ErrNotFound will be returned. +func (rbs *RedisBlobStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { + res, err := rbs.Client.Get(ctx, "nfo."+string(dgst)).Result() + if err == redis.Nil { + return content.Info{}, errdefs.ErrNotFound + } + + var redisInfo redisBlobInfo + err = json.Unmarshal([]byte(res), &redisInfo) + if err != nil { + return content.Info{}, xerrors.Errorf("cannot unmarshal blob info: %w", err) + } + + return content.Info{ + Digest: digest.Digest(redisInfo.Digest), + Size: redisInfo.Size, + CreatedAt: time.Unix(redisInfo.CreatedAt, 0), + UpdatedAt: time.Unix(redisInfo.UpdatedAt, 0), + Labels: redisInfo.Labels, + }, nil +} + +func (rbs *RedisBlobStore) ReaderAt(ctx context.Context, desc ociv1.Descriptor) (content.ReaderAt, error) { + res, err := rbs.Client.Get(ctx, "cnt."+string(desc.Digest)).Result() + if err == redis.Nil { + return nil, errdefs.ErrNotFound + } + + return stringReader(res), nil +} + +type stringReader string + +var _ content.ReaderAt = stringReader("") + +func (r stringReader) Size() int64 { return int64(len(r)) } +func (r stringReader) Close() error { return nil } +func (r stringReader) ReadAt(p []byte, off int64) (n int, err error) { + n = copy(p, r[off:]) + if n < len(p) { + return n, io.EOF + } + return +} + +// Some implementations require WithRef to be included in opts. +func (rbs *RedisBlobStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { + var wOpts content.WriterOpts + for _, opt := range opts { + if err := opt(&wOpts); err != nil { + return nil, err + } + } + if wOpts.Desc.Digest == "" { + return nil, xerrors.Errorf("desc.digest must not be empty: %w", errdefs.ErrInvalidArgument) + } + + return newRedisBlobWriter(wOpts.Desc.Digest, rbs.Client), nil +} + +type redisBlobWriter struct { + buf *bytes.Buffer + digest digest.Digest + client *redis.Client + + forTestingOnlyTime time.Time +} + +func newRedisBlobWriter(digest digest.Digest, client *redis.Client) *redisBlobWriter { + return &redisBlobWriter{ + buf: bytes.NewBuffer(make([]byte, 0, 4096)), + digest: digest, + client: client, + } +} + +var _ content.Writer = &redisBlobWriter{} + +func (w *redisBlobWriter) Write(b []byte) (n int, err error) { + return w.buf.Write(b) +} + +func (w *redisBlobWriter) Close() error { + return nil +} + +// Digest may return empty digest or panics until committed. +func (w *redisBlobWriter) Digest() digest.Digest { + return w.digest +} + +type redisBlobInfo struct { + Digest string + Size int64 + CreatedAt int64 + UpdatedAt int64 + Labels map[string]string +} + +// Commit commits the blob (but no roll-back is guaranteed on an error). +// size and expected can be zero-value when unknown. +// Commit always closes the writer, even on error. +// ErrAlreadyExists aborts the writer. +func (w *redisBlobWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + act := digest.FromBytes(w.buf.Bytes()) + if expected != "" && expected != act { + return fmt.Errorf("unexpected commit digest %s, expected %s: %w", act, expected, errdefs.ErrFailedPrecondition) + } + + var base content.Info + for _, opt := range opts { + if err := opt(&base); err != nil { + return err + } + } + + var ( + createdAt int64 + updatedAt int64 + ) + if !w.forTestingOnlyTime.IsZero() { + createdAt = w.forTestingOnlyTime.Unix() + updatedAt = w.forTestingOnlyTime.Unix() + } else { + createdAt = time.Now().Unix() + updatedAt = time.Now().Unix() + } + + rnfo, err := json.Marshal(redisBlobInfo{ + Digest: string(expected), + Size: int64(w.buf.Len()), + CreatedAt: createdAt, + UpdatedAt: updatedAt, + Labels: base.Labels, + }) + if err != nil { + return err + } + + var ( + kContent = fmt.Sprintf("cnt.%s", w.digest) + kInfo = fmt.Sprintf("nfo.%s", w.digest) + ttl = 48 * time.Hour + ) + + trans := w.client.Pipeline() + trans.MSet(ctx, map[string]string{ + kContent: w.buf.String(), + kInfo: string(rnfo), + }) + trans.Expire(ctx, kContent, ttl) + trans.Expire(ctx, kInfo, ttl) + _, err = trans.Exec(ctx) + return err +} + +// Status returns the current state of write +func (w *redisBlobWriter) Status() (content.Status, error) { + return content.Status{}, fmt.Errorf("not implemented") +} + +// Truncate updates the size of the target blob +func (w *redisBlobWriter) Truncate(size int64) error { + return fmt.Errorf("not implemented") +} diff --git a/components/registry-facade/pkg/registry/cache_test.go b/components/registry-facade/pkg/registry/cache_test.go new file mode 100644 index 00000000000000..648623bd1446a3 --- /dev/null +++ b/components/registry-facade/pkg/registry/cache_test.go @@ -0,0 +1,115 @@ +// Copyright (c) 2022 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License-AGPL.txt in the project root for license information. + +package registry + +import ( + "context" + "testing" + "time" + + "github.com/containerd/containerd/content" + redis "github.com/go-redis/redis/v8" + "github.com/go-redis/redismock/v8" + "github.com/google/go-cmp/cmp" + "github.com/opencontainers/go-digest" + ociv1 "github.com/opencontainers/image-spec/specs-go/v1" +) + +func TestRedisBlobStore_Info(t *testing.T) { + ctx := context.Background() + + type Expectation struct { + Info content.Info + Error string + } + tests := []struct { + Name string + Content string + Expectation Expectation + }{ + { + Name: "not found", + Expectation: Expectation{ + Error: "not found", + }, + }, + { + Name: "invalid JSON", + Content: "foo", + Expectation: Expectation{ + Error: "cannot unmarshal blob info: invalid character 'o' in literal false (expecting 'a')", + }, + }, + { + Name: "valid", + Content: `{"Digest":"digest", "Size": 1234, "CreatedAt": 7899, "UpdatedAt": 9999, "Labels": {"foo":"bar"}}`, + Expectation: Expectation{ + Info: content.Info{ + Digest: "digest", + Size: 1234, + CreatedAt: time.Unix(7899, 0), + UpdatedAt: time.Unix(9999, 0), + Labels: map[string]string{"foo": "bar"}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + client, mock := redismock.NewClientMock() + + dgst := digest.FromString(test.Content) + if test.Content == "" { + mock.ExpectGet("nfo." + string(dgst)).SetErr(redis.Nil) + } else { + mock.ExpectGet("nfo." + string(dgst)).SetVal(test.Content) + } + + var ( + act Expectation + err error + store = &RedisBlobStore{Client: client} + ) + act.Info, err = store.Info(ctx, dgst) + if err != nil { + act.Error = err.Error() + } + + if diff := cmp.Diff(test.Expectation, act); diff != "" { + t.Errorf("Info() mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestRedisBlobStore_Writer(t *testing.T) { + cnt := []byte("hello world") + dgst := digest.FromBytes(cnt) + + client, mock := redismock.NewClientMock() + mock.ExpectMSet(map[string]string{ + "cnt." + string(dgst): string(cnt), + "nfo." + string(dgst): `{"Digest":"` + string(dgst) + `","Size":11,"CreatedAt":1,"UpdatedAt":1,"Labels":{"foo":"bar"}}`, + }).SetVal("") + mock.ExpectExpire("cnt."+string(dgst), 48*time.Hour).SetVal(true) + mock.ExpectExpire("nfo."+string(dgst), 48*time.Hour).SetVal(true) + + store := &RedisBlobStore{Client: client} + w, err := store.Writer(context.Background(), content.WithDescriptor(ociv1.Descriptor{ + Digest: dgst, + MediaType: ociv1.MediaTypeImageConfig, + })) + if err != nil { + t.Fatal(err) + } + w.(*redisBlobWriter).forTestingOnlyTime = time.Unix(1, 1) + _, _ = w.Write(cnt) + w.Close() + err = w.Commit(context.Background(), int64(len(cnt)), dgst, content.WithLabels(map[string]string{"foo": "bar"})) + if err != nil { + t.Fatal(err) + } +} diff --git a/components/registry-facade/pkg/registry/imagecfg.go b/components/registry-facade/pkg/registry/imagecfg.go index 5172693ea3b3b8..ddfdac9b663e27 100644 --- a/components/registry-facade/pkg/registry/imagecfg.go +++ b/components/registry-facade/pkg/registry/imagecfg.go @@ -8,6 +8,7 @@ import ( "context" "sync" + "github.com/containerd/containerd/errdefs" lru "github.com/hashicorp/golang-lru" ociv1 "github.com/opencontainers/image-spec/specs-go/v1" "golang.org/x/xerrors" @@ -28,11 +29,14 @@ type ImageSpecProvider interface { } // FixedImageSpecProvider provides a single spec -type FixedImageSpecProvider api.ImageSpec +type FixedImageSpecProvider map[string]*api.ImageSpec func (p FixedImageSpecProvider) GetSpec(ctx context.Context, ref string) (*api.ImageSpec, error) { - res := api.ImageSpec(p) - return &res, nil + res, ok := p[ref] + if !ok { + return nil, xerrors.Errorf("%w: %s", ErrRefInvalid, errdefs.ErrNotFound) + } + return res, nil } // RemoteSpecProvider queries a remote spec provider using gRPC diff --git a/components/registry-facade/pkg/registry/ipfs.go b/components/registry-facade/pkg/registry/ipfs.go deleted file mode 100644 index 9408b1bba8a4e3..00000000000000 --- a/components/registry-facade/pkg/registry/ipfs.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) 2022 Gitpod GmbH. All rights reserved. -// Licensed under the GNU Affero General Public License (AGPL). -// See License-AGPL.txt in the project root for license information. - -package registry - -import ( - "context" - "io" - "sync" - "time" - - "github.com/gitpod-io/gitpod/common-go/log" - redis "github.com/go-redis/redis/v8" - files "github.com/ipfs/go-ipfs-files" - ipfs "github.com/ipfs/interface-go-ipfs-core" - "github.com/ipfs/interface-go-ipfs-core/options" - "github.com/opencontainers/go-digest" - ociv1 "github.com/opencontainers/image-spec/specs-go/v1" -) - -// ipfsManifestModifier modifies a manifest and adds IPFS URLs to the layers -func (reg *Registry) ipfsManifestModifier(mf *ociv1.Manifest) error { - if reg.IPFS == nil { - return nil - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - var wg sync.WaitGroup - for i, l := range mf.Layers { - wg.Add(1) - go func(i int, dgst digest.Digest) { - defer wg.Done() - - url, _ := reg.IPFS.Get(ctx, dgst) - if url == "" { - return - } - mf.Layers[i].URLs = append(mf.Layers[i].URLs, url) - }(i, l.Digest) - } - wg.Wait() - - return nil -} - -// IPFSBlobCache can cache blobs in IPFS -type IPFSBlobCache struct { - Redis *redis.Client - IPFS ipfs.CoreAPI -} - -// Get retrieves the IPFS URL for a previously stored blob. -// Returns an error if the blob is not stored in IPFS yet. -func (store *IPFSBlobCache) Get(ctx context.Context, dgst digest.Digest) (ipfsURL string, err error) { - if store == nil || store.IPFS == nil || store.Redis == nil { - return "", nil - } - - res, err := store.Redis.Get(ctx, dgst.String()).Result() - if err != nil { - return "", err - } - - return "ipfs://" + res, nil -} - -// Store stores a blob in IPFS. Will happily overwrite/re-upload a blob. -func (store *IPFSBlobCache) Store(ctx context.Context, dgst digest.Digest, content io.Reader) (err error) { - if store == nil || store.IPFS == nil || store.Redis == nil { - return nil - } - - p, err := store.IPFS.Unixfs().Add(ctx, files.NewReaderFile(content), options.Unixfs.Pin(true), options.Unixfs.CidVersion(1)) - if err != nil { - return err - } - - res := store.Redis.Set(ctx, dgst.String(), p.Cid().String(), 0) - if err := res.Err(); err != nil { - return err - } - - log.WithField("digest", dgst.String()).WithField("cid", p.Cid().String()).Debug("pushed to IPFS") - - return nil -} diff --git a/components/registry-facade/pkg/registry/layersource.go b/components/registry-facade/pkg/registry/layersource.go index 2ed98400d1ccc4..33514189259547 100644 --- a/components/registry-facade/pkg/registry/layersource.go +++ b/components/registry-facade/pkg/registry/layersource.go @@ -233,12 +233,12 @@ func NewStaticSourceFromImage(ctx context.Context, resolver remotes.Resolver, re return nil, err } - manifest, _, err := DownloadManifest(ctx, fetcher, desc) + manifest, _, err := DownloadManifest(ctx, AsFetcherFunc(fetcher), desc) if err != nil { return nil, err } - cfg, err := DownloadConfig(ctx, fetcher, manifest.Config) + cfg, err := DownloadConfig(ctx, AsFetcherFunc(fetcher), ref, manifest.Config) if err != nil { return nil, err } diff --git a/components/registry-facade/pkg/registry/layersource_test.go b/components/registry-facade/pkg/registry/layersource_test.go index 2cfab6a41d4271..2fe91ccd9ec096 100644 --- a/components/registry-facade/pkg/registry/layersource_test.go +++ b/components/registry-facade/pkg/registry/layersource_test.go @@ -102,7 +102,7 @@ func createFixtureFromImage(ctx context.Context, resolver remotes.Resolver, ref return nil, err } - mf, _, err := DownloadManifest(ctx, fetcher, desc) + mf, _, err := DownloadManifest(ctx, AsFetcherFunc(fetcher), desc) if err != nil { return nil, err } @@ -111,7 +111,7 @@ func createFixtureFromImage(ctx context.Context, resolver remotes.Resolver, ref return nil, err } - cfg, err := DownloadConfig(ctx, fetcher, mf.Config) + cfg, err := DownloadConfig(ctx, AsFetcherFunc(fetcher), ref, mf.Config) if err != nil { return nil, err } diff --git a/components/registry-facade/pkg/registry/manifest.go b/components/registry-facade/pkg/registry/manifest.go index d0825b52691f56..504f45aa18258d 100644 --- a/components/registry-facade/pkg/registry/manifest.go +++ b/components/registry-facade/pkg/registry/manifest.go @@ -89,7 +89,7 @@ type manifestHandler struct { Spec *api.ImageSpec Resolver remotes.Resolver - Store content.Store + Store BlobStore ConfigModifier ConfigModifier ManifestModifier func(*ociv1.Manifest) error @@ -144,21 +144,23 @@ func (mh *manifestHandler) getManifest(w http.ResponseWriter, r *http.Request) { return err } - fetcher, err := mh.Resolver.Fetcher(ctx, ref) - if err != nil { - log.WithError(err).WithField("ref", ref).WithFields(logFields).Error("cannot get fetcher") - return distv2.ErrorCodeManifestUnknown.WithDetail(err) - } - rc, err := fetcher.Fetch(ctx, desc) - if err != nil { - log.WithError(err).WithField("ref", ref).WithField("desc", desc).WithFields(logFields).Error("cannot fetch manifest") - return distv2.ErrorCodeManifestUnknown.WithDetail(err) + var fcache remotes.Fetcher + fetch := func() (remotes.Fetcher, error) { + if fcache != nil { + return fcache, nil + } + + fetcher, err := mh.Resolver.Fetcher(ctx, ref) + if err != nil { + return nil, err + } + fcache = fetcher + return fcache, nil } - defer rc.Close() - manifest, ndesc, err := DownloadManifest(ctx, fetcher, desc, WithStore(mh.Store)) + manifest, ndesc, err := DownloadManifest(ctx, fetch, desc, WithStore(mh.Store)) if err != nil { - log.WithError(err).WithField("desc", desc).WithFields(logFields).Error("cannot download manifest") + log.WithError(err).WithField("desc", desc).WithFields(logFields).WithField("ref", ref).Error("cannot download manifest") return distv2.ErrorCodeManifestUnknown.WithDetail(err) } desc = *ndesc @@ -167,7 +169,7 @@ func (mh *manifestHandler) getManifest(w http.ResponseWriter, r *http.Request) { switch desc.MediaType { case images.MediaTypeDockerSchema2Manifest, ociv1.MediaTypeImageManifest: // download config - cfg, err := DownloadConfig(ctx, fetcher, manifest.Config) + cfg, err := DownloadConfig(ctx, fetch, ref, manifest.Config, WithStore(mh.Store)) if err != nil { log.WithError(err).WithFields(logFields).Error("cannot download config") return err @@ -189,27 +191,27 @@ func (mh *manifestHandler) getManifest(w http.ResponseWriter, r *http.Request) { } cfgDgst := digest.FromBytes(rawCfg) + // update config digest in manifest + manifest.Config.Digest = cfgDgst + manifest.Config.URLs = nil + manifest.Config.Size = int64(len(rawCfg)) + // optimization: we store the config in the store just in case the client attempts to download the config blob // from us. If they download it from a registry facade from which the manifest hasn't been downloaded // we'll re-create the config on the fly. - if w, err := mh.Store.Writer(ctx, content.WithRef(ref), content.WithDescriptor(desc)); err == nil { + if w, err := mh.Store.Writer(ctx, content.WithRef(ref), content.WithDescriptor(manifest.Config)); err == nil { defer w.Close() _, err = w.Write(rawCfg) if err != nil { log.WithError(err).WithFields(logFields).Warn("cannot write config to store - we'll regenerate it on demand") } - err = w.Commit(ctx, 0, cfgDgst) + err = w.Commit(ctx, 0, cfgDgst, content.WithLabels(contentTypeLabel(manifest.Config.MediaType))) if err != nil { log.WithError(err).WithFields(logFields).Warn("cannot commit config to store - we'll regenerate it on demand") } } - // update config digest in manifest - manifest.Config.Digest = cfgDgst - manifest.Config.URLs = nil - manifest.Config.Size = int64(len(rawCfg)) - // We might have additional modifications, e.g. adding IPFS URLs to the layers if mh.ManifestModifier != nil { err = mh.ManifestModifier(manifest) @@ -257,45 +259,119 @@ func (mh *manifestHandler) getManifest(w http.ResponseWriter, r *http.Request) { } // DownloadConfig downloads and unmarshales OCIv2 image config, referred to by an OCI descriptor. -func DownloadConfig(ctx context.Context, fetcher remotes.Fetcher, desc ociv1.Descriptor) (cfg *ociv1.Image, err error) { +func DownloadConfig(ctx context.Context, fetch FetcherFunc, ref string, desc ociv1.Descriptor, options ...ManifestDownloadOption) (cfg *ociv1.Image, err error) { if desc.MediaType != images.MediaTypeDockerSchema2Config && desc.MediaType != ociv1.MediaTypeImageConfig { - return nil, xerrors.Errorf("unsupported media type") + return nil, xerrors.Errorf("unsupported media type: %s", desc.MediaType) } - rc, err := fetcher.Fetch(ctx, desc) + var opts manifestDownloadOptions + for _, o := range options { + o(&opts) + } + + var rc io.ReadCloser + if opts.Store != nil { + r, err := opts.Store.ReaderAt(ctx, desc) + if errors.Is(err, errdefs.ErrNotFound) { + // not cached yet + } else if err != nil { + log.WithError(err).WithField("desc", desc).Warn("cannot read config from store - fetching again") + } else { + defer r.Close() + rc = io.NopCloser(content.NewReader(r)) + } + } + if rc == nil { + fetcher, err := fetch() + if err != nil { + return nil, err + } + rc, err = fetcher.Fetch(ctx, desc) + if err != nil { + return nil, xerrors.Errorf("cannot download config: %w", err) + } + defer rc.Close() + } + + buf, err := io.ReadAll(rc) if err != nil { - return nil, xerrors.Errorf("cannot download config: %w", err) + return nil, xerrors.Errorf("cannot read config: %w", err) } - defer rc.Close() var res ociv1.Image - err = json.NewDecoder(rc).Decode(&res) + err = json.Unmarshal(buf, &res) if err != nil { return nil, xerrors.Errorf("cannot decode config: %w", err) } + if opts.Store != nil && ref != "" { + // ref can be empty for some users of DownloadConfig. However, some store implementations + // (e.g. the default containerd store) expect ref to be set. This would lead to stray errors. + + err := func() error { + w, err := opts.Store.Writer(ctx, content.WithDescriptor(desc), content.WithRef(ref)) + if err != nil { + return err + } + n, err := w.Write(buf) + if err != nil { + return err + } + if n != len(buf) { + return io.ErrShortWrite + } + w.Close() + return w.Commit(ctx, int64(len(buf)), digest.FromBytes(buf), content.WithLabels(contentTypeLabel(desc.MediaType))) + }() + if err != nil { + log.WithError(err).WithField("ref", ref).WithField("desc", desc).Warn("cannot cache config") + } + } + return &res, nil } +func contentTypeLabel(mt string) map[string]string { + return map[string]string{"Content-Type": mt} +} + type manifestDownloadOptions struct { - Store content.Store + Store BlobStore } // ManifestDownloadOption alters the default manifest download behaviour type ManifestDownloadOption func(*manifestDownloadOptions) // WithStore caches a downloaded manifest in a store -func WithStore(store content.Store) ManifestDownloadOption { +func WithStore(store BlobStore) ManifestDownloadOption { return func(o *manifestDownloadOptions) { o.Store = store } } +type BlobStore interface { + ReaderAt(ctx context.Context, desc ociv1.Descriptor) (content.ReaderAt, error) + + // Some implementations require WithRef to be included in opts. + Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) + + // Info will return metadata about content available in the content store. + // + // If the content is not present, ErrNotFound will be returned. + Info(ctx context.Context, dgst digest.Digest) (content.Info, error) +} + +type FetcherFunc func() (remotes.Fetcher, error) + +func AsFetcherFunc(f remotes.Fetcher) FetcherFunc { + return func() (remotes.Fetcher, error) { return f, nil } +} + // DownloadManifest downloads and unmarshals the manifest of the given desc. If the desc points to manifest list // we choose the first manifest in that list. -func DownloadManifest(ctx context.Context, fetcher remotes.Fetcher, desc ociv1.Descriptor, options ...ManifestDownloadOption) (cfg *ociv1.Manifest, rdesc *ociv1.Descriptor, err error) { +func DownloadManifest(ctx context.Context, fetch FetcherFunc, desc ociv1.Descriptor, options ...ManifestDownloadOption) (cfg *ociv1.Manifest, rdesc *ociv1.Descriptor, err error) { var opts manifestDownloadOptions for _, o := range options { o(&opts) @@ -304,8 +380,17 @@ func DownloadManifest(ctx context.Context, fetcher remotes.Fetcher, desc ociv1.D var ( rc io.ReadCloser placeInStore bool + mediaType string ) if opts.Store != nil { + nfo, err := opts.Store.Info(ctx, desc.Digest) + if errors.Cause(err) == errdefs.ErrNotFound { + // not in store yet + } else if err != nil { + log.WithError(err).WithField("desc", desc).Warn("cannot get manifest from store") + } + mediaType = nfo.Labels["Content-Type"] + r, err := opts.Store.ReaderAt(ctx, desc) if errors.Cause(err) == errdefs.ErrNotFound { // not in store yet @@ -317,11 +402,19 @@ func DownloadManifest(ctx context.Context, fetcher remotes.Fetcher, desc ociv1.D } if rc == nil { placeInStore = true + + var fetcher remotes.Fetcher + fetcher, err = fetch() + if err != nil { + return + } + rc, err = fetcher.Fetch(ctx, desc) if err != nil { err = xerrors.Errorf("cannot download manifest: %w", err) return } + mediaType = desc.MediaType } inpt, err := io.ReadAll(rc) @@ -332,6 +425,7 @@ func DownloadManifest(ctx context.Context, fetcher remotes.Fetcher, desc ociv1.D } rdesc = &desc + rdesc.MediaType = mediaType switch rdesc.MediaType { case images.MediaTypeDockerSchema2ManifestList, ociv1.MediaTypeImageIndex: @@ -348,7 +442,13 @@ func DownloadManifest(ctx context.Context, fetcher remotes.Fetcher, desc ociv1.D return } - // TODO: choose by platform, not just the first manifest + var fetcher remotes.Fetcher + fetcher, err = fetch() + if err != nil { + return + } + + // TODO(cw): choose by platform, not just the first manifest md := list.Manifests[0] rc, err = fetcher.Fetch(ctx, md) if err != nil { @@ -367,7 +467,7 @@ func DownloadManifest(ctx context.Context, fetcher remotes.Fetcher, desc ociv1.D switch rdesc.MediaType { case images.MediaTypeDockerSchema2Manifest, ociv1.MediaTypeImageManifest: default: - err = xerrors.Errorf("unsupported media type") + err = xerrors.Errorf("unsupported media type: %s", rdesc.MediaType) return } @@ -391,7 +491,7 @@ func DownloadManifest(ctx context.Context, fetcher remotes.Fetcher, desc ociv1.D log.WithError(err).WithField("desc", *rdesc).Warn("cannot store manifest") } - err = w.Commit(ctx, 0, digest.FromBytes(inpt)) + err = w.Commit(ctx, 0, digest.FromBytes(inpt), content.WithLabels(map[string]string{"Content-Type": rdesc.MediaType})) if err != nil { log.WithError(err).WithField("desc", *rdesc).Warn("cannot store manifest") } diff --git a/components/registry-facade/pkg/registry/registry.go b/components/registry-facade/pkg/registry/registry.go index f302bc97d7bd8d..84b6dec46c9b7c 100644 --- a/components/registry-facade/pkg/registry/registry.go +++ b/components/registry-facade/pkg/registry/registry.go @@ -6,6 +6,7 @@ package registry import ( "context" + "encoding/json" "fmt" "io/ioutil" "net" @@ -22,7 +23,6 @@ import ( "github.com/gitpod-io/gitpod/common-go/log" "github.com/gitpod-io/gitpod/registry-facade/api" - "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/remotes" "github.com/docker/distribution" @@ -69,7 +69,7 @@ type ResolverProvider func() remotes.Resolver type Registry struct { Config config.Config Resolver ResolverProvider - Store content.Store + Store BlobStore IPFS *IPFSBlobCache LayerSource LayerSource ConfigModifier ConfigModifier @@ -82,15 +82,34 @@ type Registry struct { // NewRegistry creates a new registry func NewRegistry(cfg config.Config, newResolver ResolverProvider, reg prometheus.Registerer) (*Registry, error) { - storePath := cfg.Store - if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" { - storePath = filepath.Join(tproot, storePath) - } - store, err := local.NewStore(storePath) - if err != nil { - return nil, err + var mfStore BlobStore + if cfg.IPFSCache != nil && cfg.IPFSCache.Enabled { + rdc, err := getRedisClient(cfg.IPFSCache.Redis) + if err != nil { + return nil, xerrors.Errorf("cannot connect to Redis: %w", err) + } + mfStore = &RedisBlobStore{Client: rdc} + log.Info("using redis to cache manifests and config") + + resolverFactory := &RedisCachedResolver{ + Client: rdc, + Provider: newResolver, + } + newResolver = resolverFactory.Factory + log.Info("using redis to cache references") + } else { + storePath := cfg.Store + if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" { + storePath = filepath.Join(tproot, storePath) + } + var err error + mfStore, err = local.NewStore(storePath) + if err != nil { + return nil, err + } + log.WithField("storePath", storePath).Info("using local filesystem to cache manifests and config") + // TODO(cw): GC the store } - // TODO: GC the store ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -182,12 +201,22 @@ func NewRegistry(cfg config.Config, newResolver ResolverProvider, reg prometheus return nil, xerrors.Errorf("cannot read fixed spec: %w", err) } - var fp api.ImageSpec - err = jsonpb.UnmarshalString(string(fc), &fp) + f := make(map[string]json.RawMessage) + err = json.Unmarshal(fc, &f) if err != nil { return nil, xerrors.Errorf("cannot unmarshal fixed spec: %w", err) } - specProvider[api.ProviderPrefixFixed] = FixedImageSpecProvider(fp) + + prov := make(FixedImageSpecProvider) + for k, v := range f { + var spec api.ImageSpec + err = jsonpb.UnmarshalString(string(v), &spec) + if err != nil { + return nil, xerrors.Errorf("cannot unmarshal fixed spec: %w", err) + } + prov[k] = &spec + } + specProvider[api.ProviderPrefixFixed] = prov } var ipfs *IPFSBlobCache @@ -221,7 +250,7 @@ func NewRegistry(cfg config.Config, newResolver ResolverProvider, reg prometheus return &Registry{ Config: cfg, Resolver: newResolver, - Store: store, + Store: mfStore, IPFS: ipfs, SpecProvider: specProvider, LayerSource: layerSource, diff --git a/components/registry-facade/pkg/registry/resolver.go b/components/registry-facade/pkg/registry/resolver.go new file mode 100644 index 00000000000000..84f0b257d775b1 --- /dev/null +++ b/components/registry-facade/pkg/registry/resolver.go @@ -0,0 +1,93 @@ +// Copyright (c) 2022 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License-AGPL.txt in the project root for license information. + +package registry + +import ( + "context" + "encoding/json" + "time" + + "github.com/containerd/containerd/remotes" + redis "github.com/go-redis/redis/v8" + ociv1 "github.com/opencontainers/image-spec/specs-go/v1" +) + +type RedisCachedResolver struct { + Client *redis.Client + Provider ResolverProvider + + d remotes.Resolver +} + +var ( + _ remotes.Resolver = &RedisCachedResolver{} + _ ResolverProvider = (&RedisCachedResolver{}).Factory +) + +type resolverResult struct { + Name string + Desc ociv1.Descriptor +} + +// Resolve attempts to resolve the reference into a name and descriptor. +// +// The argument `ref` should be a scheme-less URI representing the remote. +// Structurally, it has a host and path. The "host" can be used to directly +// reference a specific host or be matched against a specific handler. +// +// The returned name should be used to identify the referenced entity. +// Dependending on the remote namespace, this may be immutable or mutable. +// While the name may differ from ref, it should itself be a valid ref. +// +// If the resolution fails, an error will be returned. +func (rcr *RedisCachedResolver) Resolve(ctx context.Context, ref string) (name string, desc ociv1.Descriptor, err error) { + raw, _ := rcr.Client.Get(ctx, "resolve."+ref).Result() + if raw != "" { + var res resolverResult + if err := json.Unmarshal([]byte(raw), &res); err == nil { + return res.Name, res.Desc, nil + } + } + + name, desc, err = rcr.resolver().Resolve(ctx, ref) + if err != nil { + return + } + + if raw, err := json.Marshal(resolverResult{Name: name, Desc: desc}); err == nil { + rcr.Client.Set(ctx, "resolve."+ref, string(raw), 2*time.Hour) + } + + return +} + +// Fetcher returns a new fetcher for the provided reference. +// All content fetched from the returned fetcher will be +// from the namespace referred to by ref. +func (rcr *RedisCachedResolver) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error) { + return rcr.resolver().Fetcher(ctx, ref) +} + +// Pusher returns a new pusher for the provided reference +// The returned Pusher should satisfy content.Ingester and concurrent attempts +// to push the same blob using the Ingester API should result in ErrUnavailable. +func (rcr *RedisCachedResolver) Pusher(ctx context.Context, ref string) (remotes.Pusher, error) { + return rcr.resolver().Pusher(ctx, ref) +} + +func (rcr *RedisCachedResolver) resolver() remotes.Resolver { + if rcr.d == nil { + rcr.d = rcr.Provider() + } + return rcr.d +} + +func (rcr *RedisCachedResolver) Factory() remotes.Resolver { + return &RedisCachedResolver{ + Client: rcr.Client, + Provider: rcr.Provider, + d: rcr.Provider(), + } +}