Skip to content

Commit

Permalink
fix cluster deletion in the in-memory API server
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziopandini committed Jun 8, 2023
1 parent f3e3bda commit fa1cd34
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (r *InMemoryClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ

// Handle deleted clusters
if !inMemoryCluster.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, inMemoryCluster)
return r.reconcileDelete(ctx, cluster, inMemoryCluster)
}

// Handle non-deleted clusters
Expand Down Expand Up @@ -187,11 +187,19 @@ func (r *InMemoryClusterReconciler) reconcileNormal(_ context.Context, cluster *
return ctrl.Result{}, nil
}

//nolint:unparam // once we implemented this func we will also return errors
func (r *InMemoryClusterReconciler) reconcileDelete(_ context.Context, inMemoryCluster *infrav1.InMemoryCluster) (ctrl.Result, error) {
// TODO: implement
controllerutil.RemoveFinalizer(inMemoryCluster, infrav1.ClusterFinalizer)
func (r *InMemoryClusterReconciler) reconcileDelete(_ context.Context, cluster *clusterv1.Cluster, inMemoryCluster *infrav1.InMemoryCluster) (ctrl.Result, error) {
	// The resource group is named after the Cluster object reference; the same
	// name identifies the workload cluster listener in the APIServerMux.
	groupName := klog.KObj(cluster).String()

	// Drop the resource group hosting all the cloud resources belonging to the workload cluster.
	r.CloudManager.DeleteResourceGroup(groupName)

	// Remove the listener for the workload cluster; the finalizer is kept
	// when this fails, so the error is surfaced to the caller instead.
	err := r.APIServerMux.DeleteWorkloadClusterListener(groupName)
	if err != nil {
		return ctrl.Result{}, err
	}

	// Cleanup completed: release the InMemoryCluster.
	controllerutil.RemoveFinalizer(inMemoryCluster, infrav1.ClusterFinalizer)
	return ctrl.Result{}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (r *InMemoryMachineReconciler) Reconcile(ctx context.Context, req ctrl.Requ

// Handle deleted machines
if !inMemoryMachine.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, inMemoryMachine)
return r.reconcileDelete(ctx, cluster, machine, inMemoryMachine)
}

// Handle non-deleted machines
Expand Down Expand Up @@ -324,7 +324,7 @@ func (r *InMemoryMachineReconciler) reconcileNormal(ctx context.Context, cluster
}
if err := cloudClient.Get(ctx, client.ObjectKeyFromObject(etcdPod), etcdPod); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to get etcdPod Pod")
return ctrl.Result{}, errors.Wrapf(err, "failed to get etcd Pod")
}

