Merge pull request #6608 from ykakarap/runtime-sdk_lifecycle-hooks_before-hooks

✨  RuntimeSDK: BeforeClusterCreate, BeforeClusterUpgrade implementation
k8s-ci-robot authored Jun 14, 2022
2 parents 9bb071e + abb8f5f commit 6b5afa3
Showing 15 changed files with 768 additions and 26 deletions.
4 changes: 4 additions & 0 deletions api/v1beta1/condition_consts.go
@@ -277,4 +277,8 @@ const (
// TopologyReconciledMachineDeploymentsUpgradePendingReason (Severity=Info) documents reconciliation of a Cluster topology
// not yet completed because at least one of the MachineDeployments is not yet updated to match the desired topology spec.
TopologyReconciledMachineDeploymentsUpgradePendingReason = "MachineDeploymentsUpgradePending"

// TopologyReconciledHookBlockingReason (Severity=Info) documents reconciliation of a Cluster topology
// not yet completed because at least one of the lifecycle hooks is blocking.
TopologyReconciledHookBlockingReason = "LifecycleHookBlocking"
)
4 changes: 4 additions & 0 deletions controllers/alias.go
@@ -32,6 +32,7 @@ import (
clustertopologycontroller "sigs.k8s.io/cluster-api/internal/controllers/topology/cluster"
machinedeploymenttopologycontroller "sigs.k8s.io/cluster-api/internal/controllers/topology/machinedeployment"
machinesettopologycontroller "sigs.k8s.io/cluster-api/internal/controllers/topology/machineset"
runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client"
)

// Following types provides access to reconcilers implemented in internal/controllers, thus
@@ -133,6 +134,8 @@ type ClusterTopologyReconciler struct {
// race conditions caused by an outdated cache.
APIReader client.Reader

RuntimeClient runtimeclient.Client

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string

@@ -145,6 +148,7 @@ func (r *ClusterTopologyReconciler) SetupWithManager(ctx context.Context, mgr ct
return (&clustertopologycontroller.Reconciler{
Client: r.Client,
APIReader: r.APIReader,
RuntimeClient: r.RuntimeClient,
UnstructuredCachingClient: r.UnstructuredCachingClient,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
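The alias.go change only threads the new RuntimeClient field from the exported ClusterTopologyReconciler into the internal reconciler. Below is a minimal sketch of how a caller such as the manager setup might pass that client through; the setupTopologyController helper and the empty controller.Options are illustrative assumptions, not part of this diff.

package wiring

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"

	"sigs.k8s.io/cluster-api/controllers"
	runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client"
)

// setupTopologyController is a hypothetical helper showing where the new
// RuntimeClient field would be populated. In a real setup the remaining fields
// (UnstructuredCachingClient, WatchFilterValue, ...) would also be set.
func setupTopologyController(ctx context.Context, mgr ctrl.Manager, rc runtimeclient.Client) error {
	return (&controllers.ClusterTopologyReconciler{
		Client:        mgr.GetClient(),
		APIReader:     mgr.GetAPIReader(),
		RuntimeClient: rc, // field introduced by this commit
	}).SetupWithManager(ctx, mgr, controller.Options{})
}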
44 changes: 44 additions & 0 deletions internal/controllers/topology/cluster/cluster_controller.go
@@ -19,6 +19,7 @@ package cluster
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -30,14 +31,19 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/external"
runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/patches"
"sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/scope"
"sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/structuredmerge"
runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog"
runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/patch"
@@ -59,6 +65,8 @@ type Reconciler struct {
// race conditions caused by an outdated cache.
APIReader client.Reader

RuntimeClient runtimeclient.Client

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string

@@ -207,6 +215,17 @@ func (r *Reconciler) reconcile(ctx context.Context, s *scope.Scope) (ctrl.Result
return ctrl.Result{}, errors.Wrap(err, "error reading current state of the Cluster topology")
}

// The cluster topology is yet to be created. Call the BeforeClusterCreate hook before proceeding.
if feature.Gates.Enabled(feature.RuntimeSDK) {
res, err := r.callBeforeClusterCreateHook(ctx, s)
if err != nil {
return reconcile.Result{}, err
}
if !res.IsZero() {
return res, nil
}
}

// Setup watches for InfrastructureCluster and ControlPlane CRs when they exist.
if err := r.setupDynamicWatches(ctx, s); err != nil {
return ctrl.Result{}, errors.Wrap(err, "error creating dynamic watch")
@@ -223,6 +242,12 @@
return ctrl.Result{}, errors.Wrap(err, "error reconciling the Cluster topology")
}

// requeueAfter will not be 0 if any of the runtime hooks returns a blocking response.
requeueAfter := s.HookResponseTracker.AggregateRetryAfter()
if requeueAfter != 0 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}

return ctrl.Result{}, nil
}

@@ -247,6 +272,25 @@ func (r *Reconciler) setupDynamicWatches(ctx context.Context, s *scope.Scope) er
return nil
}

func (r *Reconciler) callBeforeClusterCreateHook(ctx context.Context, s *scope.Scope) (reconcile.Result, error) {
// If the cluster objects (InfraCluster, ControlPlane, etc) are not yet created we are in the creation phase.
// Call the BeforeClusterCreate hook before proceeding.
if s.Current.Cluster.Spec.InfrastructureRef == nil && s.Current.Cluster.Spec.ControlPlaneRef == nil {
hookRequest := &runtimehooksv1.BeforeClusterCreateRequest{
Cluster: *s.Current.Cluster,
}
hookResponse := &runtimehooksv1.BeforeClusterCreateResponse{}
if err := r.RuntimeClient.CallAllExtensions(ctx, runtimehooksv1.BeforeClusterCreate, s.Current.Cluster, hookRequest, hookResponse); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "error calling the %s hook", runtimecatalog.HookName(runtimehooksv1.BeforeClusterCreate))
}
s.HookResponseTracker.Add(runtimehooksv1.BeforeClusterCreate, hookResponse)
if hookResponse.RetryAfterSeconds != 0 {
return ctrl.Result{RequeueAfter: time.Duration(hookResponse.RetryAfterSeconds) * time.Second}, nil
}
}
return ctrl.Result{}, nil
}

// clusterClassToCluster is a handler.ToRequestsFunc to be used to enqueue requests for reconciliation
// for Cluster to update when its own ClusterClass gets updated.
func (r *Reconciler) clusterClassToCluster(o client.Object) []ctrl.Request {
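callBeforeClusterCreateHook only fires while the Cluster has neither an InfrastructureRef nor a ControlPlaneRef, and a non-zero RetryAfterSeconds in the response is turned into a RequeueAfter, which is what makes the hook blocking. For context, here is a sketch of the other side of that contract: a Runtime Extension handler for BeforeClusterCreate. The handler body and the preconditionsReady helper are hypothetical; only the request/response types used by the controller code above are taken from this commit, and how such a handler is registered with an extension server is out of scope here.

package extensionsketch

import (
	"context"

	runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
)

// doBeforeClusterCreate sketches an extension-side BeforeClusterCreate handler
// that blocks creation until an external precondition is met.
func doBeforeClusterCreate(ctx context.Context, req *runtimehooksv1.BeforeClusterCreateRequest, resp *runtimehooksv1.BeforeClusterCreateResponse) {
	resp.Status = runtimehooksv1.ResponseStatusSuccess

	if !preconditionsReady(req.Cluster.Name) {
		// A non-zero RetryAfterSeconds blocks creation; the topology controller
		// translates it into ctrl.Result{RequeueAfter: ...} as shown above.
		resp.RetryAfterSeconds = 30
		resp.Message = "waiting for external preconditions"
	}
}

// preconditionsReady is a hypothetical placeholder for an environment check.
func preconditionsReady(clusterName string) bool { return true }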
95 changes: 95 additions & 0 deletions internal/controllers/topology/cluster/cluster_controller_test.go
@@ -26,11 +26,17 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
utilfeature "k8s.io/component-base/featuregate/testing"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/internal/contract"
"sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/scope"
runtimecatalog "sigs.k8s.io/cluster-api/internal/runtime/catalog"
fakeruntimeclient "sigs.k8s.io/cluster-api/internal/runtime/client/fake"
"sigs.k8s.io/cluster-api/internal/test/builder"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
@@ -436,6 +442,95 @@ func TestClusterReconciler_deleteClusterClass(t *testing.T) {
g.Expect(env.Delete(ctx, clusterClass)).NotTo(Succeed())
}

func TestReconciler_callBeforeClusterCreateHook(t *testing.T) {
catalog := runtimecatalog.New()
_ = runtimehooksv1.AddToCatalog(catalog)
gvh, err := catalog.GroupVersionHook(runtimehooksv1.BeforeClusterCreate)
if err != nil {
panic(err)
}

blockingResponse := &runtimehooksv1.BeforeClusterCreateResponse{
CommonRetryResponse: runtimehooksv1.CommonRetryResponse{
CommonResponse: runtimehooksv1.CommonResponse{
Status: runtimehooksv1.ResponseStatusSuccess,
},
RetryAfterSeconds: int32(10),
},
}
nonBlockingResponse := &runtimehooksv1.BeforeClusterCreateResponse{
CommonRetryResponse: runtimehooksv1.CommonRetryResponse{
CommonResponse: runtimehooksv1.CommonResponse{
Status: runtimehooksv1.ResponseStatusSuccess,
},
RetryAfterSeconds: int32(0),
},
}
failingResponse := &runtimehooksv1.BeforeClusterCreateResponse{
CommonRetryResponse: runtimehooksv1.CommonRetryResponse{
CommonResponse: runtimehooksv1.CommonResponse{
Status: runtimehooksv1.ResponseStatusFailure,
},
},
}

tests := []struct {
name string
hookResponse *runtimehooksv1.BeforeClusterCreateResponse
wantResult reconcile.Result
wantErr bool
}{
{
name: "should return a requeue response when the BeforeClusterCreate hook is blocking",
hookResponse: blockingResponse,
wantResult: ctrl.Result{RequeueAfter: time.Duration(10) * time.Second},
wantErr: false,
},
{
name: "should return an empty response when the BeforeClusterCreate hook is not blocking",
hookResponse: nonBlockingResponse,
wantResult: ctrl.Result{},
wantErr: false,
},
{
name: "should error when the BeforeClusterCreate hook returns a failure response",
hookResponse: failingResponse,
wantResult: ctrl.Result{},
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

runtimeClient := fakeruntimeclient.NewRuntimeClientBuilder().
WithCatalog(catalog).
WithCallAllExtensionResponses(map[runtimecatalog.GroupVersionHook]runtimehooksv1.ResponseObject{
gvh: tt.hookResponse,
}).
Build()

r := &Reconciler{
RuntimeClient: runtimeClient,
}
s := &scope.Scope{
Current: &scope.ClusterState{
Cluster: &clusterv1.Cluster{},
},
HookResponseTracker: scope.NewHookResponseTracker(),
}
res, err := r.callBeforeClusterCreateHook(ctx, s)
if tt.wantErr {
g.Expect(err).NotTo(BeNil())
} else {
g.Expect(err).To(BeNil())
g.Expect(res).To(Equal(tt.wantResult))
}
})
}
}

// setupTestEnvForIntegrationTests builds and then creates in the envtest API server all objects required at init time for each of the
// integration tests in this file. This includes:
// - a first clusterClass with all the related templates
15 changes: 15 additions & 0 deletions internal/controllers/topology/cluster/conditions.go
@@ -57,6 +57,21 @@ func (r *Reconciler) reconcileTopologyReconciledCondition(s *scope.Scope, cluste
return nil
}

// If any of the lifecycle hooks are blocking any part of the reconciliation then topology
// is not considered as fully reconciled.
if s.HookResponseTracker.AggregateRetryAfter() != 0 {
conditions.Set(
cluster,
conditions.FalseCondition(
clusterv1.TopologyReconciledCondition,
clusterv1.TopologyReconciledHookBlockingReason,
clusterv1.ConditionSeverityInfo,
s.HookResponseTracker.AggregateMessage(),
),
)
return nil
}

// If either the Control Plane or any of the MachineDeployments are still pending to pick up the new version (generally
// happens when upgrading the cluster) then the topology is not considered as fully reconciled.
if s.UpgradeTracker.ControlPlane.PendingUpgrade || s.UpgradeTracker.MachineDeployments.PendingUpgrade() {
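The condition logic above leans on two aggregation helpers of scope.HookResponseTracker, whose implementation is among the changed files not rendered in this view. The following standalone sketch shows the behavior the code above assumes: the tracker keeps the latest response per hook, AggregateRetryAfter returns the shortest non-zero retry interval (0 when nothing is blocking), and AggregateMessage names the blocking hooks for the condition message. The local type and method bodies are assumptions for illustration, not the actual implementation.

package sketch

import (
	"fmt"
	"strings"
	"time"
)

// trackedResponse is a simplified stand-in for the retry information carried by
// a lifecycle hook response.
type trackedResponse struct {
	hookName          string
	retryAfterSeconds int32
}

// hookResponseTracker approximates the aggregation the condition logic relies on.
type hookResponseTracker struct {
	responses map[string]trackedResponse
}

// AggregateRetryAfter returns the shortest non-zero retry interval across all
// tracked responses, or 0 when no hook is blocking.
func (t *hookResponseTracker) AggregateRetryAfter() time.Duration {
	var min time.Duration
	for _, r := range t.responses {
		d := time.Duration(r.retryAfterSeconds) * time.Second
		if d == 0 {
			continue
		}
		if min == 0 || d < min {
			min = d
		}
	}
	return min
}

// AggregateMessage lists the hooks that are currently blocking, for use as the
// TopologyReconciled condition message.
func (t *hookResponseTracker) AggregateMessage() string {
	blocking := []string{}
	for _, r := range t.responses {
		if r.retryAfterSeconds != 0 {
			blocking = append(blocking, r.hookName)
		}
	}
	if len(blocking) == 0 {
		return ""
	}
	return fmt.Sprintf("hooks %q are blocking", strings.Join(blocking, ","))
}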
29 changes: 29 additions & 0 deletions internal/controllers/topology/cluster/conditions_test.go
@@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
runtimehooksv1 "sigs.k8s.io/cluster-api/exp/runtime/hooks/api/v1alpha1"
"sigs.k8s.io/cluster-api/internal/controllers/topology/cluster/scope"
"sigs.k8s.io/cluster-api/internal/test/builder"
"sigs.k8s.io/cluster-api/util/conditions"
@@ -47,6 +48,24 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
wantConditionReason: clusterv1.TopologyReconcileFailedReason,
wantErr: false,
},
{
name:                 "should set the condition to false if there is a blocking hook",
reconcileErr: nil,
cluster: &clusterv1.Cluster{},
s: &scope.Scope{
HookResponseTracker: func() *scope.HookResponseTracker {
hrt := scope.NewHookResponseTracker()
hrt.Add(runtimehooksv1.BeforeClusterUpgrade, &runtimehooksv1.BeforeClusterUpgradeResponse{
CommonRetryResponse: runtimehooksv1.CommonRetryResponse{
RetryAfterSeconds: int32(10),
},
})
return hrt
}(),
},
wantConditionStatus: corev1.ConditionFalse,
wantConditionReason: clusterv1.TopologyReconciledHookBlockingReason,
},
{
name: "should set the condition to false if new version is not picked up because control plane is provisioning",
reconcileErr: nil,
@@ -71,6 +90,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.ControlPlane.IsProvisioning = true
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionFalse,
wantConditionReason: clusterv1.TopologyReconciledControlPlaneUpgradePendingReason,
@@ -100,6 +120,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.ControlPlane.IsUpgrading = true
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionFalse,
wantConditionReason: clusterv1.TopologyReconciledControlPlaneUpgradePendingReason,
@@ -129,6 +150,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.ControlPlane.IsScaling = true
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionFalse,
wantConditionReason: clusterv1.TopologyReconciledControlPlaneUpgradePendingReason,
@@ -170,6 +192,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.ControlPlane.PendingUpgrade = true
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionFalse,
wantConditionReason: clusterv1.TopologyReconciledControlPlaneUpgradePendingReason,
@@ -213,6 +236,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.MachineDeployments.MarkPendingUpgrade("md0-abc123")
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionFalse,
wantConditionReason: clusterv1.TopologyReconciledMachineDeploymentsUpgradePendingReason,
@@ -256,6 +280,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.MachineDeployments.MarkPendingUpgrade("md0-abc123")
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionFalse,
wantConditionReason: clusterv1.TopologyReconciledMachineDeploymentsUpgradePendingReason,
@@ -285,6 +310,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.ControlPlane.IsUpgrading = true
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionTrue,
},
@@ -313,6 +339,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.ControlPlane.IsScaling = true
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionTrue,
},
@@ -367,6 +394,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.MachineDeployments.MarkPendingUpgrade("md1-abc123")
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionFalse,
wantConditionReason: clusterv1.TopologyReconciledMachineDeploymentsUpgradePendingReason,
@@ -421,6 +449,7 @@ func TestReconcileTopologyReconciledCondition(t *testing.T) {
ut.ControlPlane.PendingUpgrade = false
return ut
}(),
HookResponseTracker: scope.NewHookResponseTracker(),
},
wantConditionStatus: corev1.ConditionTrue,
},
(Diffs for the remaining changed files in this commit are not rendered in this view.)