Skip to content

Commit

Permalink
fix the rebootstrap issue (#55)
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Le <[email protected]>
  • Loading branch information
elgnay authored Nov 15, 2023
1 parent 099c3e6 commit 0d54d24
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 124 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ CSV_VERSION?=0.10.0

# WORK_IMAGE can be set in the env to override calculated value
WORK_TAG?=latest
WORK_IMAGE?=$(IMAGE_REGISTRY)/work:$(WORK_TAG)
export WORK_IMAGE?=$(IMAGE_REGISTRY)/work:$(WORK_TAG)

# REGISTRATION_IMAGE can be set in the env to override calculated value
REGISTRATION_TAG?=latest
REGISTRATION_IMAGE?=$(IMAGE_REGISTRY)/registration:$(REGISTRATION_TAG)
export REGISTRATION_IMAGE?=$(IMAGE_REGISTRY)/registration:$(REGISTRATION_TAG)

# PLACEMENT_IMAGE can be set in the env to override calculated value
PLACEMENT_TAG?=latest
PLACEMENT_IMAGE?=$(IMAGE_REGISTRY)/placement:$(PLACEMENT_TAG)
export PLACEMENT_IMAGE?=$(IMAGE_REGISTRY)/placement:$(PLACEMENT_TAG)

OPERATOR_SDK?=$(PERMANENT_TMP_GOPATH)/bin/operator-sdk
OPERATOR_SDK_VERSION?=v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions pkg/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ const (
FeatureGatesTypeValid = "ValidFeatureGates"
FeatureGatesReasonAllValid = "FeatureGatesAllValid"
FeatureGatesReasonInvalidExisting = "InvalidFeatureGatesExisting"

KlusterletRebootstrapProgressing = "RebootstrapProgressing"
)

// OperatorType represents the type of operator supported by the current controller, value could be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

operatorv1client "open-cluster-management.io/api/client/operator/clientset/versioned/typed/operator/v1"
operatorinformer "open-cluster-management.io/api/client/operator/informers/externalversions/operator/v1"
operatorlister "open-cluster-management.io/api/client/operator/listers/operator/v1"
"open-cluster-management.io/registration-operator/pkg/helpers"
Expand Down Expand Up @@ -40,17 +41,20 @@ var BootstrapControllerSyncInterval = 5 * time.Minute
type bootstrapController struct {
kubeClient kubernetes.Interface
klusterletLister operatorlister.KlusterletLister
klusterletClient operatorv1client.KlusterletInterface
secretInformers map[string]coreinformer.SecretInformer
}

// NewBootstrapController returns a bootstrapController
func NewBootstrapController(
kubeClient kubernetes.Interface,
klusterletClient operatorv1client.KlusterletInterface,
klusterletInformer operatorinformer.KlusterletInformer,
secretInformers map[string]coreinformer.SecretInformer,
recorder events.Recorder) factory.Controller {
controller := &bootstrapController{
kubeClient: kubeClient,
klusterletClient: klusterletClient,
klusterletLister: klusterletInformer.Lister(),
secretInformers: secretInformers,
}
Expand Down Expand Up @@ -93,6 +97,18 @@ func (k *bootstrapController) sync(ctx context.Context, controllerContext factor
return nil
}

// handle rebootstrap if the klusterlet is in rebootstrapping state
klusterlet, err := k.klusterletLister.Get(klusterletName)
if err != nil {
return err
}
requeueFunc := func(duration time.Duration) {
controllerContext.Queue().AddAfter(queueKey, duration)
}
if meta.IsStatusConditionTrue(klusterlet.Status.Conditions, helpers.KlusterletRebootstrapProgressing) {
return k.processRebootstrap(ctx, agentNamespace, klusterletName, controllerContext.Recorder(), requeueFunc)
}

bootstrapHubKubeconfigSecret, err := k.secretInformers[helpers.BootstrapHubKubeConfig].Lister().Secrets(agentNamespace).Get(helpers.BootstrapHubKubeConfig)
switch {
case errors.IsNotFound(err):
Expand Down Expand Up @@ -135,7 +151,7 @@ func (k *bootstrapController) sync(ctx context.Context, controllerContext factor
!bytes.Equal(bootstrapKubeconfig.CertificateAuthorityData, hubKubeconfig.CertificateAuthorityData) {
// the bootstrap kubeconfig secret is changed, reload the klusterlet agents
reloadReason := fmt.Sprintf("the bootstrap secret %s/%s is changed", agentNamespace, helpers.BootstrapHubKubeConfig)
return k.reloadAgents(ctx, controllerContext, agentNamespace, klusterletName, reloadReason)
return k.startRebootstrap(ctx, klusterletName, reloadReason, controllerContext.Recorder(), requeueFunc)
}

expired, err := isHubKubeconfigSecretExpired(hubKubeconfigSecret)
Expand All @@ -153,33 +169,70 @@ func (k *bootstrapController) sync(ctx context.Context, controllerContext factor

// the hub kubeconfig secret cert is expired, reload klusterlet to restart bootstrap
reloadReason := fmt.Sprintf("the hub kubeconfig secret %s/%s is expired", agentNamespace, helpers.HubKubeConfig)
return k.reloadAgents(ctx, controllerContext, agentNamespace, klusterletName, reloadReason)
return k.startRebootstrap(ctx, klusterletName, reloadReason, controllerContext.Recorder(), requeueFunc)
}

// reloadAgents reload klusterlet agents by
// 1. make the registration agent re-bootstrap by deleting the current hub kubeconfig secret to
// 2. restart the registration and work agents to reload the new hub ca by deleting the agent deployments
func (k *bootstrapController) reloadAgents(ctx context.Context, ctrlContext factory.SyncContext, namespace, klusterletName, reason string) error {
if err := k.kubeClient.CoreV1().Secrets(namespace).Delete(ctx, helpers.HubKubeConfig, metav1.DeleteOptions{}); err != nil {
func (k *bootstrapController) processRebootstrap(ctx context.Context, agentNamespace, klusterletName string, recorder events.Recorder, requeueFunc func(time.Duration)) error {
deploymentName := fmt.Sprintf("%s-registration-agent", klusterletName)
deployment, err := k.kubeClient.AppsV1().Deployments(agentNamespace).Get(ctx, deploymentName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return k.completeRebootstrap(ctx, agentNamespace, klusterletName, recorder)
}
if err != nil {
return err
}
ctrlContext.Recorder().Eventf("HubKubeconfigSecretDeleted", fmt.Sprintf("the hub kubeconfig secret %s/%s is deleted due to %s",
namespace, helpers.HubKubeConfig, reason))

registrationName := fmt.Sprintf("%s-registration-agent", klusterletName)
if err := k.kubeClient.AppsV1().Deployments(namespace).Delete(ctx, registrationName, metav1.DeleteOptions{}); err != nil {
return err
if deployment.Status.AvailableReplicas == 0 {
return k.completeRebootstrap(ctx, agentNamespace, klusterletName, recorder)
}
ctrlContext.Recorder().Eventf("KlusterletAgentDeploymentDeleted", fmt.Sprintf("the deployment %s/%s is deleted due to %s",
namespace, registrationName, reason))

workName := fmt.Sprintf("%s-work-agent", klusterletName)
if err := k.kubeClient.AppsV1().Deployments(namespace).Delete(ctx, workName, metav1.DeleteOptions{}); err != nil {
// there still is registation agent pod running. Resync in 5 seconds
requeueFunc(5 * time.Second)
return nil
}

// startRebootstrap marks the klusterlet as rebootstrapping by setting the
// RebootstrapProgressing condition to true (the klusterlet controller reacts
// by scaling the agents down), records an event with the triggering reason,
// and requeues so processRebootstrap can track progress.
func (k *bootstrapController) startRebootstrap(ctx context.Context, klusterletName, message string, recorder events.Recorder, requeueFunc func(duration time.Duration)) error {
	condition := metav1.Condition{
		Type:    helpers.KlusterletRebootstrapProgressing,
		Status:  metav1.ConditionTrue,
		Reason:  "RebootstrapStarted",
		Message: message,
	}
	_, _, err := helpers.UpdateKlusterletStatus(ctx, k.klusterletClient, klusterletName,
		helpers.UpdateKlusterletConditionFn(condition),
	)
	if err != nil {
		return err
	}
	recorder.Eventf("KlusterletRebootstrap", fmt.Sprintf("The klusterlet %q starts rebootstrapping due to %s",
		klusterletName, message))

	// requeue and check the rebootstrap progress in 5 seconds
	requeueFunc(5 * time.Second)
	return nil
}

// completeRebootstrap finishes the rebootstrap flow: it removes the stale hub
// kubeconfig secret (which triggers the registration agent to bootstrap again
// once it restarts) and then clears the RebootstrapProgressing condition on
// the klusterlet, recording an event for each step.
func (k *bootstrapController) completeRebootstrap(ctx context.Context, agentNamespace, klusterletName string, recorder events.Recorder) error {
	// delete the existing hub kubeconfig; a missing secret is acceptable — it
	// may already have been removed
	err := k.kubeClient.CoreV1().Secrets(agentNamespace).Delete(ctx, helpers.HubKubeConfig, metav1.DeleteOptions{})
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	recorder.Eventf("KlusterletRebootstrap", fmt.Sprintf("Secret %s/%s is deleted", agentNamespace, helpers.HubKubeConfig))

	// flip the klusterlet condition to signal that rebootstrap is done
	completed := metav1.Condition{
		Type:    helpers.KlusterletRebootstrapProgressing,
		Status:  metav1.ConditionFalse,
		Reason:  "RebootstrapCompleted",
		Message: fmt.Sprintf("Secret %s/%s is deleted and bootstrap is triggered", agentNamespace, helpers.HubKubeConfig),
	}
	if _, _, updateErr := helpers.UpdateKlusterletStatus(ctx, k.klusterletClient, klusterletName,
		helpers.UpdateKlusterletConditionFn(completed),
	); updateErr != nil {
		return updateErr
	}
	recorder.Eventf("KlusterletRebootstrap", fmt.Sprintf("Rebootstrap of the klusterlet %q is completed", klusterletName))
	return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"k8s.io/apimachinery/pkg/fields"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"math/big"
"open-cluster-management.io/registration-operator/pkg/helpers"
"testing"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"

fakeoperatorclient "open-cluster-management.io/api/client/operator/clientset/versioned/fake"
operatorinformers "open-cluster-management.io/api/client/operator/informers/externalversions"
operatorapiv1 "open-cluster-management.io/api/operator/v1"
"open-cluster-management.io/registration-operator/pkg/helpers"
testinghelper "open-cluster-management.io/registration-operator/pkg/helpers/testing"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -33,10 +35,12 @@ import (

func TestSync(t *testing.T) {
cases := []struct {
name string
queueKey string
objects []runtime.Object
validateActions func(t *testing.T, actions []clienttesting.Action)
name string
queueKey string
initRebootstrapping bool
objects []runtime.Object
expectedRebootstrapping bool
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "the changed secret is not bootstrap secret",
Expand All @@ -48,18 +52,17 @@ func TestSync(t *testing.T) {
},
},
{
name: "checking the hub kubeconfig secret",
name: "client certificate expired",
queueKey: "test/test",
objects: []runtime.Object{
newSecret("bootstrap-hub-kubeconfig", "test", newKubeConfig("https://10.0.118.47:6443")),
newHubKubeConfigSecret("test", time.Now().Add(-60*time.Second).UTC()),
newDeployment("test-registration-agent", "test"),
newDeployment("test-work-agent", "test"),
},
expectedRebootstrapping: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelper.AssertDelete(t, actions[0], "secrets", "test", "hub-kubeconfig-secret")
testinghelper.AssertDelete(t, actions[1], "deployments", "test", "test-registration-agent")
testinghelper.AssertDelete(t, actions[2], "deployments", "test", "test-work-agent")
if len(actions) != 0 {
t.Errorf("expected no actions happens, but got %#v", actions)
}
},
},
{
Expand Down Expand Up @@ -88,28 +91,63 @@ func TestSync(t *testing.T) {
{
name: "the bootstrap secret is changed",
queueKey: "test/test",
objects: []runtime.Object{
newSecret("bootstrap-hub-kubeconfig", "test", newKubeConfig("https://10.0.118.48:6443")),
newHubKubeConfigSecret("test", time.Now().Add(60*time.Second).UTC()),
},
expectedRebootstrapping: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("expected no actions happens, but got %#v", actions)
}
},
},
{
name: "wait for scaling down",
queueKey: "test/test",
initRebootstrapping: true,
objects: []runtime.Object{
newSecret("bootstrap-hub-kubeconfig", "test", newKubeConfig("https://10.0.118.48:6443")),
newHubKubeConfigSecret("test", time.Now().Add(60*time.Second).UTC()),
newDeploymentWithAvailableReplicas("test-registration-agent", "test", 1),
},
expectedRebootstrapping: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelper.AssertGet(t, actions[0], "apps", "v1", "deployments")
},
},
{
name: "rebootstrap is completed",
queueKey: "test/test",
initRebootstrapping: true,
objects: []runtime.Object{
newSecret("bootstrap-hub-kubeconfig", "test", newKubeConfig("https://10.0.118.48:6443")),
newHubKubeConfigSecret("test", time.Now().Add(60*time.Second).UTC()),
newDeployment("test-registration-agent", "test"),
newDeployment("test-work-agent", "test"),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelper.AssertDelete(t, actions[0], "secrets", "test", "hub-kubeconfig-secret")
testinghelper.AssertDelete(t, actions[1], "deployments", "test", "test-registration-agent")
testinghelper.AssertDelete(t, actions[2], "deployments", "test", "test-work-agent")
testinghelper.AssertDelete(t, actions[1], "secrets", "test", "hub-kubeconfig-secret")
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fakeKubeClient := fakekube.NewSimpleClientset(c.objects...)

fakeOperatorClient := fakeoperatorclient.NewSimpleClientset()
klusterlet := newKlusterlet("test", "test")
if c.initRebootstrapping {
klusterlet.Status.Conditions = []metav1.Condition{
{
Type: helpers.KlusterletRebootstrapProgressing,
Status: metav1.ConditionTrue,
},
}
}
fakeOperatorClient := fakeoperatorclient.NewSimpleClientset(klusterlet)
operatorInformers := operatorinformers.NewSharedInformerFactory(fakeOperatorClient, 5*time.Minute)

operatorStore := operatorInformers.Operator().V1().Klusterlets().Informer().GetStore()
if err := operatorStore.Add(newKlusterlet("test", "test")); err != nil {
if err := operatorStore.Add(klusterlet); err != nil {
t.Fatal(err)
}
newOnTermInformer := func(name string) kubeinformers.SharedInformerFactory {
Expand Down Expand Up @@ -150,6 +188,7 @@ func TestSync(t *testing.T) {

controller := &bootstrapController{
kubeClient: fakeKubeClient,
klusterletClient: fakeOperatorClient.OperatorV1().Klusterlets(),
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
secretInformers: secretInformers,
}
Expand All @@ -160,6 +199,15 @@ func TestSync(t *testing.T) {
}

c.validateActions(t, fakeKubeClient.Actions())

klusterlet, err := fakeOperatorClient.OperatorV1().Klusterlets().Get(context.Background(), klusterlet.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Expected no errors, but got %v", err)
}
rebootstrapping := meta.IsStatusConditionTrue(klusterlet.Status.Conditions, helpers.KlusterletRebootstrapProgressing)
if c.expectedRebootstrapping != rebootstrapping {
t.Errorf("Expected rebootstrapping is %v, but got %v", c.expectedRebootstrapping, rebootstrapping)
}
})
}
}
Expand Down Expand Up @@ -310,3 +358,9 @@ func newDeployment(name, namespace string) *appsv1.Deployment {
Spec: appsv1.DeploymentSpec{},
}
}

// newDeploymentWithAvailableReplicas builds a test deployment whose status
// reports the given number of available replicas.
func newDeploymentWithAvailableReplicas(name, namespace string, availableReplicas int32) *appsv1.Deployment {
	d := newDeployment(name, namespace)
	d.Status.AvailableReplicas = availableReplicas
	return d
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"crypto/sha256"
"fmt"
"k8s.io/client-go/rest"
"strings"
"testing"
"time"

"k8s.io/client-go/rest"

"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -737,8 +738,32 @@ func TestReplica(t *testing.T) {
t.Errorf("Expected non error when sync, %v", err)
}

// should have 3 replicas for clusters with multiple nodes
assertRegistrationDeployment(t, controller.kubeClient.Actions(), "update", "", "cluster1", 3)
assertWorkDeployment(t, controller.kubeClient.Actions(), "update", "cluster1", operatorapiv1.InstallModeDefault, 3)

klusterlet = newKlusterlet("klusterlet", "testns", "cluster1")
klusterlet.Status.Conditions = []metav1.Condition{
{
Type: helpers.KlusterletRebootstrapProgressing,
Status: metav1.ConditionTrue,
},
}
if err := controller.operatorStore.Update(klusterlet); err != nil {
t.Fatal(err)
}

controller.kubeClient.ClearActions()
controller.operatorClient.ClearActions()

err = controller.controller.sync(context.TODO(), syncContext)
if err != nil {
t.Errorf("Expected non error when sync, %v", err)
}

// should have 0 replicas for klusterlet in rebootstrapping state
assertRegistrationDeployment(t, controller.kubeClient.Actions(), "update", "", "cluster1", 0)
assertWorkDeployment(t, controller.kubeClient.Actions(), "update", "cluster1", operatorapiv1.InstallModeDefault, 0)
}

func TestClusterNameChange(t *testing.T) {
Expand Down
Loading

0 comments on commit 0d54d24

Please sign in to comment.