This repository has been archived by the owner on Aug 12, 2024. It is now read-only.

[unpack] Add support for unpacking from Image based sources
Signed-off-by: Varsha Prasad Narsing <[email protected]>
varshaprasad96 committed Sep 11, 2023
1 parent 87a15d1 commit 0256166
Showing 6 changed files with 138 additions and 94 deletions.
3 changes: 2 additions & 1 deletion cmd/core/main.go
@@ -218,7 +218,8 @@ func main() {
os.Exit(1)
}

defaultUnpacker, err := v1alpha2source.NewDefaultUnpackerWithOpts(systemNsCluster, systemNamespace)
// TODO(Varsha): Revert the unpack image to the default value; this override was only added for testing.
defaultUnpacker, err := v1alpha2source.NewDefaultUnpackerWithOpts(systemNsCluster, systemNamespace, v1alpha2source.WithUnpackImage("quay.io/operator-framework/rukpak:main"))
// unpacker, err := source.NewDefaultUnpacker(systemNsCluster, systemNamespace, unpackImage, baseUploadManagerURL, rootCAs)
if err != nil {
setupLog.Error(err, "unable to setup bundle unpacker")
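The WithUnpackImage option passed above is defined elsewhere in the repository and not shown in this diff. As a hedged sketch, the call site implies a functional-option shape roughly like the following; the config struct and field names are illustrative assumptions, not the actual rukpak definitions.

package source

// unpackerConfig holds settings for the default unpacker (assumed shape).
type unpackerConfig struct {
	// unpackImage is the container image run by the unpack pod.
	unpackImage string
}

// UnpackerOption mutates the unpacker configuration (name assumed).
type UnpackerOption func(*unpackerConfig)

// WithUnpackImage overrides the default image used for unpack pods,
// matching the call in main.go above.
func WithUnpackImage(image string) UnpackerOption {
	return func(c *unpackerConfig) {
		c.unpackImage = image
	}
}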
@@ -21,8 +21,8 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/google/go-cmp/cmp"
"github.com/operator-framework/rukpak/api/v1alpha2"

v1alpha2deployer "github.com/operator-framework/rukpak/internal/controllers/v1alpha2/deployer"
@@ -123,7 +123,6 @@ func (b *bundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
// The controller is not updating spec, we only update the status. Hence sending
// a status update should be enough.
if !equality.Semantic.DeepEqual(existingBD.Status, reconciledBD.Status) {
fmt.Println("before update", cmp.Diff(existingBD, reconciledBD))
if updateErr := b.Status().Update(ctx, reconciledBD); updateErr != nil {
return res, apimacherrors.NewAggregate([]error{reconcileErr, updateErr})
}
@@ -134,21 +133,24 @@ func (b *bundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
func (b *bundleDeploymentReconciler) reconcile(ctx context.Context, bd *v1alpha2.BundleDeployment) (ctrl.Result, error) {
// Unpack contents from the bundle deployment for each of the specified source and update
// the status of the object.
bundleDepFS, state, err := b.unpackContents(ctx, bd)
switch state {
// TODO: Handle the case where unpack stays pending and the request keeps being requeued indefinitely.
bundleDepFS, res, err := b.unpackContents(ctx, bd)
switch res.State {
case v1alpha2source.StateUnpackPending:
setUnpackStatusPending(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, nil
setUnpackStatusPending(&bd.Status.Conditions, "pending unpack pod", bd.Generation)
// Requeueing after 5 sec for now, since the average time to unpack a registry bundle locally
// was around ~4 sec.
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
case v1alpha2source.StateUnpacking:
setUnpackStatusPending(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, nil
setUnpackStatusPending(&bd.Status.Conditions, "unpacking pod", bd.Generation)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
case v1alpha2source.StateUnpackFailed:
setUnpackStatusFailing(&bd.Status.Conditions, err.Error(), bd.Generation)
return ctrl.Result{}, err
case v1alpha2source.StateUnpacked:
setUnpackStatusSuccess(&bd.Status.Conditions, fmt.Sprintf("unpacked %s", bd.GetName()), bd.Generation)
default:
return ctrl.Result{}, fmt.Errorf("unkown unpack state %q for bundle deployment %s: %v", state, bd.GetName(), bd.Generation)
return ctrl.Result{}, fmt.Errorf("unkown unpack state %q for bundle deployment %s: %v", res.State, bd.GetName(), bd.Generation)
}

// Unpacked contents from each source would now be available in the fs. Validate
@@ -180,7 +182,7 @@ func (b *bundleDeploymentReconciler) reconcile(ctx context.Context, bd *v1alpha2
case v1alpha2deployer.StateDeploySuccessful:
setInstallStatusSuccess(&bd.Status.Conditions, fmt.Sprintf("installed %s", bd.GetName()), bd.Generation)
default:
return ctrl.Result{}, fmt.Errorf("unkown deploy state %q for bundle deployment %s: %v", state, bd.GetName(), bd.Generation)
return ctrl.Result{}, fmt.Errorf("unkown deploy state %q for bundle deployment %s: %v", deployRes.State, bd.GetName(), bd.Generation)
}

fmt.Println("deplpoy done")
@@ -217,36 +219,35 @@ func (b *bundleDeploymentReconciler) reconcile(ctx context.Context, bd *v1alpha2

// unpackContents unpacks contents from all the sources, and stores them under a directory referenced by the bundle deployment name.
// It returns the consolidated result indicating whether contents from all the sources have been unpacked.
func (b *bundleDeploymentReconciler) unpackContents(ctx context.Context, bd *v1alpha2.BundleDeployment) (*afero.Fs, v1alpha2source.State, error) {
func (b *bundleDeploymentReconciler) unpackContents(ctx context.Context, bd *v1alpha2.BundleDeployment) (*afero.Fs, v1alpha2source.Result, error) {
// set a base filesystem path and unpack contents under the root filepath defined by
// the bundle deployment name.
bundleDepFs := afero.NewBasePathFs(afero.NewOsFs(), bd.GetName())

errs := make([]error, 0)
unpackResult := make([]v1alpha2source.Result, len(bd.Spec.Sources))
unpackResult := make([]v1alpha2source.Result, 0)

// Unpack each of the sources individually, and consolidate all their results into one.
for _, source := range bd.Spec.Sources {
res, err := b.unpacker.Unpack(ctx, bd.Name, source, bundleDepFs)
res, err := b.unpacker.Unpack(ctx, bd.Name, source, bundleDepFs, v1alpha2source.UnpackOption{
BundleDeploymentUID: bd.GetUID(),
})
if err != nil {
errs = append(errs, fmt.Errorf("error unpacking from %s source: %q: %v", source.Kind, res.Message, err))
}
unpackResult = append(unpackResult, *res)
}

// If even one source has not been unpacked, update the BundleDeployment status accordingly.
for _, res := range unpackResult {
if res.State == v1alpha2source.StateUnpackPending {
return &bundleDepFs, v1alpha2source.StateUnpackPending, nil
} else if res.State == v1alpha2source.StateUnpacking {
return &bundleDepFs, v1alpha2source.StateUnpacking, nil
if res.State == v1alpha2source.StateUnpackFailed {
return &bundleDepFs, res, apimacherrors.NewAggregate(errs)
}
if res.State != v1alpha2source.StateUnpacked {
return &bundleDepFs, res, nil
}
}

if len(errs) != 0 {
return &bundleDepFs, v1alpha2source.StateUnpackFailed, apimacherrors.NewAggregate(errs)
}

return &bundleDepFs, v1alpha2source.StateUnpacked, nil
return &bundleDepFs, v1alpha2source.Result{State: v1alpha2source.StateUnpacked, Message: "Successfully unpacked"}, nil
}

// validateContents validates if the unpacked bundle contents are of the right format.
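The reconciler above now branches on res.State rather than a bare state value. A minimal sketch of the Result and State shapes that usage implies, reconstructed from this diff; the constant values and the exact field set are assumptions.

package source

import "github.com/operator-framework/rukpak/api/v1alpha2"

// State captures the progress of an unpack operation (names taken from the diff).
type State string

const (
	StateUnpackPending State = "Pending"      // value assumed
	StateUnpacking     State = "Unpacking"    // value assumed
	StateUnpackFailed  State = "UnpackFailed" // value assumed
	StateUnpacked      State = "Unpacked"     // value assumed
)

// Result is returned by each source's Unpack; succeededPodResult in image.go
// fills ResolvedSource with the digest-pinned image reference.
type Result struct {
	ResolvedSource *v1alpha2.BundleDeplopymentSource
	State          State
	Message        string
}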
2 changes: 1 addition & 1 deletion internal/controllers/v1alpha2/source/git.go
@@ -33,7 +33,7 @@ type Git struct {
Log logr.Logger
}

func (r *Git) Unpack(ctx context.Context, bundeDepName string, bundleSrc v1alpha2.BundleDeplopymentSource, base afero.Fs) (*Result, error) {
func (r *Git) Unpack(ctx context.Context, bundeDepName string, bundleSrc v1alpha2.BundleDeplopymentSource, base afero.Fs, opts UnpackOption) (*Result, error) {
// Validate inputs
if err := r.validate(bundleSrc); err != nil {
return nil, fmt.Errorf("unpacking unsuccessful %v", err)
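Both the Git and Image sources now take a trailing UnpackOption. A hedged sketch of that option and the shared unpack contract inferred from the new signatures; the interface name is an assumption, while the types.UID field type follows from the WithUID call in image.go below.

package source

import (
	"context"

	"github.com/operator-framework/rukpak/api/v1alpha2"
	"github.com/spf13/afero"
	"k8s.io/apimachinery/pkg/types"
)

// UnpackOption carries per-call settings. The image source uses the owning
// BundleDeployment's UID to set an owner reference on the unpack pod.
type UnpackOption struct {
	BundleDeploymentUID types.UID
}

// Unpacker is the contract each source implements (interface name assumed);
// Result is as sketched earlier.
type Unpacker interface {
	Unpack(ctx context.Context, bundleDepName string, src v1alpha2.BundleDeplopymentSource, base afero.Fs, opts UnpackOption) (*Result, error)
}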
147 changes: 85 additions & 62 deletions internal/controllers/v1alpha2/source/image.go
@@ -1,25 +1,23 @@
package source

import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"

"github.com/nlepage/go-tarfs"
"github.com/operator-framework/rukpak/api/v1alpha2"
"github.com/operator-framework/rukpak/internal/util"
"github.com/otiai10/copy"
"github.com/spf13/afero"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -39,44 +37,25 @@ type Image struct {

const imageBundleUnpackContainerName = "bundle"

func (i *Image) Unpack(ctx context.Context, bdName string, bdSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) (*Result, error) {
func (i *Image) Unpack(ctx context.Context, bdName string, bdSrc v1alpha2.BundleDeplopymentSource, base afero.Fs, opts UnpackOption) (*Result, error) {
// Validate inputs
if err := i.validate(bdSrc); err != nil {
if err := i.validate(&bdSrc, opts); err != nil {
return nil, fmt.Errorf("validation unsuccessful during unpacking %v", err)
}

// storage path to store contents in local directory.
storagePath := filepath.Join(bdName, filepath.Clean(bdSrc.Destination))

// If the bdSrc contents are cached, copy those in the specified dir
// and return.
cacheDir, err := getCachedContentPath(bdName, bdSrc, base)
if err != nil {
return nil, fmt.Errorf("error verifying cache %v", err)
}

// if cache exists, copy the contents and return result
if cacheDir != "" {
// copy the contents into the destination speified in the source.
if err := base.RemoveAll(filepath.Clean(bdSrc.Destination)); err != nil {
return nil, fmt.Errorf("error removing dir %v", err)
}
if err := copy.Copy(filepath.Join("bd-v1test", "cache", cacheDir), storagePath); err != nil {
return nil, fmt.Errorf("error fetching cached content %v", err)
}
}
return i.unpack(ctx, bdName, bdSrc, base)
return i.unpack(ctx, bdName, storagePath, bdSrc, base, opts)
}

func (i *Image) unpack(ctx context.Context, bdName string, bdSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) (*Result, error) {
func (i *Image) unpack(ctx context.Context, bdName, storagePath string, bdSrc v1alpha2.BundleDeplopymentSource, base afero.Fs, opts UnpackOption) (*Result, error) {
pod := &corev1.Pod{}
op, err := i.ensureUnpackPod(ctx, bdName, bdSrc, pod)
op, err := i.ensureUnpackPod(ctx, bdName, bdSrc, pod, opts)
if err != nil {
return nil, err
} else if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated || pod.DeletionTimestamp != nil {
return &Result{State: StateUnpackPending}, nil
}

switch phase := pod.Status.Phase; phase {
case corev1.PodPending:
return pendingImagePodResult(pod), nil
Expand All @@ -85,29 +64,32 @@ func (i *Image) unpack(ctx context.Context, bdName string, bdSrc *v1alpha2.Bundl
case corev1.PodFailed:
return nil, i.failedPodResult(ctx, pod)
case corev1.PodSucceeded:
return i.succeededPodResult(ctx, pod)
return i.succeededPodResult(ctx, pod, storagePath, bdSrc, base)
default:
return nil, i.handleUnexpectedPod(ctx, pod)
}
}

func (i *Image) validate(bdSrc *v1alpha2.BundleDeplopymentSource) error {
func (i *Image) validate(bdSrc *v1alpha2.BundleDeplopymentSource, opts UnpackOption) error {
if bdSrc.Kind != v1alpha2.SourceTypeImage {
return fmt.Errorf("bundle source type %q not supported", bdSrc.Kind)
}
if bdSrc.Image == nil {
return fmt.Errorf("bundle source image configuration is unset")
return errors.New("bundle source image configuration is unset")
}
if opts.BundleDeploymentUID == "" {
return errors.New("bundle deployment UID required")
}
return nil
}

func (i *Image) ensureUnpackPod(ctx context.Context, bdName string, bundleSrc *v1alpha2.BundleDeplopymentSource, pod *corev1.Pod) (controllerutil.OperationResult, error) {
func (i *Image) ensureUnpackPod(ctx context.Context, bdName string, bundleSrc v1alpha2.BundleDeplopymentSource, pod *corev1.Pod, opts UnpackOption) (controllerutil.OperationResult, error) {
existingPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: i.PodNamespace, Name: bdName}}
if err := i.Client.Get(ctx, client.ObjectKeyFromObject(existingPod), existingPod); client.IgnoreNotFound(err) != nil {
return controllerutil.OperationResultNone, err
}

podApplyConfig := i.getDesiredPodApplyConfig(bdName, bundleSrc)
podApplyConfig := i.getDesiredPodApplyConfig(bdName, &bundleSrc, opts)
updatedPod, err := i.KubeClient.CoreV1().Pods(i.PodNamespace).Apply(ctx, podApplyConfig, metav1.ApplyOptions{Force: true, FieldManager: "rukpak-core"})
if err != nil {
if !apierrors.IsInvalid(err) {
@@ -153,17 +135,35 @@ func (i *Image) failedPodResult(ctx context.Context, pod *corev1.Pod) error {
return fmt.Errorf("unpack failed: %v", string(logs))
}

func (i *Image) succeededPodResult(ctx context.Context, pod *corev1.Pod) (*Result, error) {
// bundleFS, err := i.getBundleContents(ctx, pod)
// if err != nil {
// return nil, fmt.Errorf("get bundle contents: %v", err)
// }
func (i *Image) succeededPodResult(ctx context.Context, pod *corev1.Pod, storagePath string, bdSrc v1alpha2.BundleDeplopymentSource, base afero.Fs) (*Result, error) {
err := i.getBundleContents(ctx, pod, storagePath, &bdSrc, base)
if err != nil {
return nil, fmt.Errorf("get bundle contents: %v", err)
}

digest, err := i.getBundleImageDigest(pod)
if err != nil {
return nil, fmt.Errorf("get bundle image digest: %v", err)
}

resolvedSource := &v1alpha2.BundleDeplopymentSource{
Kind: v1alpha2.SourceTypeImage,
Image: &v1alpha2.ImageSource{ImageRef: digest},
}

// figure out how to store content in the same format instead of bytes.
return nil, nil
return &Result{ResolvedSource: resolvedSource, State: StateUnpacked}, nil
}

func (i *Image) getBundleImageDigest(pod *corev1.Pod) (string, error) {
for _, ps := range pod.Status.ContainerStatuses {
if ps.Name == imageBundleUnpackContainerName && ps.ImageID != "" {
return ps.ImageID, nil
}
}
return "", fmt.Errorf("bundle image digest not found")
}

func (i *Image) getDesiredPodApplyConfig(bdName string, bundleSrc *v1alpha2.BundleDeplopymentSource) *applyconfigurationcorev1.PodApplyConfiguration {
func (i *Image) getDesiredPodApplyConfig(bdName string, bundleSrc *v1alpha2.BundleDeplopymentSource, opts UnpackOption) *applyconfigurationcorev1.PodApplyConfiguration {
// TODO (tyslaton): Address unpacker pod allowing root users for image sources
//
// In our current implementation, we are creating a pod that uses the image
@@ -191,6 +191,7 @@ func (i *Image) getDesiredPodApplyConfig(bdName string, bundleSrc *v1alpha2.Bund
WithName(bdName).
WithKind(v1alpha2.BundleDeploymentKind).
WithAPIVersion(v1alpha2.BundleDeploymentGVK.Version).
WithUID(opts.BundleDeploymentUID).
WithController(true).
WithBlockOwnerDeletion(true),
).
@@ -238,32 +239,66 @@ func (i *Image) getDesiredPodApplyConfig(bdName string, bundleSrc *v1alpha2.Bund
return podApply
}

func (i *Image) getBundleContents(ctx context.Context, pod *corev1.Pod, storagePath string, bundleSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) (fs.FS, error) {
func (i *Image) getBundleContents(ctx context.Context, pod *corev1.Pod, storagePath string, bundleSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) error {
bundleData, err := i.getPodLogs(ctx, pod)
if err != nil {
return nil, fmt.Errorf("get bundle contents: %v", err)
return fmt.Errorf("error getting bundle contents: %v", err)
}
bd := struct {
Content []byte `json:"content"`
}{}

if err := json.Unmarshal(bundleData, &bd); err != nil {
return nil, fmt.Errorf("parse bundle data: %v", err)
return fmt.Errorf("error parsing bundle data: %v", err)
}

if err := base.RemoveAll(filepath.Clean(bundleSrc.Destination)); err != nil {
return nil, fmt.Errorf("error removing dir %v", err)
if err := (base).RemoveAll(filepath.Clean(bundleSrc.Destination)); err != nil {
return fmt.Errorf("error removing dir %v", err)
}

if err := os.MkdirAll(storagePath, 0755); err != nil {
return nil, fmt.Errorf("error creating storagepath %q", err)
if err := base.MkdirAll(bundleSrc.Destination, 0755); err != nil {
return fmt.Errorf("error creating storagepath %q", err)
}

gzr, err := gzip.NewReader(bytes.NewReader(bd.Content))
if err != nil {
return nil, fmt.Errorf("read bundle content gzip: %v", err)
return fmt.Errorf("error reading bundle content gzip: %v", err)
}

// create a tar reader to parse the decompressed data.
tr := tar.NewReader(gzr)

for {
header, err := tr.Next()
if err == io.EOF {
break
}

if err != nil {
return fmt.Errorf("error storing content locally: %v", err)
}

// create file or directory in the file system.
if header.Typeflag == tar.TypeDir {
if err := base.MkdirAll(filepath.Join(header.Name), 0755); err != nil {
return fmt.Errorf("error creating directory for storing bundle contents: %v", err)
}
} else if header.Typeflag == tar.TypeReg {
// If it's a regular file, create one and copy data.
file, err := base.Create(filepath.Join(header.Name))
if err != nil {
return fmt.Errorf("error creating file for storing bundle contents: %v", err)
}

if _, err := io.Copy(file, tr); err != nil {
return fmt.Errorf("error copying contents: %v", err)
}
file.Close()
} else {
return fmt.Errorf("unsupported tar entry type for %s: %v while unpacking", header.Name, header.Typeflag)
}
}
return tarfs.New(gzr)
return nil
}

func (i *Image) getPodLogs(ctx context.Context, pod *corev1.Pod) ([]byte, error) {
Expand Down Expand Up @@ -296,18 +331,6 @@ func pendingImagePodResult(pod *corev1.Pod) *Result {
return &Result{State: StateUnpackPending, Message: strings.Join(messages, "; ")}
}

// getCachedContentPath returns the name of the cached directory if exists.
func getCachedContentPath(bdaName string, bundleSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) (string, error) {
cachedDirName := getCacheDirName(bdaName, *bundleSrc)

if ok, err := afero.DirExists(base, filepath.Join(CacheDir, cachedDirName)); err != nil {
return "", fmt.Errorf("error finding cache dir %v", err)
} else if !ok {
return "", nil
}
return cachedDirName, nil
}

// Perform a base64 encoding to get the directory name used to store caches
func getCacheDirName(bdName string, bd v1alpha2.BundleDeplopymentSource) string {
switch bd.Kind {
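The collapsed getCacheDirName helper above base64-encodes a per-source key to derive a cache directory name. A hedged sketch of one way it could look, using only fields that appear elsewhere in this diff; the exact key composition is an assumption.

package source

import (
	"encoding/base64"
	"fmt"

	"github.com/operator-framework/rukpak/api/v1alpha2"
)

// getCacheDirName derives a deterministic, filesystem-safe directory name for
// caching a bundle deployment's unpacked source (sketch only, not the committed code).
func getCacheDirName(bdName string, bd v1alpha2.BundleDeplopymentSource) string {
	key := fmt.Sprintf("%s-%s", bdName, bd.Kind)
	if bd.Kind == v1alpha2.SourceTypeImage && bd.Image != nil {
		// ImageRef is the same field the resolved source uses above.
		key = fmt.Sprintf("%s-%s", key, bd.Image.ImageRef)
	}
	// Base64-encode so arbitrary refs (tags, digests, URLs) become safe directory names.
	return base64.StdEncoding.EncodeToString([]byte(key))
}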
