Skip to content
This repository has been archived by the owner on Aug 12, 2024. It is now read-only.

Commit

Permalink
[deploy] Add logic for helm deployer to bundledeployment reconciler
Browse files Browse the repository at this point in the history
This commit introduces:
1. Deployer interface and Helm deployer - to deploy contents using helm
2. Wire deployer logic into reconciler.
3. Add SetupWithManager, so that this can be called from core/main.go in a
  follow up
.
Signed-off-by: Varsha Prasad Narsing <[email protected]>
  • Loading branch information
varshaprasad96 committed Oct 16, 2023
1 parent 9c1a6fd commit fb0fa79
Show file tree
Hide file tree
Showing 9 changed files with 791 additions and 61 deletions.
29 changes: 23 additions & 6 deletions api/v1alpha2/bundledeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha2

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)

var (
Expand All @@ -26,12 +27,21 @@ var (
)

const (
TypeUnpacked = "Unpacked"

ReasonUnpackFailed = "UnpackFailed"
ReasonUnpackPending = "UnpackPending"
ReasonUnpacking = "Unpacking"
ReasonUnpackSuccessful = "UnpackSuccessful"
TypeUnpacked = "Unpacked"
TypeValidated = "Validated"
TypeInstalled = "Installed"

ReasonUnpackFailed = "UnpackFailed"
ReasonUnpackPending = "UnpackPending"
ReasonUnpacking = "Unpacking"
ReasonUnpackSuccessful = "UnpackSuccessful"
ReasonValidateSuccessful = "ValidateSuccessful"
ReasonValidateFailed = "ValidateFailed"
ReasonInstallFailed = "InstallFailed"
ReasonUpgradeFailed = "UpgradeFailed"
ReasonReconcileFailed = "ReconcileFailed"
ReasonCreateDynamicWatchFailed = "CreateDynamicWatchFailed"
ReasonInstallSucceeded = "InstallationSucceeded"
)

// +kubebuilder:object:root=true
Expand Down Expand Up @@ -81,6 +91,13 @@ type BundleDeploymentSpec struct {
// +optional
Paused bool `json:"paused,omitempty"`

// Config is helm spcific configuration to load
// helm values.
// TODO: This should be become deployer specific.
// Should move to helm deployer configuration (or probably templating?).
// +kubebuilder:pruning:PreserveUnknownFields
Config runtime.RawExtension `json:"config,omitempty"`

// DefaultNamespace refers to the namespace where
// namespace-scoped objects would be created if not
// explicitly set within the bundle.
Expand Down
183 changes: 182 additions & 1 deletion internal/v1alpha2/controllers/bundledeployment/bundledeployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,39 @@ package bundledeployment

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/operator-framework/rukpak/api/v1alpha2"
source "github.com/operator-framework/rukpak/internal/v1alpha2/source"
helmpredicate "github.com/operator-framework/rukpak/internal/helm-operator-plugins/predicate"
"github.com/operator-framework/rukpak/internal/v1alpha2/deployer"
"github.com/operator-framework/rukpak/internal/v1alpha2/source"
"github.com/operator-framework/rukpak/internal/v1alpha2/store"
"github.com/operator-framework/rukpak/internal/v1alpha2/validator"
"github.com/spf13/afero"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
apimacherrors "k8s.io/apimachinery/pkg/util/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
crcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
crsource "sigs.k8s.io/controller-runtime/pkg/source"
)

const (
Expand All @@ -55,6 +70,13 @@ type bundleDeploymentReconciler struct {
// on filesystem for the bundle types which would be handled
// by this reconciler.
unpacker source.Unpacker
// validators have specific rules defined, based on which
// they validate the unpacked content.
// Accepting validators as a list, in case custom validators
// are needed to be added in future.
validators []validator.Validator
// deployer knows how to apply bundle objects into cluster.
deployer deployer.Deployer

dynamicWatchMutex sync.RWMutex
dynamicWatchGVKs map[schema.GroupVersionKind]struct{}
Expand Down Expand Up @@ -85,6 +107,7 @@ func (b *bundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
err := b.Client.Get(ctx, req.NamespacedName, existingBD)
if err != nil {
if apierrors.IsNotFound(err) {
// TODO: if bundledeployment is deleted, remove the unpacked bundle present locally.
log.Info("bundledeployment resource not found. Ignoring since object must be deleted.")
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -149,6 +172,70 @@ func (b *bundleDeploymentReconciler) reconcile(ctx context.Context, bundleDeploy
return ctrl.Result{}, fmt.Errorf("unkown unpack state %q for bundle deployment %s: %v", res.State, bundleDeployment.GetName(), bundleDeployment.Generation)
}

// Unpacked contents from each source would now be availabe in the fs. Validate
// if the contents together conform to the specified format.
if err = b.validateContents(ctx, bundleDeployment.Spec.Format, bundledeploymentStore); err != nil {
validateErr := fmt.Errorf("validating contents for bundle %s with format %s: %v", bundleDeployment.Name, bundleDeployment.Spec.Format, err)
setValidateStatusFailing(&bundleDeployment.Status.Conditions, validateErr.Error(), bundleDeployment.Generation)
return ctrl.Result{}, validateErr
}
setValidateStatusSuccess(&bundleDeployment.Status.Conditions, fmt.Sprintf("unpacked successfully"), bundleDeployment.Generation)

// Deploy the validated contents onto the cluster.
// The deployer should return the list of objects which have been deployed, so that
// controller can be configured to set up watches for them.
deployResult, err := b.deployContents(ctx, bundledeploymentStore, bundleDeployment)
switch deployResult.State {
case deployer.StateIntallFailed:
setInstallStatusFailed(&bundleDeployment.Status.Conditions, err.Error(), bundleDeployment.Generation)
return ctrl.Result{}, err
case deployer.StateUnpgradeFailed:
setUnpgradeStatusFailed(&bundleDeployment.Status.Conditions, err.Error(), bundleDeployment.Generation)
return ctrl.Result{}, err
case deployer.StateReconcileFailed:
setReconcileStatusFailed(&bundleDeployment.Status.Conditions, err.Error(), bundleDeployment.Generation)
return ctrl.Result{}, err
case deployer.StateObjectFetchFailed:
setDynamicWatchFailed(&bundleDeployment.Status.Conditions, err.Error(), bundleDeployment.Generation)
return ctrl.Result{}, err
case deployer.StateDeploySuccessful:
setInstallStatusSuccess(&bundleDeployment.Status.Conditions, fmt.Sprintf("installed %s", bundleDeployment.GetName()), bundleDeployment.Generation)
default:
return ctrl.Result{}, fmt.Errorf("unkown deploy state %q for bundle deployment %s: %v", deployResult.State, bundleDeployment.GetName(), bundleDeployment.Generation)
}

// for the objects returned from the deployer, set watches on them.
// TODO(brainstorm): any event coming from the dependent object will trigger the entire reconcile,
// making it to unpack again. Introduce a caching mechanism to skip unpacking when the source has not
// changed.
for _, obj := range deployResult.AppliedObjects {
uMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
setDynamicWatchFailed(&bundleDeployment.Status.Conditions, err.Error(), bundleDeployment.Generation)
return ctrl.Result{}, err
}

unstructuredObj := &unstructured.Unstructured{Object: uMap}
if err := func() error {
b.dynamicWatchMutex.Lock()
defer b.dynamicWatchMutex.Unlock()

_, isWatched := b.dynamicWatchGVKs[unstructuredObj.GroupVersionKind()]
if !isWatched {
if err := b.controller.Watch(
&crsource.Kind{Type: unstructuredObj},
&handler.EnqueueRequestForOwner{OwnerType: bundleDeployment, IsController: true},
helmpredicate.DependentPredicateFuncs()); err != nil {
return err
}
b.dynamicWatchGVKs[unstructuredObj.GroupVersionKind()] = struct{}{}
}
return nil
}(); err != nil {
setDynamicWatchFailed(&bundleDeployment.Status.Conditions, err.Error(), bundleDeployment.Generation)
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -178,3 +265,97 @@ func (b *bundleDeploymentReconciler) unpackContents(ctx context.Context, bundled
// TODO: capture the list of resolved sources for all the successful entry points.
return &source.Result{State: source.StateUnpacked, Message: "Successfully unpacked"}, nil
}

// validateContents validates if the unpacked bundle contents are of the right format.
func (b *bundleDeploymentReconciler) validateContents(ctx context.Context, format v1alpha2.FormatType, store store.Store) error {
errs := make([]error, 0)
for _, validator := range b.validators {
if err := validator.Validate(ctx, format, store); err != nil {
errs = append(errs, err)
}
}
return apimacherrors.NewAggregate(errs)
}

// deployContents calls the registered deployer to apply the bundle contents onto the cluster.
func (b *bundleDeploymentReconciler) deployContents(ctx context.Context, store store.Store, bd *v1alpha2.BundleDeployment) (*deployer.Result, error) {
return b.deployer.Deploy(ctx, store, bd)
}

func (b *bundleDeploymentReconciler) validateConfig() error {
errs := []error{}
if b.unpacker == nil {
errs = append(errs, errors.New("unpacker is unset"))
}
if b.validators == nil || len(b.validators) == 0 {
errs = append(errs, errors.New("validators not provided"))
}
if b.deployer == nil {
errs = append(errs, errors.New("deployer is unset"))
}
return utilerrors.NewAggregate(errs)
}

func SetupWithManager(mgr manager.Manager, systemNsCache cache.Cache, opts ...Option) error {
bd := &bundleDeploymentReconciler{
Client: mgr.GetClient(),
dynamicWatchGVKs: map[schema.GroupVersionKind]struct{}{},
}
for _, o := range opts {
o(bd)
}

if err := bd.validateConfig(); err != nil {
return fmt.Errorf("invalid configuration: %v", err)
}

controllerName := fmt.Sprintf("controller-bundledeployment.%s", v1alpha2.BundleDeploymentGVK.Version)
l := mgr.GetLogger().WithName(controllerName)
controller, err := ctrl.NewControllerManagedBy(mgr).
Named(controllerName).
For(&v1alpha2.BundleDeployment{}).
Watches(crsource.NewKindWithCache(&corev1.Pod{}, systemNsCache), MapOwnerToBundleDeploymentHandler(context.Background(), mgr.GetClient(), l, &v1alpha2.BundleDeployment{})).
Build(bd)
if err != nil {
return err
}
bd.controller = controller
return nil
}

// MapOwnerToBundleDeploymentHandler is a handler implementation that finds an owner reference in the event object that
// references the provided owner. If a reference for the provided owner is found this handler enqueues a request for that owner to be reconciled.
func MapOwnerToBundleDeploymentHandler(ctx context.Context, cl client.Client, log logr.Logger, owner client.Object) handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
ownerGVK, err := apiutil.GVKForObject(owner, cl.Scheme())
if err != nil {
log.Error(err, "map ownee to owner: lookup GVK for owner")
return nil
}
type ownerInfo struct {
key types.NamespacedName
gvk schema.GroupVersionKind
}
var oi *ownerInfo

for _, ref := range obj.GetOwnerReferences() {
gv, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil {
log.Error(err, fmt.Sprintf("map ownee to owner: parse ownee's owner reference group version %q", ref.APIVersion))
return nil
}
refGVK := gv.WithKind(ref.Kind)
if refGVK == ownerGVK && ref.Controller != nil && *ref.Controller {
oi = &ownerInfo{
key: types.NamespacedName{Name: ref.Name},
gvk: ownerGVK,
}
break
}
}
if oi == nil {
return nil
}
return []reconcile.Request{{NamespacedName: oi.key}}
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,84 @@ func setUnpackStatusSuccessful(conditions *[]metav1.Condition, message string, g
ObservedGeneration: generation,
})
}

// setValidateStatusFailing sets the validate status condition to failing.
func setValidateStatusFailing(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeValidated,
Status: metav1.ConditionFalse,
Reason: v1alpha2.ReasonValidateFailed,
Message: message,
ObservedGeneration: generation,
})
}

// setValidateStatusSuccess sets the validate status condition to success.
func setValidateStatusSuccess(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeValidated,
Status: metav1.ConditionTrue,
Reason: v1alpha2.ReasonValidateSuccessful,
Message: message,
ObservedGeneration: generation,
})
}

// setInstallStatusFailed sets the installed success to failing.
func setInstallStatusFailed(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Status: metav1.ConditionFalse,
Reason: v1alpha2.ReasonInstallFailed,
Message: message,
ObservedGeneration: generation,
})
}

