Skip to content

Commit

Permalink
feat: registry proxy
Browse files Browse the repository at this point in the history
WIP on container registry proxy

Signed-off-by: Dmitriy Matrenichev <[email protected]>
  • Loading branch information
DmitriyMV committed Nov 14, 2024
1 parent e26d004 commit 4bee044
Show file tree
Hide file tree
Showing 9 changed files with 451 additions and 4 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ linters:
- gochecknoinits
- gocognit
- godox
- gomnd
- gosec
- ireturn # we return interfaces
- maintidx
Expand Down
2 changes: 0 additions & 2 deletions hack/cri-plugin.part
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
version = 3

[plugins."io.containerd.cri.v1.images"]
discard_unpacked_layers = true
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ func StartAllServices(runtime.Sequence, any) (runtime.TaskExecutionFunc, string)

serviceList := []system.Service{
&services.CRI{},
&services.Registryd{},
}

switch t := r.Config().Machine().Type(); t {
Expand Down
41 changes: 41 additions & 0 deletions internal/app/machined/pkg/system/services/registry/app/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"

"go.uber.org/zap"

"github.com/siderolabs/talos/internal/app/machined/pkg/system/services/registry"
)

func main() {
if err := app(); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}

func app() error {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

development, err := zap.NewDevelopment()
if err != nil {
return fmt.Errorf("failed to create development logger: %w", err)
}

homeDir, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("failed to get user home directory: %w", err)
}

return registry.NewService(filepath.Join(homeDir, "registry-cache"), development).Run(ctx)
}
30 changes: 30 additions & 0 deletions internal/app/machined/pkg/system/services/registry/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package registry

import (
"net/http"

"github.com/siderolabs/gen/xerrors"
)

func getStatusCode(err error) int {
switch {
case xerrors.TagIs[notFoundTag](err):
return http.StatusNotFound
case xerrors.TagIs[badRequestTag](err):
return http.StatusBadRequest
case xerrors.TagIs[internalErrorTag](err):
fallthrough
default:
return http.StatusInternalServerError
}
}

type (
notFoundTag struct{}
badRequestTag struct{}
internalErrorTag struct{}
)
92 changes: 92 additions & 0 deletions internal/app/machined/pkg/system/services/registry/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package registry

import (
"fmt"
"net/http"
"net/url"
"path"
"strings"

"github.com/distribution/reference"
"github.com/siderolabs/gen/xerrors"
)

func extractParams(req *http.Request) (params, error) {
value := req.PathValue("args")

parts := strings.Split(path.Clean(value), "/")
if len(parts) < 4 {
return params{}, xerrors.NewTaggedf[notFoundTag]("incorrect args value '%s'", value)
}

numParts := len(parts)
if p := parts[numParts-2]; p != "blobs" && p != "manifests" {
return params{}, xerrors.NewTaggedf[notFoundTag]("incorrect ref: '%s'", p)
}

name := strings.Join(parts[:numParts-2], "/")
isBlob := parts[numParts-2] == "blobs"
dig := parts[numParts-1]

if !reference.NameRegexp.MatchString(name) {
return params{}, xerrors.NewTaggedf[badRequestTag]("incorrect name: '%s'", name)
}

res, err := makeParams(req.URL.Query().Get("ns"), name, dig, isBlob)
if err != nil {
return params{}, xerrors.NewTaggedf[badRequestTag]("failed to make params: %w", err)
}

return res, nil
}

func makeParams(registry string, name string, dig string, isBlob bool) (params, error) {
if registry != "" {
return params{registry: registry, name: name, dig: dig, isBlob: isBlob}, nil
}

u, err := url.Parse("dummy://" + name)

switch {
case err != nil:
return params{}, fmt.Errorf("invalid url: %w", err)
case u.Scheme != "dummy":
return params{}, fmt.Errorf("incorrect scheme")
case u.Host == "":
return params{}, fmt.Errorf("hostname required")
default:
return params{registry: "", name: name, dig: dig, isBlob: isBlob}, nil
}
}

type params struct {
registry string
name string
dig string
isBlob bool
}

func (p params) String() string {
var result strings.Builder

if p.registry != "" {
result.WriteString(p.registry)
result.WriteByte('/')
}

result.WriteString(p.name)

if strings.HasPrefix(p.dig, "sha256:") {
result.WriteByte('@')
result.WriteString(p.dig)
} else {
result.WriteByte(':')
result.WriteString(p.dig)
}

return result.String()
}
196 changes: 196 additions & 0 deletions internal/app/machined/pkg/system/services/registry/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package registry provides a simple container registry service.
package registry

import (
"bytes"
"cmp"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"time"

"github.com/distribution/reference"
"github.com/opencontainers/go-digest"
"github.com/siderolabs/gen/xerrors"
"go.uber.org/zap"
)

