Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: registry proxy #9722

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions internal/app/machined/pkg/system/services/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"net/http"
"os"
"sync"
"time"

containerdapi "github.com/containerd/containerd/v2/client"
Expand Down Expand Up @@ -193,27 +194,7 @@ func (k *Kubelet) Runner(r runtime.Runtime) (runner.Runner, error) {

// HealthFunc implements the HealthcheckedService interface.
func (k *Kubelet) HealthFunc(runtime.Runtime) health.Check {
return func(ctx context.Context) error {
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:10248/healthz", nil)
if err != nil {
return err
}

req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
//nolint:errcheck
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("expected HTTP status OK, got %s", resp.Status)
}

return nil
}
return func(ctx context.Context) error { return simpleHealthCheck(ctx, "http://127.0.0.1:10248/healthz") }
}

// HealthSettings implements the HealthcheckedService interface.
Expand Down Expand Up @@ -247,3 +228,27 @@ func kubeletSeccomp(seccomp *specs.LinuxSeccomp) {
},
)
}

func simpleHealthCheck(ctx context.Context, url string) error {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}

req = req.WithContext(ctx)

resp, err := http.DefaultClient.Do(req) //nolint:bodyclose
if err != nil {
return err
}

bodyCloser := sync.OnceValue(resp.Body.Close)

defer bodyCloser() //nolint:errcheck

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("expected HTTP status OK, got %s", resp.Status)
}

return bodyCloser()
}
50 changes: 50 additions & 0 deletions internal/app/machined/pkg/system/services/registry/app/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"context"
"fmt"
"io/fs"
"os"
"os/signal"
"path/filepath"

"go.uber.org/zap"

"github.com/siderolabs/talos/internal/app/machined/pkg/system/services/registry"
)

func main() {
if err := app(); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}

func app() error {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

development, err := zap.NewDevelopment()
if err != nil {
return fmt.Errorf("failed to create development logger: %w", err)
}

homeDir, err := os.UserHomeDir()
DmitriyMV marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to get user home directory: %w", err)
}

it := func(yield func(fs.StatFS) bool) {
for _, root := range []string{"registry-cache-2", "registry-cache"} {
if !yield(os.DirFS(filepath.Join(homeDir, root)).(fs.StatFS)) {
return
}
}
}

return registry.NewService(registry.NewMultiPathFS(it), development).Run(ctx)
}
61 changes: 61 additions & 0 deletions internal/app/machined/pkg/system/services/registry/fs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package registry

import (
"errors"
"io/fs"
"iter"

"github.com/hashicorp/go-multierror"
)

// MultiPathFS is a FS that can be used to combine multiple FSs into one.
type MultiPathFS struct {
fsIt iter.Seq[fs.StatFS]
}

// NewMultiPathFS creates a new MultiPathFS. It takes an iterator of FSs which can be used multiple times asynchrously.
func NewMultiPathFS(it iter.Seq[fs.StatFS]) *MultiPathFS { return &MultiPathFS{fsIt: it} }

// Open opens the named file.
func (m *MultiPathFS) Open(name string) (fs.File, error) {
var multiErr *multierror.Error

for f := range m.fsIt {
r, err := f.Open(name)
if err == nil {
return r, nil
}

multiErr = multierror.Append(multiErr, err)
}

if multiErr == nil {
return nil, errors.New("roots are empty")
}

return nil, multiErr.ErrorOrNil()
}

// Stat returns a [fs.FileInfo] describing the named file.
func (m *MultiPathFS) Stat(name string) (fs.FileInfo, error) {
var multiErr *multierror.Error

for f := range m.fsIt {
r, err := f.Stat(name)
if err == nil {
return r, nil
}

multiErr = multierror.Append(multiErr, err)
}

if multiErr == nil {
return nil, errors.New("roots are empty")
}

return nil, multiErr.ErrorOrNil()
}
73 changes: 73 additions & 0 deletions internal/app/machined/pkg/system/services/registry/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package registry

import (
"net/http"
"path"
"strings"

"github.com/distribution/reference"
"github.com/siderolabs/gen/xerrors"
)

