diff --git a/cmd/core/main.go b/cmd/core/main.go index a06b4a7c..19a97a03 100644 --- a/cmd/core/main.go +++ b/cmd/core/main.go @@ -218,7 +218,8 @@ func main() { os.Exit(1) } - defaultUnpacker, err := v1alpha2source.NewDefaultUnpackerWithOpts(systemNsCluster, systemNamespace) + // TODO(Varsha): Revert unpack image to default val. This was just for checking + defaultUnpacker, err := v1alpha2source.NewDefaultUnpackerWithOpts(systemNsCluster, systemNamespace, v1alpha2source.WithUnpackImage("quay.io/operator-framework/rukpak:main")) // unpacker, err := source.NewDefaultUnpacker(systemNsCluster, systemNamespace, unpackImage, baseUploadManagerURL, rootCAs) if err != nil { setupLog.Error(err, "unable to setup bundle unpacker") diff --git a/internal/controllers/v1alpha2/controllers/bundledeployment/bundledeployment.go b/internal/controllers/v1alpha2/controllers/bundledeployment/bundledeployment.go index ee7aa199..18f8d421 100644 --- a/internal/controllers/v1alpha2/controllers/bundledeployment/bundledeployment.go +++ b/internal/controllers/v1alpha2/controllers/bundledeployment/bundledeployment.go @@ -21,8 +21,8 @@ import ( "errors" "fmt" "sync" + "time" - "github.com/google/go-cmp/cmp" "github.com/operator-framework/rukpak/api/v1alpha2" v1alpha2deployer "github.com/operator-framework/rukpak/internal/controllers/v1alpha2/deployer" @@ -123,7 +123,6 @@ func (b *bundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req // The controller is not updating spec, we only update the status. Hence sending // a status update should be enough. 
if !equality.Semantic.DeepEqual(existingBD.Status, reconciledBD.Status) {
-		fmt.Println("before update", cmp.Diff(existingBD, reconciledBD))
 		if updateErr := b.Status().Update(ctx, reconciledBD); updateErr != nil {
 			return res, apimacherrors.NewAggregate([]error{reconcileErr, updateErr})
 		}
@@ -134,21 +133,24 @@
 func (b *bundleDeploymentReconciler) reconcile(ctx context.Context, bd *v1alpha2.BundleDeployment) (ctrl.Result, error) {
 	// Unpack contents from the bundle deployment for each of the specified source and update
 	// the status of the object.
-	bundleDepFS, state, err := b.unpackContents(ctx, bd)
-	switch state {
+	// TODO: Handle the case where unpack stays pending and the request keeps being requeued indefinitely.
+	bundleDepFS, res, err := b.unpackContents(ctx, bd)
+	switch res.State {
 	case v1alpha2source.StateUnpackPending:
-		setUnpackStatusPending(&bd.Status.Conditions, err.Error(), bd.Generation)
-		return ctrl.Result{}, nil
+		setUnpackStatusPending(&bd.Status.Conditions, "pending unpack pod", bd.Generation)
+		// Requeuing after 5s for now, since the average time to unpack a registry bundle locally
+		// was around ~4s.
+		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
 	case v1alpha2source.StateUnpacking:
-		setUnpackStatusPending(&bd.Status.Conditions, err.Error(), bd.Generation)
-		return ctrl.Result{}, nil
+		setUnpackStatusPending(&bd.Status.Conditions, "unpacking pod", bd.Generation)
+		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
 	case v1alpha2source.StateUnpackFailed:
 		setUnpackStatusFailing(&bd.Status.Conditions, err.Error(), bd.Generation)
 		return ctrl.Result{}, err
 	case v1alpha2source.StateUnpacked:
 		setUnpackStatusSuccess(&bd.Status.Conditions, fmt.Sprintf("unpacked %s", bd.GetName()), bd.Generation)
 	default:
-		return ctrl.Result{}, fmt.Errorf("unkown unpack state %q for bundle deployment %s: %v", state, bd.GetName(), bd.Generation)
+		return ctrl.Result{}, fmt.Errorf("unknown unpack state %q for bundle deployment %s: %v", res.State, bd.GetName(), bd.Generation)
 	}
 
 	// Unpacked contents from each source would now be availabe in the fs. Validate
@@ -180,7 +182,7 @@ func (b *bundleDeploymentReconciler) reconcile(ctx context.Context, bd *v1alpha2
 	case v1alpha2deployer.StateDeploySuccessful:
 		setInstallStatusSuccess(&bd.Status.Conditions, fmt.Sprintf("installed %s", bd.GetName()), bd.Generation)
 	default:
-		return ctrl.Result{}, fmt.Errorf("unkown deploy state %q for bundle deployment %s: %v", state, bd.GetName(), bd.Generation)
+		return ctrl.Result{}, fmt.Errorf("unknown deploy state %q for bundle deployment %s: %v", deployRes.State, bd.GetName(), bd.Generation)
 	}
 
 	fmt.Println("deplpoy done")
@@ -217,36 +219,35 @@ func (b *bundleDeploymentReconciler) reconcile(ctx context.Context, bd *v1alpha2
 // unpackContents unpacks contents from all the sources, and stores under a directory referenced by the bundle deployment name.
 // It returns the consolidated state on whether contents from all the sources have been unpacked.
-func (b *bundleDeploymentReconciler) unpackContents(ctx context.Context, bd *v1alpha2.BundleDeployment) (*afero.Fs, v1alpha2source.State, error) { +func (b *bundleDeploymentReconciler) unpackContents(ctx context.Context, bd *v1alpha2.BundleDeployment) (*afero.Fs, v1alpha2source.Result, error) { // set a base filesystem path and unpack contents under the root filepath defined by // bundledeployment name. bundleDepFs := afero.NewBasePathFs(afero.NewOsFs(), bd.GetName()) errs := make([]error, 0) - unpackResult := make([]v1alpha2source.Result, len(bd.Spec.Sources)) + unpackResult := make([]v1alpha2source.Result, 0) // Unpack each of the sources individually, and consolidate all their results into one. for _, source := range bd.Spec.Sources { - res, err := b.unpacker.Unpack(ctx, bd.Name, source, bundleDepFs) + res, err := b.unpacker.Unpack(ctx, bd.Name, source, bundleDepFs, v1alpha2source.UnpackOption{ + BundleDeploymentUID: bd.GetUID(), + }) if err != nil { errs = append(errs, fmt.Errorf("error unpacking from %s source: %q: %v", source.Kind, res.Message, err)) } + unpackResult = append(unpackResult, *res) } // Even if one source has not unpacked, update Bundle Deployment status accordingly. 
for _, res := range unpackResult { - if res.State == v1alpha2source.StateUnpackPending { - return &bundleDepFs, v1alpha2source.StateUnpackPending, nil - } else if res.State == v1alpha2source.StateUnpacking { - return &bundleDepFs, v1alpha2source.StateUnpacking, nil + if res.State == v1alpha2source.StateUnpackFailed { + return &bundleDepFs, res, apimacherrors.NewAggregate(errs) + } + if res.State != v1alpha2source.StateUnpacked { + return &bundleDepFs, res, nil } } - - if len(errs) != 0 { - return &bundleDepFs, v1alpha2source.StateUnpackFailed, apimacherrors.NewAggregate(errs) - } - - return &bundleDepFs, v1alpha2source.StateUnpacked, nil + return &bundleDepFs, v1alpha2source.Result{State: v1alpha2source.StateUnpacked, Message: "Successfully unpacked"}, nil } // validateContents validates if the unpacked bundle contents are of the right format. diff --git a/internal/controllers/v1alpha2/source/git.go b/internal/controllers/v1alpha2/source/git.go index 809759e6..e1ad5a49 100644 --- a/internal/controllers/v1alpha2/source/git.go +++ b/internal/controllers/v1alpha2/source/git.go @@ -33,7 +33,7 @@ type Git struct { Log logr.Logger } -func (r *Git) Unpack(ctx context.Context, bundeDepName string, bundleSrc v1alpha2.BundleDeplopymentSource, base afero.Fs) (*Result, error) { +func (r *Git) Unpack(ctx context.Context, bundeDepName string, bundleSrc v1alpha2.BundleDeplopymentSource, base afero.Fs, opts UnpackOption) (*Result, error) { // Validate inputs if err := r.validate(bundleSrc); err != nil { return nil, fmt.Errorf("unpacking unsuccessful %v", err) diff --git a/internal/controllers/v1alpha2/source/image.go b/internal/controllers/v1alpha2/source/image.go index efd1bcc0..a38a12f8 100644 --- a/internal/controllers/v1alpha2/source/image.go +++ b/internal/controllers/v1alpha2/source/image.go @@ -1,25 +1,23 @@ package source import ( + "archive/tar" "bytes" "compress/gzip" "context" "encoding/base64" "encoding/json" + "errors" "fmt" "io" - "io/fs" - "os" "path/filepath" 
"strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" - "github.com/nlepage/go-tarfs" "github.com/operator-framework/rukpak/api/v1alpha2" "github.com/operator-framework/rukpak/internal/util" - "github.com/otiai10/copy" "github.com/spf13/afero" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,44 +37,25 @@ type Image struct { const imageBundleUnpackContainerName = "bundle" -func (i *Image) Unpack(ctx context.Context, bdName string, bdSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) (*Result, error) { +func (i *Image) Unpack(ctx context.Context, bdName string, bdSrc v1alpha2.BundleDeplopymentSource, base afero.Fs, opts UnpackOption) (*Result, error) { // Validate inputs - if err := i.validate(bdSrc); err != nil { + if err := i.validate(&bdSrc, opts); err != nil { return nil, fmt.Errorf("validation unsuccessful during unpacking %v", err) } // storage path to store contents in local directory. storagePath := filepath.Join(bdName, filepath.Clean(bdSrc.Destination)) - - // If the bdSrc contents are cached, copy those in the specified dir - // and return. - cacheDir, err := getCachedContentPath(bdName, bdSrc, base) - if err != nil { - return nil, fmt.Errorf("error verifying cache %v", err) - } - - // if cache exists, copy the contents and return result - if cacheDir != "" { - // copy the contents into the destination speified in the source. 
- if err := base.RemoveAll(filepath.Clean(bdSrc.Destination)); err != nil { - return nil, fmt.Errorf("error removing dir %v", err) - } - if err := copy.Copy(filepath.Join("bd-v1test", "cache", cacheDir), storagePath); err != nil { - return nil, fmt.Errorf("error fetching cached content %v", err) - } - } - return i.unpack(ctx, bdName, bdSrc, base) + return i.unpack(ctx, bdName, storagePath, bdSrc, base, opts) } -func (i *Image) unpack(ctx context.Context, bdName string, bdSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) (*Result, error) { +func (i *Image) unpack(ctx context.Context, bdName, storagePath string, bdSrc v1alpha2.BundleDeplopymentSource, base afero.Fs, opts UnpackOption) (*Result, error) { pod := &corev1.Pod{} - op, err := i.ensureUnpackPod(ctx, bdName, bdSrc, pod) + op, err := i.ensureUnpackPod(ctx, bdName, bdSrc, pod, opts) if err != nil { return nil, err } else if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated || pod.DeletionTimestamp != nil { return &Result{State: StateUnpackPending}, nil } - switch phase := pod.Status.Phase; phase { case corev1.PodPending: return pendingImagePodResult(pod), nil @@ -85,29 +64,32 @@ func (i *Image) unpack(ctx context.Context, bdName string, bdSrc *v1alpha2.Bundl case corev1.PodFailed: return nil, i.failedPodResult(ctx, pod) case corev1.PodSucceeded: - return i.succeededPodResult(ctx, pod) + return i.succeededPodResult(ctx, pod, storagePath, bdSrc, base) default: return nil, i.handleUnexpectedPod(ctx, pod) } } -func (i *Image) validate(bdSrc *v1alpha2.BundleDeplopymentSource) error { +func (i *Image) validate(bdSrc *v1alpha2.BundleDeplopymentSource, opts UnpackOption) error { if bdSrc.Kind != v1alpha2.SourceTypeImage { return fmt.Errorf("bundle source type %q not supported", bdSrc.Kind) } if bdSrc.Image == nil { - return fmt.Errorf("bundle source image configuration is unset") + return errors.New("bundle source image configuration is unset") + } + if 
opts.BundleDeploymentUID == "" { + return errors.New("bundle deployment UID required") } return nil } -func (i *Image) ensureUnpackPod(ctx context.Context, bdName string, bundleSrc *v1alpha2.BundleDeplopymentSource, pod *corev1.Pod) (controllerutil.OperationResult, error) { +func (i *Image) ensureUnpackPod(ctx context.Context, bdName string, bundleSrc v1alpha2.BundleDeplopymentSource, pod *corev1.Pod, opts UnpackOption) (controllerutil.OperationResult, error) { existingPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: i.PodNamespace, Name: bdName}} if err := i.Client.Get(ctx, client.ObjectKeyFromObject(existingPod), existingPod); client.IgnoreNotFound(err) != nil { return controllerutil.OperationResultNone, err } - podApplyConfig := i.getDesiredPodApplyConfig(bdName, bundleSrc) + podApplyConfig := i.getDesiredPodApplyConfig(bdName, &bundleSrc, opts) updatedPod, err := i.KubeClient.CoreV1().Pods(i.PodNamespace).Apply(ctx, podApplyConfig, metav1.ApplyOptions{Force: true, FieldManager: "rukpak-core"}) if err != nil { if !apierrors.IsInvalid(err) { @@ -153,17 +135,35 @@ func (i *Image) failedPodResult(ctx context.Context, pod *corev1.Pod) error { return fmt.Errorf("unpack failed: %v", string(logs)) } -func (i *Image) succeededPodResult(ctx context.Context, pod *corev1.Pod) (*Result, error) { - // bundleFS, err := i.getBundleContents(ctx, pod) - // if err != nil { - // return nil, fmt.Errorf("get bundle contents: %v", err) - // } +func (i *Image) succeededPodResult(ctx context.Context, pod *corev1.Pod, storagePath string, bdSrc v1alpha2.BundleDeplopymentSource, base afero.Fs) (*Result, error) { + err := i.getBundleContents(ctx, pod, storagePath, &bdSrc, base) + if err != nil { + return nil, fmt.Errorf("get bundle contents: %v", err) + } + + digest, err := i.getBundleImageDigest(pod) + if err != nil { + return nil, fmt.Errorf("get bundle image digest: %v", err) + } + + resolvedSource := &v1alpha2.BundleDeplopymentSource{ + Kind: v1alpha2.SourceTypeImage, + 
Image: &v1alpha2.ImageSource{ImageRef: digest}, + } - // figure out how to store content in the same format instead of bytes. - return nil, nil + return &Result{ResolvedSource: resolvedSource, State: StateUnpacked}, nil +} + +func (i *Image) getBundleImageDigest(pod *corev1.Pod) (string, error) { + for _, ps := range pod.Status.ContainerStatuses { + if ps.Name == imageBundleUnpackContainerName && ps.ImageID != "" { + return ps.ImageID, nil + } + } + return "", fmt.Errorf("bundle image digest not found") } -func (i *Image) getDesiredPodApplyConfig(bdName string, bundleSrc *v1alpha2.BundleDeplopymentSource) *applyconfigurationcorev1.PodApplyConfiguration { +func (i *Image) getDesiredPodApplyConfig(bdName string, bundleSrc *v1alpha2.BundleDeplopymentSource, opts UnpackOption) *applyconfigurationcorev1.PodApplyConfiguration { // TODO (tyslaton): Address unpacker pod allowing root users for image sources // // In our current implementation, we are creating a pod that uses the image @@ -191,6 +191,7 @@ func (i *Image) getDesiredPodApplyConfig(bdName string, bundleSrc *v1alpha2.Bund WithName(bdName). WithKind(v1alpha2.BundleDeploymentKind). WithAPIVersion(v1alpha2.BundleDeploymentGVK.Version). + WithUID(opts.BundleDeploymentUID). WithController(true). WithBlockOwnerDeletion(true), ). 
@@ -238,32 +239,66 @@ func (i *Image) getDesiredPodApplyConfig(bdName string, bundleSrc *v1alpha2.Bund return podApply } -func (i *Image) getBundleContents(ctx context.Context, pod *corev1.Pod, storagePath string, bundleSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) (fs.FS, error) { +func (i *Image) getBundleContents(ctx context.Context, pod *corev1.Pod, storagePath string, bundleSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) error { bundleData, err := i.getPodLogs(ctx, pod) if err != nil { - return nil, fmt.Errorf("get bundle contents: %v", err) + return fmt.Errorf("error getting bundle contents: %v", err) } bd := struct { Content []byte `json:"content"` }{} if err := json.Unmarshal(bundleData, &bd); err != nil { - return nil, fmt.Errorf("parse bundle data: %v", err) + return fmt.Errorf("error parsing bundle data: %v", err) } - if err := base.RemoveAll(filepath.Clean(bundleSrc.Destination)); err != nil { - return nil, fmt.Errorf("error removing dir %v", err) + if err := (base).RemoveAll(filepath.Clean(bundleSrc.Destination)); err != nil { + return fmt.Errorf("error removing dir %v", err) } - if err := os.MkdirAll(storagePath, 0755); err != nil { - return nil, fmt.Errorf("error creating storagepath %q", err) + if err := base.MkdirAll(bundleSrc.Destination, 0755); err != nil { + return fmt.Errorf("error creating storagepath %q", err) } gzr, err := gzip.NewReader(bytes.NewReader(bd.Content)) if err != nil { - return nil, fmt.Errorf("read bundle content gzip: %v", err) + return fmt.Errorf("error reading bundle content gzip: %v", err) + } + + // create a tar reader to parse the decompressed data. + tr := tar.NewReader(gzr) + + for { + header, err := tr.Next() + if err == io.EOF { + break + } + + if err != nil { + return fmt.Errorf("error storing content locally: %v", err) + } + + // create file or directory in the file system. 
+ if header.Typeflag == tar.TypeDir { + if err := base.MkdirAll(filepath.Join(header.Name), 0755); err != nil { + return fmt.Errorf("error creating directory for storing bundle contents: %v", err) + } + } else if header.Typeflag == tar.TypeReg { + // If its a regular file, create one and copy data. + file, err := base.Create(filepath.Join(header.Name)) + if err != nil { + return fmt.Errorf("error creating file for storing bundle contents: %v", err) + } + + if _, err := io.Copy(file, tr); err != nil { + return fmt.Errorf("error copying contents: %v", err) + } + file.Close() + } else { + return fmt.Errorf("unsupported tar entry type for %s: %v while unpacking", header.Name, header.Typeflag) + } } - return tarfs.New(gzr) + return nil } func (i *Image) getPodLogs(ctx context.Context, pod *corev1.Pod) ([]byte, error) { @@ -296,18 +331,6 @@ func pendingImagePodResult(pod *corev1.Pod) *Result { return &Result{State: StateUnpackPending, Message: strings.Join(messages, "; ")} } -// getCachedContentPath returns the name of the cached directory if exists. 
-func getCachedContentPath(bdaName string, bundleSrc *v1alpha2.BundleDeplopymentSource, base afero.Fs) (string, error) {
-	cachedDirName := getCacheDirName(bdaName, *bundleSrc)
-
-	if ok, err := afero.DirExists(base, filepath.Join(CacheDir, cachedDirName)); err != nil {
-		return "", fmt.Errorf("error finding cache dir %v", err)
-	} else if !ok {
-		return "", nil
-	}
-	return cachedDirName, nil
-}
-
 // Perform a base64 encoding to get the directoryName to store caches
 func getCacheDirName(bdName string, bd v1alpha2.BundleDeplopymentSource) string {
 	switch bd.Kind {
diff --git a/internal/controllers/v1alpha2/source/unpacker.go b/internal/controllers/v1alpha2/source/unpacker.go
index 23756afe..2f9ff28f 100644
--- a/internal/controllers/v1alpha2/source/unpacker.go
+++ b/internal/controllers/v1alpha2/source/unpacker.go
@@ -9,11 +9,22 @@ import (
 	"github.com/operator-framework/rukpak/api/v1alpha2"
 	"github.com/spf13/afero"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes"
 	"sigs.k8s.io/controller-runtime/pkg/cluster"
 )
 
+// UnpackOption stores bundle-deployment-specific options
+// that are passed to the unpacker.
+// This is currently used to pass the bundle deployment UID
+// for use in the image unpacker, but can be expanded further
+// to pass other bundle-specific options.
+type UnpackOption struct {
+	BundleDeploymentUID types.UID
+}
+
 type Unpacker interface {
-	Unpack(ctx context.Context, bundleDeploymentName string, bundleDeploymentSource v1alpha2.BundleDeplopymentSource, fs afero.Fs) (*Result, error)
+	Unpack(ctx context.Context, bundleDeploymentName string, bundleDeploymentSource v1alpha2.BundleDeplopymentSource, fs afero.Fs, opts UnpackOption) (*Result, error)
 }
 
 // Result conveys progress information about unpacking bundle content.
@@ -54,9 +65,6 @@ const (
 	// StateUnpackFailed conveys that the unpacking of the bundle has failed.
 	StateUnpackFailed State = "Unpack failed"
-
-	// CacheDir is where the unpacked bundle contents are cached locally.
- CacheDir string = "cache" ) type defaultUnpacker struct { @@ -101,12 +109,12 @@ func NewUnpacker(sources map[v1alpha2.SourceType]Unpacker) Unpacker { // Unpack itrates over the sources specified in bundleDeployment object. Unpacking is done // for each specified source, the bundle contents are stored in the specified destination. -func (s *unpacker) Unpack(ctx context.Context, bdDepName string, bd v1alpha2.BundleDeplopymentSource, fs afero.Fs) (*Result, error) { +func (s *unpacker) Unpack(ctx context.Context, bdDepName string, bd v1alpha2.BundleDeplopymentSource, fs afero.Fs, opts UnpackOption) (*Result, error) { source, ok := s.sources[bd.Kind] if !ok { return nil, fmt.Errorf("source type %q not supported", bd.Kind) } - return source.Unpack(ctx, bdDepName, bd, fs) + return source.Unpack(ctx, bdDepName, bd, fs, opts) } func NewDefaultUnpackerWithOpts(systemNsCluster cluster.Cluster, namespace string, opts ...UnpackerOption) (Unpacker, error) { @@ -126,6 +134,12 @@ func (u *defaultUnpacker) initialize() (Unpacker, error) { return nil, fmt.Errorf("systemNsCluster cannot be empty, cannot initialize") } + cfg := u.systemNsCluster.GetConfig() + kubeclient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, err + } + httpTransport := http.DefaultTransport.(*http.Transport).Clone() if httpTransport.TLSClientConfig == nil { httpTransport.TLSClientConfig = &tls.Config{ @@ -139,5 +153,11 @@ func (u *defaultUnpacker) initialize() (Unpacker, error) { Reader: u.systemNsCluster.GetClient(), SecretNamespace: u.namespace, }, + v1alpha2.SourceTypeImage: &Image{ + Client: u.systemNsCluster.GetClient(), + KubeClient: kubeclient, + PodNamespace: u.namespace, + UnpackImage: u.unpackImage, + }, }), nil } diff --git a/internal/controllers/v1alpha2/validator/validator.go b/internal/controllers/v1alpha2/validator/validator.go index fadd36c2..c50a49e3 100644 --- a/internal/controllers/v1alpha2/validator/validator.go +++ 
b/internal/controllers/v1alpha2/validator/validator.go @@ -40,7 +40,6 @@ func NewDefaultValidator() Validator { type registryV1Validator struct{} func (r *registryV1Validator) Validate(ctx context.Context, fs afero.Fs, bundleDeployment v1alpha2.BundleDeployment) error { - fmt.Println("converting registry V1 to plain") plainFS, err := convert.RegistryV1ToPlain(fs) if err != nil { return fmt.Errorf("error converting registry+v1 bundle to plain+v0 bundle: %v", err)