Skip to content

Commit

Permalink
refactor: mirror-resources
Browse files Browse the repository at this point in the history
Signed-off-by: Philip Laine <[email protected]>
  • Loading branch information
phillebaba committed Sep 6, 2024
1 parent 93128e8 commit 8dde91e
Showing 1 changed file with 285 additions and 0 deletions.
285 changes: 285 additions & 0 deletions src/pkg/packager2/packager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
package packager2

import (
"context"
"fmt"
"net/http"
"net/url"
"regexp"
"strconv"
"time"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/logs"
v1 "github.com/google/go-containerregistry/pkg/v1"

"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/git"
"github.com/zarf-dev/zarf/src/internal/gitea"
"github.com/zarf-dev/zarf/src/pkg/cluster"
"github.com/zarf-dev/zarf/src/pkg/layout"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/pkg/transform"
"github.com/zarf-dev/zarf/src/pkg/utils"
"github.com/zarf-dev/zarf/src/types"
)

func Mirror(ctx context.Context, c *cluster.Cluster, pkgPaths layout.PackagePaths, regInfo types.RegistryInfo, gitInfo types.GitServerInfo, noImgChecksum bool, retries int) error {
err := pushImagesToRegistry(ctx, c, pkgPaths, regInfo, noImgChecksum, retries)
if err != nil {
return err
}
err = pushReposToRepository(ctx, c, pkgPaths, gitInfo, retries)
if err != nil {
return err
}
return nil
}

func pushImagesToRegistry(ctx context.Context, c *cluster.Cluster, pkgPaths layout.PackagePaths, regInfo types.RegistryInfo, noImgChecksum bool, retries int) error {
logs.Warn.SetOutput(&message.DebugWriter{})
logs.Progress.SetOutput(&message.DebugWriter{})

pkg, _, err := pkgPaths.ReadZarfYAML()
if err != nil {
return err
}
var totalSize int64
images := map[transform.Image]v1.Image{}
for _, component := range pkg.Components {
for _, img := range component.Images {
ref, err := transform.ParseImageRef(img)
if err != nil {
return fmt.Errorf("failed to create ref for image %s: %w", img, err)
}
if _, ok := images[ref]; ok {
continue
}
ociImage, err := utils.LoadOCIImage(pkgPaths.Images.Base, ref)
if err != nil {
return err
}
images[ref] = ociImage
imgSize, err := calcImgSize(ociImage)
if err != nil {
return err
}
totalSize += imgSize
}
}
if len(images) == 0 {
return nil
}

// If this is not a no checksum image push we will be pushing two images (the second will go faster as it checks the same layers)
if !noImgChecksum {
totalSize = totalSize * 2
}

err = retry.Do(func() error {
var tunnel *cluster.Tunnel
var registryURL = regInfo.Address
if c != nil {
registryURL, tunnel, err = c.ConnectToZarfRegistryEndpoint(ctx, regInfo)
if err != nil {
return err
}
if tunnel != nil {
defer tunnel.Close()
}
}

progress := message.NewProgressBar(totalSize, fmt.Sprintf("Pushing %d images", len(images)))
defer progress.Close()

transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig.InsecureSkipVerify = config.CommonOptions.Insecure
// TODO (@WSTARR) This is set to match the TLSHandshakeTimeout to potentially mitigate effects of https://github.com/zarf-dev/zarf/issues/1444
transport.ResponseHeaderTimeout = 10 * time.Second
transportWithProgressBar := helpers.NewTransport(transport, progress)

pushOptions := []crane.Option{
crane.WithPlatform(&v1.Platform{OS: "linux", Architecture: pkg.Build.Architecture}),
crane.WithTransport(transportWithProgressBar),
crane.WithAuth(authn.FromConfig(authn.AuthConfig{
Username: regInfo.PushUsername,
Password: regInfo.PushPassword,
})),
crane.WithUserAgent("zarf"),
crane.WithNoClobber(true),
crane.WithJobs(1),
}
if config.CommonOptions.Insecure {
pushOptions = append(pushOptions, crane.Insecure)
}

pushImage := func(img v1.Image, name string) error {
if tunnel != nil {
return tunnel.Wrap(func() error { return crane.Push(img, name, pushOptions...) })
}

return crane.Push(img, name, pushOptions...)
}

pushed := []transform.Image{}
defer func() {
for _, refInfo := range pushed {
delete(images, refInfo)
}
}()
for refInfo, img := range images {
progress.Updatef(fmt.Sprintf("Pushing %s", helpers.Truncate(refInfo.Reference, 55, true)))

size, err := calcImgSize(img)
if err != nil {
return err
}

// If this is not a no checksum image push it for use with the Zarf agent
if !noImgChecksum {
offlineNameCRC, err := transform.ImageTransformHost(registryURL, refInfo.Reference)
if err != nil {
return err
}
if err = pushImage(img, offlineNameCRC); err != nil {
return err
}
totalSize -= size
}

// To allow for other non-zarf workloads to easily see the images upload a non-checksum version
// (this may result in collisions but this is acceptable for this use case)
offlineName, err := transform.ImageTransformHostWithoutChecksum(registryURL, refInfo.Reference)
if err != nil {
return err
}

if err = pushImage(img, offlineName); err != nil {
return err
}

pushed = append(pushed, refInfo)
totalSize -= size
}
progress.Successf("Pushed %d images", len(images))
return nil
}, retry.Context(ctx), retry.Attempts(uint(retries)), retry.Delay(500*time.Millisecond))
if err != nil {
return err
}
return nil
}