// setUnpgradeStatusFailed sets the installed success to failing as there is an error while patching
// objects on cluster.
func setUnpgradeStatusFailed(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Status: metav1.ConditionFalse,
Reason: v1alpha2.ReasonUpgradeFailed,
Message: message,
ObservedGeneration: generation,
})
}

// setReconcileStatusFailed sets the installed success to failing as there is an error while reconciling
// existing objects on cluster.
func setReconcileStatusFailed(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Status: metav1.ConditionFalse,
Reason: v1alpha2.ReasonReconcileFailed,
Message: message,
ObservedGeneration: generation,
})
}

// setInstallStatusSuccess sets the installed success to success.
func setInstallStatusSuccess(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Status: metav1.ConditionTrue,
Reason: v1alpha2.ReasonInstallSucceeded,
Message: message,
ObservedGeneration: generation,
})
}

// setDynamicWatchFailed sets the installed status to failing with the appropriate reason.
// This status appears when there is an error while fetching the applied objects from cluster
// after the deployer has returned so as to set watches on them.
func setDynamicWatchFailed(conditions *[]metav1.Condition, message string, generation int64) {
apimeta.SetStatusCondition(conditions, metav1.Condition{
Type: v1alpha2.TypeInstalled,
Status: metav1.ConditionFalse,
Reason: v1alpha2.ReasonCreateDynamicWatchFailed,
Message: message,
ObservedGeneration: generation,
})
}
Loading

0 comments on commit fb0fa79

Please sign in to comment.