etcdPod.Labels = map[string]string{
Expand Down Expand Up @@ -468,11 +468,88 @@ func (r *InMemoryMachineReconciler) reconcileNormal(ctx context.Context, cluster
return ctrl.Result{}, nil
}

//nolint:unparam // once we implemented this func we will also return errors
func (r *InMemoryMachineReconciler) reconcileDelete(_ context.Context, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
// TODO: implement
controllerutil.RemoveFinalizer(inMemoryMachine, infrav1.MachineFinalizer)
// reconcileDelete deletes from the cluster's resource group the objects associated
// with the machine being removed: the CloudMachine, the Node and, for control plane
// machines, the controller manager, scheduler, API server and etcd Pods, together
// with the API server and etcd member registrations held by the APIServerMux.
// Objects that are already gone (NotFound) are tolerated. The finalizer is removed
// from the InMemoryMachine only after every step succeeded; any other error is
// returned and the finalizer is left in place.
func (r *InMemoryMachineReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster, machine *clusterv1.Machine, inMemoryMachine *infrav1.InMemoryMachine) (ctrl.Result, error) {
	// Compute the resource group unique name.
	// NOTE: We are using resourceGroup also as a name for the listener for sake of simplicity.
	resourceGroup := klog.KObj(cluster).String()
	cloudClient := r.CloudManager.GetResourceGroup(resourceGroup).GetClient()

	// Delete VM
	cloudMachine := &cloudv1.CloudMachine{
		ObjectMeta: metav1.ObjectMeta{
			Name: inMemoryMachine.Name,
		},
	}
	if err := cloudClient.Delete(ctx, cloudMachine); err != nil && !apierrors.IsNotFound(err) {
		return ctrl.Result{}, errors.Wrapf(err, "failed to delete CloudMachine")
	}

	// Delete Node
	node := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: inMemoryMachine.Name,
		},
	}
	if err := cloudClient.Delete(ctx, node); err != nil && !apierrors.IsNotFound(err) {
		return ctrl.Result{}, errors.Wrapf(err, "failed to delete Node")
	}

	// Control plane components only exist for control plane machines.
	if util.IsControlPlaneMachine(machine) {
		// Delete the controller manager Pod
		controllerManagerPod := &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: metav1.NamespaceSystem,
				Name:      fmt.Sprintf("kube-controller-manager-%s", inMemoryMachine.Name),
			},
		}
		if err := cloudClient.Delete(ctx, controllerManagerPod); err != nil && !apierrors.IsNotFound(err) {
			return ctrl.Result{}, errors.Wrapf(err, "failed to delete controller manager Pod")
		}

		// Delete the scheduler Pod
		schedulerPod := &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: metav1.NamespaceSystem,
				Name:      fmt.Sprintf("kube-scheduler-%s", inMemoryMachine.Name),
			},
		}
		if err := cloudClient.Delete(ctx, schedulerPod); err != nil && !apierrors.IsNotFound(err) {
			return ctrl.Result{}, errors.Wrapf(err, "failed to delete scheduler Pod")
		}

		// Delete the API server Pod, then unregister the API server from the listener.
		apiServer := fmt.Sprintf("kube-apiserver-%s", inMemoryMachine.Name)
		apiServerPod := &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: metav1.NamespaceSystem,
				Name:      apiServer,
			},
		}
		if err := cloudClient.Delete(ctx, apiServerPod); err != nil && !apierrors.IsNotFound(err) {
			return ctrl.Result{}, errors.Wrapf(err, "failed to delete apiServer Pod")
		}
		if err := r.APIServerMux.DeleteAPIServer(resourceGroup, apiServer); err != nil {
			return ctrl.Result{}, err
		}

		// TODO: if all the API server are gone, cleanup all the k8s objects from the resource group.
		// note: it is not possible to delete the resource group, because cloud resources should be preserved.
		// given that, in order to implement this it is required to find a way to identify all the k8s resources (might be via gvk);
		// also, deletion must happen suddenly, without respecting finalizers or owner references links.

		// Delete the etcd Pod, then unregister the etcd member from the listener.
		etcdMember := fmt.Sprintf("etcd-%s", inMemoryMachine.Name)
		etcdPod := &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: metav1.NamespaceSystem,
				Name:      etcdMember,
			},
		}
		if err := cloudClient.Delete(ctx, etcdPod); err != nil && !apierrors.IsNotFound(err) {
			return ctrl.Result{}, errors.Wrapf(err, "failed to delete etcd Pod")
		}
		if err := r.APIServerMux.DeleteEtcdMember(resourceGroup, etcdMember); err != nil {
			return ctrl.Result{}, err
		}
	}

	// Everything has been cleaned up: release the InMemoryMachine.
	controllerutil.RemoveFinalizer(inMemoryMachine, infrav1.MachineFinalizer)
	return ctrl.Result{}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions test/infrastructure/inmemory/internal/server/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,29 +77,29 @@ func NewAPIServerHandler(manager cmanager.Manager, log logr.Logger, resolver Res
ws.Route(ws.GET("/api/v1/{resource}/{name}").To(apiServer.apiV1Get))
ws.Route(ws.PUT("/api/v1/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Update))
ws.Route(ws.PATCH("/api/v1/{resource}/{name}").Consumes(string(types.MergePatchType), string(types.StrategicMergePatchType)).To(apiServer.apiV1Patch))
ws.Route(ws.DELETE("/api/v1/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Delete))
ws.Route(ws.DELETE("/api/v1/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf, runtime.ContentTypeJSON).To(apiServer.apiV1Delete))

ws.Route(ws.POST("/apis/{group}/{version}/{resource}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Create))
ws.Route(ws.GET("/apis/{group}/{version}/{resource}").To(apiServer.apiV1List))
ws.Route(ws.GET("/apis/{group}/{version}/{resource}/{name}").To(apiServer.apiV1Get))
ws.Route(ws.PUT("/apis/{group}/{version}/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Update))
ws.Route(ws.PATCH("/apis/{group}/{version}/{resource}/{name}").Consumes(string(types.MergePatchType), string(types.StrategicMergePatchType)).To(apiServer.apiV1Patch))
ws.Route(ws.DELETE("/apis/{group}/{version}/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Delete))
ws.Route(ws.DELETE("/apis/{group}/{version}/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf, runtime.ContentTypeJSON).To(apiServer.apiV1Delete))

// CRUD endpoints (namespaced objects)
ws.Route(ws.POST("/api/v1/namespaces/{namespace}/{resource}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Create))
ws.Route(ws.GET("/api/v1/namespaces/{namespace}/{resource}").To(apiServer.apiV1List))
ws.Route(ws.GET("/api/v1/namespaces/{namespace}/{resource}/{name}").To(apiServer.apiV1Get))
ws.Route(ws.PUT("/api/v1/namespaces/{namespace}/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Update))
ws.Route(ws.PATCH("/api/v1/namespaces/{namespace}/{resource}/{name}").Consumes(string(types.MergePatchType), string(types.StrategicMergePatchType)).To(apiServer.apiV1Patch))
ws.Route(ws.DELETE("/api/v1/namespaces/{namespace}/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Delete))
ws.Route(ws.DELETE("/api/v1/namespaces/{namespace}/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf, runtime.ContentTypeJSON).To(apiServer.apiV1Delete))

ws.Route(ws.POST("/apis/{group}/{version}/namespaces/{namespace}/{resource}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Create))
ws.Route(ws.GET("/apis/{group}/{version}/namespaces/{namespace}/{resource}").To(apiServer.apiV1List))
ws.Route(ws.GET("/apis/{group}/{version}/namespaces/{namespace}/{resource}/{name}").To(apiServer.apiV1Get))
ws.Route(ws.PUT("/apis/{group}/{version}/namespaces/{namespace}/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Update))
ws.Route(ws.PATCH("/apis/{group}/{version}/namespaces/{namespace}/{resource}/{name}").Consumes(string(types.MergePatchType), string(types.StrategicMergePatchType)).To(apiServer.apiV1Patch))
ws.Route(ws.DELETE("/apis/{group}/{version}/namespaces/{namespace}/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf).To(apiServer.apiV1Delete))
ws.Route(ws.DELETE("/apis/{group}/{version}/namespaces/{namespace}/{resource}/{name}").Consumes(runtime.ContentTypeProtobuf, runtime.ContentTypeJSON).To(apiServer.apiV1Delete))

// Port forward endpoints
ws.Route(ws.GET("/api/v1/namespaces/{namespace}/pods/{name}/portforward").To(apiServer.apiV1PortForward))
Expand Down
63 changes: 62 additions & 1 deletion test/infrastructure/inmemory/internal/server/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,28 @@ func (m *WorkloadClustersMux) AddAPIServer(wclName, podName string, caCert *x509
return nil
}

// DeleteAPIServer unregisters the API server identified by podName from the
// WorkloadClusterListener named wclName. When no API server is left and the
// listener is still open, the listener is closed as well.
// An error is returned when no listener with the given name exists or when
// closing the listener fails.
func (m *WorkloadClustersMux) DeleteAPIServer(wclName, podName string) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	target, found := m.workloadClusterListeners[wclName]
	if !found {
		return errors.Errorf("workloadClusterListener with name %s must be initialized before removing an APIserver", wclName)
	}

	target.apiServers.Delete(podName)
	m.log.Info("APIServer instance removed from the workloadClusterListener", "listenerName", wclName, "address", target.Address(), "podName", podName)

	// Keep the listener as is while API servers remain, or when it is not open.
	if target.apiServers.Len() >= 1 || target.listener == nil {
		return nil
	}

	// Last API server gone: stop accepting connections for this workload cluster.
	if err := target.listener.Close(); err != nil {
		return errors.Wrapf(err, "failed to stop WorkloadClusterListener %s, %s", wclName, target.HostPort())
	}
	target.listener = nil
	m.log.Info("WorkloadClusterListener stopped because there are no APIServer left", "listenerName", wclName, "address", target.Address())

	return nil
}

// HasAPIServer returns true if the workload cluster already has an apiserver with podName.
func (m *WorkloadClustersMux) HasAPIServer(wclName, podName string) bool {
m.lock.RLock()
Expand All @@ -371,7 +393,7 @@ func (m *WorkloadClustersMux) AddEtcdMember(wclName, podName string, caCert *x50

wcl, ok := m.workloadClusterListeners[wclName]
if !ok {
return errors.Errorf("workloadClusterListener with name %s must be initialized before adding an APIserver", wclName)
return errors.Errorf("workloadClusterListener with name %s must be initialized before adding an etcd member", wclName)
}
wcl.etcdMembers.Insert(podName)
m.log.Info("Etcd member added to WorkloadClusterListener", "listenerName", wclName, "address", wcl.Address(), "podName", podName)
Expand Down Expand Up @@ -406,6 +428,22 @@ func (m *WorkloadClustersMux) HasEtcdMember(wclName, podName string) bool {
return wcl.etcdMembers.Has(podName)
}

// DeleteEtcdMember unregisters the etcd member identified by podName from the
// WorkloadClusterListener named wclName and drops the entry stored for it in
// the listener's etcdServingCertificates.
// An error is returned when no listener with the given name exists.
func (m *WorkloadClustersMux) DeleteEtcdMember(wclName, podName string) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	target, found := m.workloadClusterListeners[wclName]
	if !found {
		return errors.Errorf("workloadClusterListener with name %s must be initialized before removing an etcd member", wclName)
	}

	// Forget both the member and the certificate entry keyed by its Pod name.
	target.etcdMembers.Delete(podName)
	delete(target.etcdServingCertificates, podName)

	m.log.Info("Etcd member removed from WorkloadClusterListener", "listenerName", wclName, "address", target.Address(), "podName", podName)
	return nil
}

// ListListeners implements api.DebugInfoProvider.
func (m *WorkloadClustersMux) ListListeners() map[string]string {
m.lock.RLock()
Expand All @@ -418,6 +456,29 @@ func (m *WorkloadClustersMux) ListListeners() map[string]string {
return ret
}

// DeleteWorkloadClusterListener closes the WorkloadClusterListener named wclName
// (if it is still open) and removes it from the mux, including its host:port
// entry in workloadClusterNameByHost.
// Deleting a listener that is not registered is a no-op; an error is returned
// only when closing the underlying listener fails.
func (m *WorkloadClustersMux) DeleteWorkloadClusterListener(wclName string) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	target, known := m.workloadClusterListeners[wclName]
	if !known {
		// Nothing registered under this name.
		return nil
	}

	if open := target.listener; open != nil {
		if err := open.Close(); err != nil {
			return errors.Wrapf(err, "failed to stop WorkloadClusterListener %s, %s", wclName, target.HostPort())
		}
	}

	// Unregister the listener by name and by host:port.
	delete(m.workloadClusterListeners, wclName)
	delete(m.workloadClusterNameByHost, target.HostPort())

	m.log.Info("Workload cluster listener deleted", "listenerName", wclName, "address", target.Address())
	return nil
}

// Shutdown shuts down the workload cluster mux.
func (m *WorkloadClustersMux) Shutdown(ctx context.Context) error {
m.lock.Lock()
Expand Down
55 changes: 55 additions & 0 deletions test/infrastructure/inmemory/internal/server/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,61 @@ func init() {
ctrl.SetLogger(klog.Background())
}

// TestMux walks a WorkloadClustersMux through a full lifecycle: a listener is
// initialized, two API servers and two etcd members are added and then removed
// again, the listener is deleted and finally the mux is shut down. Every step
// is expected to complete without error.
func TestMux(t *testing.T) {
	g := NewWithT(t)

	manager := cmanager.New(scheme)

	listenerName := "workload-cluster"
	host := "127.0.0.1" //nolint:goconst
	wcmux := NewWorkloadClustersMux(manager, host)

	// Initialize the listener and check it is bound to the requested host.
	listener, err := wcmux.InitWorkloadClusterListener(listenerName)
	g.Expect(err).ToNot(HaveOccurred())
	g.Expect(listener.Host()).To(Equal(host))
	g.Expect(listener.Port()).ToNot(BeZero())

	// Certificate authorities used when registering API servers and etcd members.
	caCert, caKey, err := newCertificateAuthority()
	g.Expect(err).ToNot(HaveOccurred())

	etcdCert, etcdKey, err := newCertificateAuthority()
	g.Expect(err).ToNot(HaveOccurred())

	firstAPIServer, firstEtcdMember := "apiserver1", "etcd1"
	secondAPIServer, secondEtcdMember := "apiserver2", "etcd2"

	// Register the first, then the second API server / etcd member pair.
	g.Expect(wcmux.AddAPIServer(listenerName, firstAPIServer, caCert, caKey)).ToNot(HaveOccurred())
	g.Expect(wcmux.AddEtcdMember(listenerName, firstEtcdMember, etcdCert, etcdKey)).ToNot(HaveOccurred())

	g.Expect(wcmux.AddAPIServer(listenerName, secondAPIServer, caCert, caKey)).ToNot(HaveOccurred())
	g.Expect(wcmux.AddEtcdMember(listenerName, secondEtcdMember, etcdCert, etcdKey)).ToNot(HaveOccurred())

	// Remove the pairs in reverse order of registration.
	g.Expect(wcmux.DeleteAPIServer(listenerName, secondAPIServer)).ToNot(HaveOccurred())
	g.Expect(wcmux.DeleteEtcdMember(listenerName, secondEtcdMember)).ToNot(HaveOccurred())

	g.Expect(wcmux.DeleteAPIServer(listenerName, firstAPIServer)).ToNot(HaveOccurred())
	g.Expect(wcmux.DeleteEtcdMember(listenerName, firstEtcdMember)).ToNot(HaveOccurred())

	// Finally delete the listener itself and shut the mux down.
	g.Expect(wcmux.DeleteWorkloadClusterListener(listenerName)).ToNot(HaveOccurred())
	g.Expect(wcmux.Shutdown(ctx)).ToNot(HaveOccurred())
}

func TestAPI_corev1_CRUD(t *testing.T) {
g := NewWithT(t)

Expand Down

0 comments on commit fa1cd34

Please sign in to comment.