func calcImgSize(img v1.Image) (int64, error) {
size, err := img.Size()
if err != nil {
return size, err
}
layers, err := img.Layers()
if err != nil {
return size, err
}
for _, layer := range layers {
ls, err := layer.Size()
if err != nil {
return size, err
}
size += ls
}
return size, nil
}

func pushReposToRepository(ctx context.Context, c *cluster.Cluster, pkgPaths layout.PackagePaths, gitInfo types.GitServerInfo, retries int) error {
pkg, _, err := pkgPaths.ReadZarfYAML()
if err != nil {
return err
}
for _, component := range pkg.Components {
for _, repoURL := range component.Repos {
repository, err := git.Open(pkgPaths.Components.Dirs[component.Name].Repos, repoURL)
if err != nil {
return err
}
err = retry.Do(func() error {
namespace, name, port, err := serviceInfoFromServiceURL(gitInfo.Address)

// If this is a service (svcInfo is not nil), create a port-forward tunnel to that resource
// TODO: Find a better way as ignoring the error is not a good solution to decide to port forward.
if err == nil {
// TODO: This means we require a cluster and should enforce it.
tunnel, err := c.NewTunnel(namespace, cluster.SvcResource, name, "", 0, port)
if err != nil {
return err
}
_, err = tunnel.Connect(ctx)
if err != nil {
return err
}
defer tunnel.Close()
giteaClient, err := gitea.NewClient(tunnel.HTTPEndpoint(), gitInfo.PushUsername, gitInfo.PushPassword)
if err != nil {
return err
}
return tunnel.Wrap(func() error {
err = repository.Push(ctx, tunnel.HTTPEndpoint(), gitInfo.PushUsername, gitInfo.PushPassword)
if err != nil {
return err
}
// Add the read-only user to this repo
repoName, err := transform.GitURLtoRepoName(repoURL)
if err != nil {
return err
}
err = giteaClient.AddReadOnlyUserToRepository(ctx, repoName, gitInfo.PullUsername)
if err != nil {
return fmt.Errorf("unable to add the read only user to the repo %s: %w", repoName, err)
}
return nil
})
}

err = repository.Push(ctx, gitInfo.Address, gitInfo.PushUsername, gitInfo.PushPassword)
if err != nil {
return err
}
return nil
}, retry.Context(ctx), retry.Attempts(uint(retries)), retry.Delay(500*time.Millisecond))
if err != nil {
return fmt.Errorf("unable to push repo %s to the Git Server: %w", repoURL, err)
}
}
}
return nil
}

var (
// localClusterServiceRegex is used to match the local cluster service format:
localClusterServiceRegex = regexp.MustCompile(`^(?P<name>[^\.]+)\.(?P<namespace>[^\.]+)\.svc\.cluster\.local$`)
)

// ServiceInfoFromServiceURL takes a serviceURL and parses it to find the service info for connecting to the cluster. The string is expected to follow the following format:
// Example serviceURL: http://{SERVICE_NAME}.{NAMESPACE}.svc.cluster.local:{PORT}.
func serviceInfoFromServiceURL(serviceURL string) (string, string, int, error) {
parsedURL, err := url.Parse(serviceURL)
if err != nil {
return "", "", 0, err
}

// Get the remote port from the serviceURL.
remotePort, err := strconv.Atoi(parsedURL.Port())
if err != nil {
return "", "", 0, err
}

// Match hostname against local cluster service format.
get, err := helpers.MatchRegex(localClusterServiceRegex, parsedURL.Hostname())

// If incomplete match, return an error.
if err != nil {
return "", "", 0, err
}
return get("namespace"), get("name"), remotePort, nil
}

0 comments on commit 8dde91e

Please sign in to comment.