func extractParams(req *http.Request) (params, error) {
registry := req.URL.Query().Get("ns")
if registry == "" {
return params{}, xerrors.NewTaggedf[badRequestTag]("missing ns")
}

value := req.PathValue("args")

parts := strings.Split(path.Clean(value), "/")
if len(parts) < 4 {
return params{}, xerrors.NewTaggedf[notFoundTag]("incorrect args value '%s'", value)
}

numParts := len(parts)
isBlob := parts[numParts-2] == "blobs"
isManifest := parts[numParts-2] == "manifests"

if !isBlob && !isManifest {
return params{}, xerrors.NewTaggedf[notFoundTag]("incorrect ref: '%s'", parts[numParts-2])
}

name := strings.Join(parts[:numParts-2], "/")
dig := parts[numParts-1]

if !reference.NameRegexp.MatchString(name) {
return params{}, xerrors.NewTaggedf[badRequestTag]("incorrect name: '%s'", name)
}

return params{registry: registry, name: name, dig: dig, isBlob: isBlob}, nil
}

type params struct {
registry string
name string
dig string
isBlob bool
}

func (p params) String() string {
var result strings.Builder

if p.registry != "" {
result.WriteString(p.registry)
result.WriteByte('/')
}

result.WriteString(p.name)

if strings.HasPrefix(p.dig, "sha256:") {
result.WriteByte('@')
result.WriteString(p.dig)
} else {
result.WriteByte(':')
result.WriteString(p.dig)
}

return result.String()
}
137 changes: 137 additions & 0 deletions internal/app/machined/pkg/system/services/registry/readers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package registry

import (
"errors"
"fmt"
"io"
"io/fs"
"os"

"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/errdefs"
)

var (
errInvalidSize = errors.New("readerat: invalid size")
errSeekToInvalidWhence = errors.New("readerat: seek to invalid whence")
errSeekToNegativePosition = errors.New("readerat: seek to negative position")
)

// readSeeker is an io.ReadSeeker implementation based on an io.ReaderAt (and
// an int64 size).
//
// For example, an os.File is both an io.ReaderAt and an io.ReadSeeker, but its
// io.ReadSeeker methods are not safe to use concurrently. In comparison,
// multiple readerat.readSeeker values (using the same os.File as their
// io.ReaderAt) are safe to use concurrently. Each can Read and Seek
// independently.
//
// A single readerat.readSeeker is not safe to use concurrently.
//
// Do not modify its exported fields after calling any of its methods.
type readSeeker struct {
ReaderAt io.ReaderAt
Size int64
offset int64
}

// Read implements io.Reader.
func (r *readSeeker) Read(p []byte) (int, error) {
if r.Size < 0 {
return 0, errInvalidSize
} else if r.Size <= r.offset {
return 0, io.EOF
}

if length := r.Size - r.offset; int64(len(p)) > length {
p = p[:length]
}

if len(p) == 0 {
return 0, nil
}

actual, err := r.ReaderAt.ReadAt(p, r.offset)
r.offset += int64(actual)

if err == nil && r.offset == r.Size {
err = io.EOF
}

return actual, err
}

// Seek implements io.Seeker.
func (r *readSeeker) Seek(offset int64, whence int) (int64, error) {
if r.Size < 0 {
return 0, errInvalidSize
}

switch whence {
case io.SeekStart:
// No-op.
case io.SeekCurrent:
offset += r.offset
case io.SeekEnd:
offset += r.Size
default:
return 0, errSeekToInvalidWhence
}

if offset < 0 {
return 0, errSeekToNegativePosition
}

r.offset = offset

return r.offset, nil
}

// openReaderAt creates ReaderAt from a file.
func openReaderAt(p string, statFS fs.StatFS) (content.ReaderAt, error) {
fi, err := statFS.Stat(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}

return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound)
}

fp, err := statFS.Open(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}

return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound)
}

f, ok := fp.(fsFileReaderAt)
if !ok {
return nil, fmt.Errorf("not a fsFileReaderAt: %T, details: %v", fp, fp)
}

return sizeReaderAt{size: fi.Size(), fp: f}, nil
}

// readerat implements io.ReaderAt in a completely stateless manner by opening
// the referenced file for each call to ReadAt.
type sizeReaderAt struct {
size int64
fp fsFileReaderAt
}

func (ra sizeReaderAt) ReadAt(p []byte, offset int64) (int, error) { return ra.fp.ReadAt(p, offset) }
func (ra sizeReaderAt) Size() int64 { return ra.size }
func (ra sizeReaderAt) Close() error { return ra.fp.Close() }
func (ra sizeReaderAt) Reader() io.Reader { return io.LimitReader(ra.fp, ra.size) }

type fsFileReaderAt interface {
io.ReaderAt
fs.File
}
Loading