// NewService creates a new instance of the registry service.
func NewService(root string, logger *zap.Logger) *Service {
return &Service{root: root, logger: logger}
}

// Service is a container registry service.
type Service struct {
logger *zap.Logger
root string
}

// Run is an entrypoint to the API service.
func (s *Service) Run(ctx context.Context) error {
mux := http.NewServeMux()

mux.HandleFunc("GET /v2/{args...}", s.serveHTTP)

giveOk := func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }
for _, p := range []string{"v2", "healthz"} {
mux.HandleFunc("GET /"+p, giveOk)
mux.HandleFunc("GET /"+p+"/{$}", giveOk)
}

server := http.Server{Addr: ":3172", Handler: mux}
errCh := make(chan error, 1)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

context.AfterFunc(ctx, func() {
shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCtxCancel()

errCh <- server.Shutdown(shutdownCtx)
})

err := server.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
err = nil
}

cancel()

return cmp.Or(err, <-errCh)
}

func (s *Service) serveHTTP(w http.ResponseWriter, req *http.Request) {
if err := s.handler(w, req); err != nil {
s.logger.Error("failed to handle request", zap.Error(err))
w.WriteHeader(getStatusCode(err))
}
}

func (s *Service) handler(w http.ResponseWriter, req *http.Request) error {
isProxied := req.Header.Get("X-Talos-Registry-Proxy") == "true"

logger := s.logger.With(
zap.String("method", req.Method),
zap.String("url", req.URL.String()),
zap.Bool("proxied", isProxied),
zap.String("remote_addr", req.RemoteAddr),
)

p, err := extractParams(req)
if err != nil {
return fmt.Errorf("failed to extract params: %w", err)
}

logger.Info(
"image request",
zap.String("name", p.name),
zap.String("digest", p.dig),
zap.Bool("is_blob", p.isBlob),
zap.String("registry", p.registry),
)

ref, err := reference.ParseDockerRef(p.String())
if err != nil {
return xerrors.NewTaggedf[badRequestTag]("failed to parse docker ref: %w", err)
}

canonicalRef, ok := ref.(reference.Canonical)
if !ok {
if canonicalRef, err = s.resolveCanonicalRef(ref); err != nil {
return err
}
}

if !p.isBlob {
w.Header().Add("Content-Type", "application/vnd.oci.image.index.v1+json")
w.Header().Add("Docker-Content-Digest", canonicalRef.Digest().String())

http.ServeFile(w, req, filepath.Join(
s.root,
"manifests",
canonicalRef.Name(),
"digest",
canonicalRef.Digest().String(),
))
} else {
// w.Header().Add("Content-Type", "application/vnd.oci.image.manifest.v1+json")
// w.Header().Add("Docker-Content-Digest", canonicalRef.Digest().String())
http.ServeFile(w, req, filepath.Join(
s.root,
"blobs",
canonicalRef.Digest().String(),
))
}

return nil
}

func (s *Service) resolveCanonicalRef(ref reference.Reference) (reference.Canonical, error) {
namedTagged, ok := ref.(reference.NamedTagged)
if !ok {
return nil, xerrors.NewTaggedf[internalErrorTag]("incorrect reference type: %T", ref)
}

taggedFile := filepath.Join(s.root, "manifests", namedTagged.Name(), "reference", namedTagged.Tag())

ntSum, err := hashFile(taggedFile)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return nil, xerrors.NewTaggedf[internalErrorTag]("failed to hash manifest: %w", err)
}

return nil, xerrors.NewTagged[notFoundTag](err)
}

sha256file := filepath.Join(s.root, "manifests", namedTagged.Name(), "digest", "sha256:"+hex.EncodeToString(ntSum))

sSum, err := hashFile(sha256file)
if err != nil {
return nil, xerrors.NewTaggedf[internalErrorTag]("failed to hash '%x': %w", sSum, err)
}

if !bytes.Equal(ntSum, sSum) {
return nil, xerrors.NewTaggedf[internalErrorTag]("hash for '%s' is not equal for hash to '%s'", taggedFile, sha256file)
}

return &canonical{
NamedTagged: namedTagged,
digest: "sha256:" + hex.EncodeToString(ntSum),
}, nil
}

func hashFile(f string) (_ []byte, returnErr error) {
data, err := os.Open(f)
if err != nil {
return nil, err
}

defer func() { returnErr = cmp.Or(returnErr, data.Close()) }()

h := sha256.New()
if _, err = io.Copy(h, data); err != nil {
return nil, err
}

return h.Sum(nil), nil
}

type canonical struct {
reference.NamedTagged
digest string
}

func (c *canonical) String() string { return c.NamedTagged.String() + "@" + c.digest }
func (c *canonical) Digest() digest.Digest { return digest.Digest(c.digest) }
Loading

0 comments on commit 4bee044

Please sign in to comment.