diff --git a/internal/app/machined/pkg/system/services/kubelet.go b/internal/app/machined/pkg/system/services/kubelet.go index dff56dfaf0..aefa4825a9 100644 --- a/internal/app/machined/pkg/system/services/kubelet.go +++ b/internal/app/machined/pkg/system/services/kubelet.go @@ -9,6 +9,7 @@ import ( "fmt" "net/http" "os" + "sync" "time" containerdapi "github.com/containerd/containerd/v2/client" @@ -193,27 +194,7 @@ func (k *Kubelet) Runner(r runtime.Runtime) (runner.Runner, error) { // HealthFunc implements the HealthcheckedService interface. func (k *Kubelet) HealthFunc(runtime.Runtime) health.Check { - return func(ctx context.Context) error { - req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:10248/healthz", nil) - if err != nil { - return err - } - - req = req.WithContext(ctx) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - //nolint:errcheck - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("expected HTTP status OK, got %s", resp.Status) - } - - return nil - } + return func(ctx context.Context) error { return simpleHealthCheck(ctx, "http://127.0.0.1:10248/healthz") } } // HealthSettings implements the HealthcheckedService interface. @@ -247,3 +228,27 @@ func kubeletSeccomp(seccomp *specs.LinuxSeccomp) { }, ) } + +func simpleHealthCheck(ctx context.Context, url string) error { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return err + } + + req = req.WithContext(ctx) + + resp, err := http.DefaultClient.Do(req) //nolint:bodyclose + if err != nil { + return err + } + + bodyCloser := sync.OnceValue(resp.Body.Close) + + defer bodyCloser() //nolint:errcheck + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("expected HTTP status OK, got %s", resp.Status) + } + + return bodyCloser() +} diff --git a/internal/app/machined/pkg/system/services/registry/app/main.go b/internal/app/machined/pkg/system/services/registry/app/main.go new file mode 100644 index 0000000000..e10a6c0682 --- /dev/null +++ b/internal/app/machined/pkg/system/services/registry/app/main.go @@ -0,0 +1,50 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package main + +import ( + "context" + "fmt" + "io/fs" + "os" + "os/signal" + "path/filepath" + + "go.uber.org/zap" + + "github.com/siderolabs/talos/internal/app/machined/pkg/system/services/registry" +) + +func main() { + if err := app(); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func app() error { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + development, err := zap.NewDevelopment() + if err != nil { + return fmt.Errorf("failed to create development logger: %w", err) + } + + homeDir, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("failed to get user home directory: %w", err) + } + + it := func(yield func(fs.StatFS) bool) { + for _, root := range []string{"registry-cache-2", "registry-cache"} { + if !yield(os.DirFS(filepath.Join(homeDir, root)).(fs.StatFS)) { + return + } + } + } + + return registry.NewService(registry.NewMultiPathFS(it), development).Run(ctx) +} diff --git a/internal/app/machined/pkg/system/services/registry/fs.go b/internal/app/machined/pkg/system/services/registry/fs.go new file mode 100644 index 0000000000..e215d6d6b6 --- /dev/null +++ b/internal/app/machined/pkg/system/services/registry/fs.go @@ -0,0 +1,61 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package registry + +import ( + "errors" + "io/fs" + "iter" + + "github.com/hashicorp/go-multierror" +) + +// MultiPathFS is a FS that can be used to combine multiple FSs into one. +type MultiPathFS struct { + fsIt iter.Seq[fs.StatFS] +} + +// NewMultiPathFS creates a new MultiPathFS. It takes an iterator of FSs which can be used multiple times asynchrously. +func NewMultiPathFS(it iter.Seq[fs.StatFS]) *MultiPathFS { return &MultiPathFS{fsIt: it} } + +// Open opens the named file. +func (m *MultiPathFS) Open(name string) (fs.File, error) { + var multiErr *multierror.Error + + for f := range m.fsIt { + r, err := f.Open(name) + if err == nil { + return r, nil + } + + multiErr = multierror.Append(multiErr, err) + } + + if multiErr == nil { + return nil, errors.New("roots are empty") + } + + return nil, multiErr.ErrorOrNil() +} + +// Stat returns a [fs.FileInfo] describing the named file. +func (m *MultiPathFS) Stat(name string) (fs.FileInfo, error) { + var multiErr *multierror.Error + + for f := range m.fsIt { + r, err := f.Stat(name) + if err == nil { + return r, nil + } + + multiErr = multierror.Append(multiErr, err) + } + + if multiErr == nil { + return nil, errors.New("roots are empty") + } + + return nil, multiErr.ErrorOrNil() +} diff --git a/internal/app/machined/pkg/system/services/registry/params.go b/internal/app/machined/pkg/system/services/registry/params.go new file mode 100644 index 0000000000..5d2c74107a --- /dev/null +++ b/internal/app/machined/pkg/system/services/registry/params.go @@ -0,0 +1,73 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package registry + +import ( + "net/http" + "path" + "strings" + + "github.com/distribution/reference" + "github.com/siderolabs/gen/xerrors" +) + +func extractParams(req *http.Request) (params, error) { + registry := req.URL.Query().Get("ns") + if registry == "" { + return params{}, xerrors.NewTaggedf[badRequestTag]("missing ns") + } + + value := req.PathValue("args") + + parts := strings.Split(path.Clean(value), "/") + if len(parts) < 4 { + return params{}, xerrors.NewTaggedf[notFoundTag]("incorrect args value '%s'", value) + } + + numParts := len(parts) + isBlob := parts[numParts-2] == "blobs" + isManifest := parts[numParts-2] == "manifests" + + if !isBlob && !isManifest { + return params{}, xerrors.NewTaggedf[notFoundTag]("incorrect ref: '%s'", parts[numParts-2]) + } + + name := strings.Join(parts[:numParts-2], "/") + dig := parts[numParts-1] + + if !reference.NameRegexp.MatchString(name) { + return params{}, xerrors.NewTaggedf[badRequestTag]("incorrect name: '%s'", name) + } + + return params{registry: registry, name: name, dig: dig, isBlob: isBlob}, nil +} + +type params struct { + registry string + name string + dig string + isBlob bool +} + +func (p params) String() string { + var result strings.Builder + + if p.registry != "" { + result.WriteString(p.registry) + result.WriteByte('/') + } + + result.WriteString(p.name) + + if strings.HasPrefix(p.dig, "sha256:") { + result.WriteByte('@') + result.WriteString(p.dig) + } else { + result.WriteByte(':') + result.WriteString(p.dig) + } + + return result.String() +} diff --git a/internal/app/machined/pkg/system/services/registry/readers.go b/internal/app/machined/pkg/system/services/registry/readers.go new file mode 100644 index 0000000000..6c12529331 --- /dev/null +++ b/internal/app/machined/pkg/system/services/registry/readers.go @@ -0,0 +1,137 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package registry + +import ( + "errors" + "fmt" + "io" + "io/fs" + "os" + + "github.com/containerd/containerd/v2/core/content" + "github.com/containerd/errdefs" +) + +var ( + errInvalidSize = errors.New("readerat: invalid size") + errSeekToInvalidWhence = errors.New("readerat: seek to invalid whence") + errSeekToNegativePosition = errors.New("readerat: seek to negative position") +) + +// readSeeker is an io.ReadSeeker implementation based on an io.ReaderAt (and +// an int64 size). +// +// For example, an os.File is both an io.ReaderAt and an io.ReadSeeker, but its +// io.ReadSeeker methods are not safe to use concurrently. In comparison, +// multiple readerat.readSeeker values (using the same os.File as their +// io.ReaderAt) are safe to use concurrently. Each can Read and Seek +// independently. +// +// A single readerat.readSeeker is not safe to use concurrently. +// +// Do not modify its exported fields after calling any of its methods. +type readSeeker struct { + ReaderAt io.ReaderAt + Size int64 + offset int64 +} + +// Read implements io.Reader. +func (r *readSeeker) Read(p []byte) (int, error) { + if r.Size < 0 { + return 0, errInvalidSize + } else if r.Size <= r.offset { + return 0, io.EOF + } + + if length := r.Size - r.offset; int64(len(p)) > length { + p = p[:length] + } + + if len(p) == 0 { + return 0, nil + } + + actual, err := r.ReaderAt.ReadAt(p, r.offset) + r.offset += int64(actual) + + if err == nil && r.offset == r.Size { + err = io.EOF + } + + return actual, err +} + +// Seek implements io.Seeker. +func (r *readSeeker) Seek(offset int64, whence int) (int64, error) { + if r.Size < 0 { + return 0, errInvalidSize + } + + switch whence { + case io.SeekStart: + // No-op. + case io.SeekCurrent: + offset += r.offset + case io.SeekEnd: + offset += r.Size + default: + return 0, errSeekToInvalidWhence + } + + if offset < 0 { + return 0, errSeekToNegativePosition + } + + r.offset = offset + + return r.offset, nil +} + +// openReaderAt creates ReaderAt from a file. +func openReaderAt(p string, statFS fs.StatFS) (content.ReaderAt, error) { + fi, err := statFS.Stat(p) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + + return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound) + } + + fp, err := statFS.Open(p) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + + return nil, fmt.Errorf("blob not found: %w", errdefs.ErrNotFound) + } + + f, ok := fp.(fsFileReaderAt) + if !ok { + return nil, fmt.Errorf("not a fsFileReaderAt: %T, details: %v", fp, fp) + } + + return sizeReaderAt{size: fi.Size(), fp: f}, nil +} + +// readerat implements io.ReaderAt in a completely stateless manner by opening +// the referenced file for each call to ReadAt. +type sizeReaderAt struct { + size int64 + fp fsFileReaderAt +} + +func (ra sizeReaderAt) ReadAt(p []byte, offset int64) (int, error) { return ra.fp.ReadAt(p, offset) } +func (ra sizeReaderAt) Size() int64 { return ra.size } +func (ra sizeReaderAt) Close() error { return ra.fp.Close() } +func (ra sizeReaderAt) Reader() io.Reader { return io.LimitReader(ra.fp, ra.size) } + +type fsFileReaderAt interface { + io.ReaderAt + fs.File +} diff --git a/internal/app/machined/pkg/system/services/registry/registry.go b/internal/app/machined/pkg/system/services/registry/registry.go new file mode 100644 index 0000000000..635129cf96 --- /dev/null +++ b/internal/app/machined/pkg/system/services/registry/registry.go @@ -0,0 +1,271 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package registry provides a simple container registry service. +package registry + +import ( + "bytes" + "cmp" + "context" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "os" + "path/filepath" + "strconv" + "sync" + "time" + + "github.com/containerd/containerd/v2/core/content" + "github.com/distribution/reference" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/siderolabs/gen/xerrors" + "go.uber.org/zap" +) + +// NewService creates a new instance of the registry service. +func NewService(root fs.StatFS, logger *zap.Logger) *Service { + return &Service{root: root, logger: logger} +} + +// Service is a container registry service. +type Service struct { + logger *zap.Logger + root fs.StatFS +} + +// Run is an entrypoint to the API service. +func (svc *Service) Run(ctx context.Context) error { + mux := http.NewServeMux() + + mux.HandleFunc("GET /v2/{args...}", svc.serveHTTP) + + giveOk := func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } + for _, p := range []string{"v2", "healthz"} { + mux.HandleFunc("GET /"+p, giveOk) + mux.HandleFunc("GET /"+p+"/{$}", giveOk) + } + + server := http.Server{Addr: "127.0.0.1:3172", Handler: mux} + errCh := make(chan error, 1) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + context.AfterFunc(ctx, func() { + svc.logger.Info("shutting down registry server", zap.String("addr", server.Addr)) + + shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCtxCancel() + + errCh <- server.Shutdown(shutdownCtx) + }) + + svc.logger.Info("starting registry server", zap.String("addr", server.Addr)) + + err := server.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) { + err = nil + } + + cancel() + + err = cmp.Or(err, <-errCh) + + svc.logger.Info("registry server stopped", zap.Error(err)) + + return err +} + +func (svc *Service) serveHTTP(w http.ResponseWriter, req *http.Request) { + if err := svc.handler(w, req); err != nil { + svc.logger.Error("failed to handle request", zap.Error(err)) + w.WriteHeader(getStatusCode(err)) + } +} + +func (svc *Service) handler(w http.ResponseWriter, req *http.Request) error { + logger := svc.logger.With( + zap.String("method", req.Method), + zap.String("url", req.URL.String()), + zap.String("remote_addr", req.RemoteAddr), + ) + + p, err := extractParams(req) + if err != nil { + return fmt.Errorf("failed to extract params: %w", err) + } + + logger.Info( + "image request", + zap.String("name", p.name), + zap.String("digest", p.dig), + zap.Bool("is_blob", p.isBlob), + zap.String("registry", p.registry), + ) + + ref, err := svc.resolveCanonicalRef(p) + if err != nil { + return err + } + + var s content.Store + if p.isBlob { + s = &singleFileStore{root: svc.root, path: "blob"} + } else { + s = &singleFileStore{root: svc.root, path: filepath.Join("manifests", ref.Name(), "digest")} + } + + info, err := s.Info(req.Context(), ref.Digest()) + if err != nil { + return err + } + + w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10)) + w.Header().Set("Docker-Content-Digest", ref.Digest().String()) + + if !p.isBlob { + manType, manBlob, err := getManifestData(req.Context(), s, ref) + if err != nil { + return err + } + + w.Header().Set("Content-Type", manType) + + if req.Method == http.MethodHead { + return nil // nothing to do here + } + + http.ServeContent(w, req, ref.Digest().String(), info.UpdatedAt, bytes.NewReader(manBlob)) + + return nil + } + + reader, err := s.ReaderAt(req.Context(), ocispec.Descriptor{Digest: info.Digest}) + if err != nil { + return xerrors.NewTaggedf[internalErrorTag]("failed to get content reader: %w", err) + } + + readerCloser := sync.OnceValue(reader.Close) + + defer readerCloser() //nolint:errcheck + + http.ServeContent(w, req, ref.Digest().String(), info.UpdatedAt, &readSeeker{ReaderAt: reader, Size: info.Size}) + + return readerCloser() +} + +func (svc *Service) resolveCanonicalRef(p params) (reference.Canonical, error) { + ref, err := reference.ParseDockerRef(p.String()) + if err != nil { + return nil, xerrors.NewTaggedf[badRequestTag]("failed to parse docker ref: %w", err) + } + + cRef, ok := ref.(reference.Canonical) + if ok { + return cRef, nil + } + + namedTagged, ok := ref.(reference.NamedTagged) + if !ok { + return nil, xerrors.NewTaggedf[internalErrorTag]("incorrect reference type: %T", ref) + } + + taggedFile := filepath.Join("manifests", namedTagged.Name(), "reference", namedTagged.Tag()) + + ntSum, err := hashFile(taggedFile, svc.root) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return nil, xerrors.NewTaggedf[internalErrorTag]("failed to hash manifest: %w", err) + } + + return nil, xerrors.NewTagged[notFoundTag](err) + } + + sha256file := filepath.Join("manifests", namedTagged.Name(), "digest", digest.NewDigestFromBytes(digest.SHA256, ntSum).String()) + + sSum, err := hashFile(sha256file, svc.root) + if err != nil { + return nil, xerrors.NewTaggedf[internalErrorTag]("failed to hash '%x': %w", sSum, err) + } + + if !bytes.Equal(ntSum, sSum) { + return nil, xerrors.NewTaggedf[internalErrorTag]("hash for '%s' is not equal for hash to '%s'", taggedFile, sha256file) + } + + return &canonical{ + NamedTagged: namedTagged, + digest: digest.NewDigestFromBytes(digest.SHA256, ntSum), + }, nil +} + +func hashFile(f string, where fs.FS) (_ []byte, returnErr error) { + data, err := where.Open(f) + if err != nil { + return nil, err + } + + defer func() { returnErr = cmp.Or(returnErr, data.Close()) }() + + h := sha256.New() + if _, err = io.Copy(h, data); err != nil { + return nil, err + } + + return h.Sum(nil), nil +} + +func getManifestData(ctx context.Context, store content.Store, ref reference.Canonical) (string, []byte, error) { + manifestBlob, err := content.ReadBlob(ctx, store, ocispec.Descriptor{Digest: ref.Digest()}) + if err != nil { + return "", nil, xerrors.NewTaggedf[internalErrorTag]("failed to read content blob: %w", err) + } + + var manifest struct { + MediaType string `json:"mediaType"` + } + + if err = json.Unmarshal(manifestBlob, &manifest); err != nil { + return "", nil, xerrors.NewTaggedf[internalErrorTag]("failed to unmarshal manifest: %w", err) + } + + if manifest.MediaType == "" { + return "", nil, xerrors.NewTaggedf[internalErrorTag]("media type is empty") + } + + return manifest.MediaType, manifestBlob, nil +} + +type canonical struct { + reference.NamedTagged + digest digest.Digest +} + +func (c *canonical) String() string { return c.NamedTagged.String() + "@" + c.digest.Encoded() } +func (c *canonical) Digest() digest.Digest { return c.digest } + +func getStatusCode(err error) int { + switch { + case xerrors.TagIs[notFoundTag](err): + return http.StatusNotFound + case xerrors.TagIs[badRequestTag](err): + return http.StatusBadRequest + case xerrors.TagIs[internalErrorTag](err): + fallthrough + default: + return http.StatusInternalServerError + } +} + +type ( + notFoundTag struct{} + badRequestTag struct{} + internalErrorTag struct{} +) diff --git a/internal/app/machined/pkg/system/services/registry/store.go b/internal/app/machined/pkg/system/services/registry/store.go new file mode 100644 index 0000000000..b1ae47a981 --- /dev/null +++ b/internal/app/machined/pkg/system/services/registry/store.go @@ -0,0 +1,115 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package registry + +import ( + "context" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "syscall" + "time" + + "github.com/containerd/containerd/v2/core/content" + "github.com/containerd/errdefs" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/siderolabs/gen/xerrors" +) + +type singleFileStore struct { + root fs.StatFS + path string +} + +// Info implements [content.InfoProvider] reading method. +func (s *singleFileStore) Info(_ context.Context, dgst digest.Digest) (content.Info, error) { + p, err := s.blobPath(dgst) + if err != nil { + return content.Info{}, fmt.Errorf("calculating blob info path: %w", err) + } + + fi, err := s.root.Stat(p) + if err != nil { + if os.IsNotExist(err) || errors.Is(err, errdefs.ErrNotFound) { + return content.Info{}, xerrors.NewTaggedf[notFoundTag]("content '%s': %w", dgst, errdefs.ErrNotFound) + } + + return content.Info{}, xerrors.NewTagged[internalErrorTag](err) + } + + return content.Info{ + Digest: dgst, + Size: fi.Size(), + CreatedAt: fi.ModTime(), + UpdatedAt: getATime(fi), + }, nil +} + +// ReaderAt implements [content.Provider] reading method. +func (s *singleFileStore) ReaderAt(_ context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { + p, err := s.blobPath(desc.Digest) + if err != nil { + return nil, fmt.Errorf("calculating blob path for ReaderAt: %w", err) + } + + reader, err := openReaderAt(p, s.root) + if err != nil { + return nil, fmt.Errorf("blob '%s' expected at '%s': %w", desc.Digest, p, err) + } + + return reader, nil +} + +// Status implements [content.IngestManager] ingesting which we don't need. +func (s *singleFileStore) Status(context.Context, string) (content.Status, error) { + return content.Status{}, errUnimplemented +} + +// ListStatuses implements [content.IngestManager] ingesting which we don't need. +func (s *singleFileStore) ListStatuses(context.Context, ...string) ([]content.Status, error) { + return nil, errUnimplemented +} + +// Abort implements [content.IngestManager] ingesting which we don't need. +func (s *singleFileStore) Abort(context.Context, string) error { return errUnimplemented } + +// Writer implements [content.Ingester] ingesting which we don't need. +func (s *singleFileStore) Writer(context.Context, ...content.WriterOpt) (content.Writer, error) { + return nil, errUnimplemented +} + +// Walk implements [content.Manager] ingesting which we don't need. +func (s *singleFileStore) Walk(context.Context, content.WalkFunc, ...string) error { + return errUnimplemented +} + +// Delete implements [content.Manager] ingesting which we don't need. +func (s *singleFileStore) Delete(context.Context, digest.Digest) error { return errUnimplemented } + +// Update implements [content.Manager] ingesting which we don't need. +func (s *singleFileStore) Update(context.Context, content.Info, ...string) (content.Info, error) { + return content.Info{}, errUnimplemented +} + +func (s *singleFileStore) blobPath(dgst digest.Digest) (string, error) { + if err := dgst.Validate(); err != nil { + return "", fmt.Errorf("cannot calculate blob path from invalid digest: %v: %w", err, errdefs.ErrInvalidArgument) + } + + return filepath.Join(s.path, dgst.String()), nil +} + +var errUnimplemented = errors.New("unimplemented") + +func getATime(fi os.FileInfo) time.Time { + if st, ok := fi.Sys().(*syscall.Stat_t); ok { + return time.Unix(st.Atim.Unix()) + } + + return fi.ModTime() +} diff --git a/internal/app/machined/pkg/system/services/registryd.go b/internal/app/machined/pkg/system/services/registryd.go new file mode 100644 index 0000000000..ca1e0ca43a --- /dev/null +++ b/internal/app/machined/pkg/system/services/registryd.go @@ -0,0 +1,61 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package services + +import ( + "context" + "io" + "io/fs" + "os" + + "go.uber.org/zap/zapcore" + + "github.com/siderolabs/talos/internal/app/machined/pkg/runtime" + "github.com/siderolabs/talos/internal/app/machined/pkg/system" + "github.com/siderolabs/talos/internal/app/machined/pkg/system/events" + "github.com/siderolabs/talos/internal/app/machined/pkg/system/health" + "github.com/siderolabs/talos/internal/app/machined/pkg/system/runner" + "github.com/siderolabs/talos/internal/app/machined/pkg/system/runner/goroutine" + "github.com/siderolabs/talos/internal/app/machined/pkg/system/services/registry" + "github.com/siderolabs/talos/pkg/conditions" + "github.com/siderolabs/talos/pkg/logging" +) + +type registryD struct{} + +// NewRegistryD returns a new docker mirror registry service. +func NewRegistryD() system.Service { return ®istryD{} } +func (r *registryD) ID(runtime.Runtime) string { return "registryd" } +func (r *registryD) HealthSettings(runtime.Runtime) *health.Settings { return &health.DefaultSettings } +func (r *registryD) PreFunc(context.Context, runtime.Runtime) error { return nil } +func (r *registryD) PostFunc(runtime.Runtime, events.ServiceState) error { return nil } +func (r *registryD) Condition(runtime.Runtime) conditions.Condition { return nil } +func (r *registryD) DependsOn(runtime.Runtime) []string { return nil } + +func (r *registryD) HealthFunc(runtime.Runtime) health.Check { + return func(ctx context.Context) error { return simpleHealthCheck(ctx, "http://127.0.0.1:3172/healthz") } +} + +func (r *registryD) Runner(rt runtime.Runtime) (runner.Runner, error) { + it := func(yield func(fs.StatFS) bool) { + // TODO: Replace the code below with reads from `runtime.Runtime`. + for _, root := range []string{"/imagecache", "/var/lib/registry-cache"} { + if !yield(os.DirFS(root).(fs.StatFS)) { + return + } + } + } + + return goroutine.NewRunner(rt, "registryd", func(ctx context.Context, r runtime.Runtime, logOutput io.Writer) error { + return registry.NewService( + registry.NewMultiPathFS(it), + logging.ZapLogger(logging.NewLogDestination( + logOutput, + zapcore.DebugLevel, + logging.WithColoredLevels(), + )), + ).Run(ctx) + }, runner.WithLoggingManager(rt.Logging())), nil +} diff --git a/pkg/imager/cache/cache.go b/pkg/imager/cache/cache.go index d250dd2014..df2c12ed22 100644 --- a/pkg/imager/cache/cache.go +++ b/pkg/imager/cache/cache.go @@ -136,12 +136,13 @@ func Generate(images []string, platform string, insecure bool, imageLayerCachePa return fmt.Errorf("fetching image %q: %w", src, err) } - filename := tag.TagStr() - if filename == "" { - filename = rmt.Digest.String() + if tag.TagStr() != "" { + if err := os.WriteFile(filepath.Join(referenceDir, tag.TagStr()), manifest, 0o644); err != nil { + return err + } } - if err := os.WriteFile(filepath.Join(digestDir, filename), manifest, 0o644); err != nil { + if err := os.WriteFile(filepath.Join(digestDir, rmt.Digest.String()), manifest, 0o644); err != nil { return err }