From 1d359da5a20f88a460886903b982ac132b87fe1c Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Wed, 27 Mar 2024 07:00:34 +0000 Subject: [PATCH 01/12] adding k8snode name label to px pods --- pkg/constants/metadata.go | 3 +++ pkg/controller/storagecluster/controller_test.go | 7 +++++++ pkg/controller/storagecluster/storagecluster.go | 1 + 3 files changed, 11 insertions(+) diff --git a/pkg/constants/metadata.go b/pkg/constants/metadata.go index e00083086..9e8fd16e1 100644 --- a/pkg/constants/metadata.go +++ b/pkg/constants/metadata.go @@ -52,6 +52,9 @@ const ( OperatorLabelManagedByKey = OperatorPrefix + "/managed-by" // OperatorLabelManagedByValue indicates that the object is managed by portworx. OperatorLabelManagedByValue = "portworx" + // K8sNodeRunningPxPod holds the value of the kubernetes node on which the portworx pod is running + // Used for creating a Node PDB + K8sNodeRunningPxPod = "k8s-node-running-px-pod" ) const ( diff --git a/pkg/controller/storagecluster/controller_test.go b/pkg/controller/storagecluster/controller_test.go index 30186217e..b04dee5ff 100644 --- a/pkg/controller/storagecluster/controller_test.go +++ b/pkg/controller/storagecluster/controller_test.go @@ -1877,8 +1877,10 @@ func TestStoragePodGetsScheduled(t *testing.T) { require.Len(t, podControl.Templates, 2) podTemplate1 := expectedPodTemplate.DeepCopy() podTemplate1.Spec.NodeName = "k8s-node-1" + podTemplate1.Labels["k8s-node-running-px-pod"] = "k8s-node-1" podTemplate2 := expectedPodTemplate.DeepCopy() podTemplate2.Spec.NodeName = "k8s-node-2" + podTemplate2.Labels["k8s-node-running-px-pod"] = "k8s-node-2" expectedPodTemplates := []v1.PodTemplateSpec{ *podTemplate1, *podTemplate2, } @@ -1995,8 +1997,10 @@ func TestStoragePodGetsScheduledK8s1_24(t *testing.T) { require.Len(t, podControl.Templates, 2) podTemplate1 := expectedPodTemplate.DeepCopy() podTemplate1.Spec.NodeName = "k8s-node-1" + podTemplate1.Labels["k8s-node-running-px-pod"] = "k8s-node-1" podTemplate2 := expectedPodTemplate.DeepCopy() podTemplate2.Spec.NodeName = "k8s-node-2" + podTemplate2.Labels["k8s-node-running-px-pod"] = "k8s-node-2" expectedPodTemplates := []v1.PodTemplateSpec{ *podTemplate1, *podTemplate2, } @@ -2342,10 +2346,13 @@ func TestStoragePodGetsScheduledWithCustomNodeSpecs(t *testing.T) { } podTemplate1 := expectedPodTemplate.DeepCopy() podTemplate1.Spec.NodeName = "k8s-node-1" + podTemplate1.Labels["k8s-node-running-px-pod"] = "k8s-node-1" podTemplate2 := expectedPodTemplate.DeepCopy() podTemplate2.Spec.NodeName = "k8s-node-2" + podTemplate2.Labels["k8s-node-running-px-pod"] = "k8s-node-2" podTemplate3 := expectedPodTemplate.DeepCopy() podTemplate3.Spec.NodeName = "k8s-node-3" + podTemplate3.Labels["k8s-node-running-px-pod"] = "k8s-node-3" expectedPodTemplates := []v1.PodTemplateSpec{ *podTemplate1, *podTemplate2, *podTemplate3, } diff --git a/pkg/controller/storagecluster/storagecluster.go b/pkg/controller/storagecluster/storagecluster.go index b9755f068..d0c3c1cef 100644 --- a/pkg/controller/storagecluster/storagecluster.go +++ b/pkg/controller/storagecluster/storagecluster.go @@ -1393,6 +1393,7 @@ func (c *Controller) CreatePodTemplate( podSpec.NodeName = node.Name labels := c.StorageClusterSelectorLabels(cluster) labels[constants.OperatorLabelManagedByKey] = constants.OperatorLabelManagedByValue + labels[constants.K8sNodeRunningPxPod] = node.Name newTemplate := v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Namespace: cluster.Namespace, From 9ee32f0b62286527386b4a7154dec6c517fe0693 Mon Sep 17 00:00:00 2001 From: 
svijaykumar-px Date: Wed, 27 Mar 2024 08:30:37 +0000 Subject: [PATCH 02/12] correcting go fmt error --- pkg/constants/metadata.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/constants/metadata.go b/pkg/constants/metadata.go index 9e8fd16e1..5e8aba908 100644 --- a/pkg/constants/metadata.go +++ b/pkg/constants/metadata.go @@ -53,8 +53,8 @@ const ( // OperatorLabelManagedByValue indicates that the object is managed by portworx. OperatorLabelManagedByValue = "portworx" // K8sNodeRunningPxPod holds the value of the kubernetes node on which the portworx pod is running - // Used for creating a Node PDB - K8sNodeRunningPxPod = "k8s-node-running-px-pod" + // Used for creating a Node PDB + K8sNodeRunningPxPod = "k8s-node-running-px-pod" ) const ( From 75bbb5fdbac5a82de752f5ba8f8fdbf56ef4cf58 Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Fri, 29 Mar 2024 05:20:59 +0000 Subject: [PATCH 03/12] making requested changes --- pkg/constants/metadata.go | 4 ++-- pkg/controller/storagecluster/controller_test.go | 14 +++++++------- pkg/controller/storagecluster/storagecluster.go | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/constants/metadata.go b/pkg/constants/metadata.go index 5e8aba908..d4871759f 100644 --- a/pkg/constants/metadata.go +++ b/pkg/constants/metadata.go @@ -52,9 +52,9 @@ const ( OperatorLabelManagedByKey = OperatorPrefix + "/managed-by" // OperatorLabelManagedByValue indicates that the object is managed by portworx. OperatorLabelManagedByValue = "portworx" - // K8sNodeRunningPxPod holds the value of the kubernetes node on which the portworx pod is running + // OperatorLabelNodeNameKey holds the value of the kubernetes node on which the portworx pod is running // Used for creating a Node PDB - K8sNodeRunningPxPod = "k8s-node-running-px-pod" + OperatorLabelNodeNameKey = OperatorPrefix + "/node-name" ) const ( diff --git a/pkg/controller/storagecluster/controller_test.go b/pkg/controller/storagecluster/controller_test.go index b04dee5ff..0f847f3ec 100644 --- a/pkg/controller/storagecluster/controller_test.go +++ b/pkg/controller/storagecluster/controller_test.go @@ -1877,10 +1877,10 @@ func TestStoragePodGetsScheduled(t *testing.T) { require.Len(t, podControl.Templates, 2) podTemplate1 := expectedPodTemplate.DeepCopy() podTemplate1.Spec.NodeName = "k8s-node-1" - podTemplate1.Labels["k8s-node-running-px-pod"] = "k8s-node-1" + podTemplate1.Labels[constants.OperatorLabelNodeNameKey] = "k8s-node-1" podTemplate2 := expectedPodTemplate.DeepCopy() podTemplate2.Spec.NodeName = "k8s-node-2" - podTemplate2.Labels["k8s-node-running-px-pod"] = "k8s-node-2" + podTemplate2.Labels[constants.OperatorLabelNodeNameKey] = "k8s-node-2" expectedPodTemplates := []v1.PodTemplateSpec{ *podTemplate1, *podTemplate2, } @@ -1997,10 +1997,10 @@ func TestStoragePodGetsScheduledK8s1_24(t *testing.T) { require.Len(t, podControl.Templates, 2) podTemplate1 := expectedPodTemplate.DeepCopy() podTemplate1.Spec.NodeName = "k8s-node-1" - podTemplate1.Labels["k8s-node-running-px-pod"] = "k8s-node-1" + podTemplate1.Labels[constants.OperatorLabelNodeNameKey] = "k8s-node-1" podTemplate2 := expectedPodTemplate.DeepCopy() podTemplate2.Spec.NodeName = "k8s-node-2" - podTemplate2.Labels["k8s-node-running-px-pod"] = "k8s-node-2" + podTemplate2.Labels[constants.OperatorLabelNodeNameKey] = "k8s-node-2" expectedPodTemplates := []v1.PodTemplateSpec{ *podTemplate1, *podTemplate2, } @@ -2346,13 +2346,13 @@ func TestStoragePodGetsScheduledWithCustomNodeSpecs(t *testing.T) { } podTemplate1 
:= expectedPodTemplate.DeepCopy() podTemplate1.Spec.NodeName = "k8s-node-1" - podTemplate1.Labels["k8s-node-running-px-pod"] = "k8s-node-1" + podTemplate1.Labels[constants.OperatorLabelNodeNameKey] = "k8s-node-1" podTemplate2 := expectedPodTemplate.DeepCopy() podTemplate2.Spec.NodeName = "k8s-node-2" - podTemplate2.Labels["k8s-node-running-px-pod"] = "k8s-node-2" + podTemplate2.Labels[constants.OperatorLabelNodeNameKey] = "k8s-node-2" podTemplate3 := expectedPodTemplate.DeepCopy() podTemplate3.Spec.NodeName = "k8s-node-3" - podTemplate3.Labels["k8s-node-running-px-pod"] = "k8s-node-3" + podTemplate3.Labels[constants.OperatorLabelNodeNameKey] = "k8s-node-3" expectedPodTemplates := []v1.PodTemplateSpec{ *podTemplate1, *podTemplate2, *podTemplate3, } diff --git a/pkg/controller/storagecluster/storagecluster.go b/pkg/controller/storagecluster/storagecluster.go index d0c3c1cef..f414f8293 100644 --- a/pkg/controller/storagecluster/storagecluster.go +++ b/pkg/controller/storagecluster/storagecluster.go @@ -1393,7 +1393,7 @@ func (c *Controller) CreatePodTemplate( podSpec.NodeName = node.Name labels := c.StorageClusterSelectorLabels(cluster) labels[constants.OperatorLabelManagedByKey] = constants.OperatorLabelManagedByValue - labels[constants.K8sNodeRunningPxPod] = node.Name + labels[constants.OperatorLabelNodeNameKey] = node.Name newTemplate := v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Namespace: cluster.Namespace, From 2fca6f16c3daec40b92695e5ac9962ea634cde3c Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Tue, 2 Apr 2024 15:08:22 +0000 Subject: [PATCH 04/12] Creating node PDB --- .../portworx/component/disruption_budget.go | 57 +++++++++++- drivers/storage/portworx/components_test.go | 89 +++++++++++++++++++ drivers/storage/portworx/util/util.go | 63 +++++++++++++ 3 files changed, 207 insertions(+), 2 deletions(-) diff --git a/drivers/storage/portworx/component/disruption_budget.go b/drivers/storage/portworx/component/disruption_budget.go index a61803e52..c3b75f5b0 100644 --- a/drivers/storage/portworx/component/disruption_budget.go +++ b/drivers/storage/portworx/component/disruption_budget.go @@ -34,6 +34,11 @@ const ( DefaultKVDBClusterSize = 3 ) +var ( + // ParallelUpgradePDBVersion is the portworx version from which parallel upgrades is supported + ParallelUpgradePDBVersion, _ = version.NewVersion("3.1.2") +) + type disruptionBudget struct { k8sClient client.Client sdkConn *grpc.ClientConn @@ -77,9 +82,18 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error { if err := c.createKVDBPodDisruptionBudget(cluster, ownerRef); err != nil { return err } - if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef); err != nil { - return err + // Create node PDB only if parallel upgrade is supported + clusterPXver := pxutil.GetPortworxVersion(cluster) + if clusterPXver.GreaterThanOrEqual(ParallelUpgradePDBVersion) { + if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef); err != nil { + return err + } + } else { + if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef); err != nil { + return err + } } + return nil } @@ -178,6 +192,45 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget( return err } +func (c *disruptionBudget) createPortworxNodePodDisruptionBudget( + cluster *corev1.StorageCluster, + ownerRef *metav1.OwnerReference, +) error { + + var err error + c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace) + if err != nil { + return err + } + nodesNeedingPDB, err := 
pxutil.NodesNeedingPDB(cluster, c.sdkConn, c.k8sClient) + if err != nil { + return err + } + for _, node := range nodesNeedingPDB { + minAvailable := intstr.FromInt(1) + PDBName := "px-storage-" + node + pdb := &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: PDBName, + Namespace: cluster.Namespace, + OwnerReferences: []metav1.OwnerReference{*ownerRef}, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MinAvailable: &minAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + constants.LabelKeyClusterName: cluster.Name, + constants.OperatorLabelNodeNameKey: node, + }, + }, + }, + } + err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef) + } + return err + +} + func (c *disruptionBudget) createKVDBPodDisruptionBudget( cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference, diff --git a/drivers/storage/portworx/components_test.go b/drivers/storage/portworx/components_test.go index e9bcd48fa..32ae5b833 100644 --- a/drivers/storage/portworx/components_test.go +++ b/drivers/storage/portworx/components_test.go @@ -13409,6 +13409,95 @@ func TestDisablePodDisruptionBudgets(t *testing.T) { require.True(t, errors.IsNotFound(err)) } +func TestNodePodDisruptionBudget(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockNodeServer := mock.NewMockOpenStorageNodeServer(mockCtrl) + sdkServerIP := "127.0.0.1" + sdkServerPort := 21883 + mockSdk := mock.NewSdkServer(mock.SdkServers{ + Node: mockNodeServer, + }) + err := mockSdk.StartOnAddress(sdkServerIP, strconv.Itoa(sdkServerPort)) + require.NoError(t, err) + defer mockSdk.Stop() + + testutil.SetupEtcHosts(t, sdkServerIP, pxutil.PortworxServiceName+".kube-test") + defer testutil.RestoreEtcHosts(t) + expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{ + Nodes: []*osdapi.StorageNode{ + {SchedulerNodeName: "node1"}, + {SchedulerNodeName: "node2"}, + {NonQuorumMember: true, SchedulerNodeName: "node3"}, + }, + } + mockNodeServer.EXPECT(). + EnumerateWithFilters(gomock.Any(), gomock.Any()). + Return(expectedNodeEnumerateResp, nil). 
+ AnyTimes() + + cluster := &corev1.StorageCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "px-cluster", + Namespace: "kube-test", + }, + Spec: corev1.StorageClusterSpec{ + Image: "portworx/oci-monitor:3.1.2", + }, + Status: corev1.StorageClusterStatus{ + Phase: string(corev1.ClusterStateRunning), + }, + } + fakeK8sNodes := &v1.NodeList{Items: []v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node2"}}, + }, + } + + k8sClient := testutil.FakeK8sClient(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: pxutil.PortworxServiceName, + Namespace: cluster.Namespace, + }, + Spec: v1.ServiceSpec{ + ClusterIP: sdkServerIP, + Ports: []v1.ServicePort{ + { + Name: pxutil.PortworxSDKPortName, + Port: int32(sdkServerPort), + }, + }, + }, + }, fakeK8sNodes) + + coreops.SetInstance(coreops.New(fakek8sclient.NewSimpleClientset())) + component.DeregisterAllComponents() + component.RegisterDisruptionBudgetComponent() + + driver := portworx{} + err = driver.Init(k8sClient, runtime.NewScheme(), record.NewFakeRecorder(0)) + require.NoError(t, err) + err = driver.PreInstall(cluster) + require.NoError(t, err) + + storagePDB := &policyv1.PodDisruptionBudget{} + err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node1", cluster.Namespace) + require.NoError(t, err) + require.Equal(t, 1, storagePDB.Spec.MinAvailable.IntValue()) + require.Equal(t, "node1", storagePDB.Spec.Selector.MatchLabels[constants.OperatorLabelNodeNameKey]) + + err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node2", cluster.Namespace) + require.NoError(t, err) + require.Equal(t, 1, storagePDB.Spec.MinAvailable.IntValue()) + require.Equal(t, "node2", storagePDB.Spec.Selector.MatchLabels[constants.OperatorLabelNodeNameKey]) + + err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node3", cluster.Namespace) + fmt.Println(err) + require.Error(t, err, fmt.Errorf("poddisruptionbudgets.policy %s-node3 not found",component.StoragePodDisruptionBudgetName)) + +} + func TestSCC(t *testing.T) { coreops.SetInstance(coreops.New(fakek8sclient.NewSimpleClientset())) reregisterComponents() diff --git a/drivers/storage/portworx/util/util.go b/drivers/storage/portworx/util/util.go index 07c470cec..e4c3a5b9d 100644 --- a/drivers/storage/portworx/util/util.go +++ b/drivers/storage/portworx/util/util.go @@ -1528,3 +1528,66 @@ func ShouldUseClusterDomain(node *api.StorageNode) (bool, error) { } return true, nil } + +// Get list of storagenodes that are a part of the current cluster that need a node PDB +func NodesNeedingPDB(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k8sClient client.Client) ([]string, error) { + // Get the list of storage nodes + nodeClient := api.NewOpenStorageNodeClient(sdkConn) + ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient) + if err != nil { + return nil, err + } + + nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters( + ctx, + &api.SdkNodeEnumerateWithFiltersRequest{}, + ) + if err != nil { + return nil, fmt.Errorf("failed to enumerate nodes: %v", err) + } + + // Get the list of k8s nodes that are part of the current cluster + k8sNodesStoragePodCouldRun := make(map[string]bool) + k8sNodeList := &v1.NodeList{} + err = k8sClient.List(context.TODO(), k8sNodeList) + if err != nil { + return nil, err + } + for _, node := range k8sNodeList.Items { + shouldRun, shouldContinueRunning, err := k8sutil.CheckPredicatesForStoragePod(&node, cluster, 
nil) + if err != nil { + return nil, err + } + if shouldRun || shouldContinueRunning { + k8sNodesStoragePodCouldRun[node.Name] = true + } + } + + // Create a list of nodes that are part of quorum to create node PDB for them + nodesNeedingPDB := make([]string, 0) + for _, node := range nodeEnumerateResponse.Nodes { + // Do not add node if it is not part of quorum or is decommissioned + if node.Status == api.Status_STATUS_DECOMMISSION || node.NonQuorumMember { + logrus.Debugf("Node %s is not a quorum member or is decommissioned, skipping", node.Id) + continue + } + + if node.SchedulerNodeName == "" { + k8sNode, err := coreops.Instance().SearchNodeByAddresses( + []string{node.DataIp, node.MgmtIp, node.Hostname}, + ) + if err != nil { + // In Metro-DR setup, this could be expected. + logrus.Infof("Unable to find kubernetes node name for nodeID %v: %v", node.Id, err) + continue + } + node.SchedulerNodeName = k8sNode.Name + } + + if _, ok := k8sNodesStoragePodCouldRun[node.SchedulerNodeName]; ok { + nodesNeedingPDB = append(nodesNeedingPDB, node.SchedulerNodeName) + } + } + + return nodesNeedingPDB, nil +} From 6e81239f21a18df405e94e72fe1b389acbba6057 Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Tue, 2 Apr 2024 15:18:33 +0000 Subject: [PATCH 05/12] go fmt error --- drivers/storage/portworx/components_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/storage/portworx/components_test.go b/drivers/storage/portworx/components_test.go index 32ae5b833..7381f016a 100644 --- a/drivers/storage/portworx/components_test.go +++ b/drivers/storage/portworx/components_test.go @@ -13494,7 +13494,7 @@ func TestNodePodDisruptionBudget(t *testing.T) { err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node3", cluster.Namespace) fmt.Println(err) - require.Error(t, err, fmt.Errorf("poddisruptionbudgets.policy %s-node3 not found",component.StoragePodDisruptionBudgetName)) + require.Error(t, err, fmt.Errorf("poddisruptionbudgets.policy %s-node3 not found", component.StoragePodDisruptionBudgetName)) } From b92cb3203f4812a4aab9b56ea78080fe76e3b880 Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Wed, 3 Apr 2024 16:32:10 +0000 Subject: [PATCH 06/12] Checking node version instead of cluster --- .../portworx/component/disruption_budget.go | 33 ++++------ drivers/storage/portworx/components_test.go | 64 ++++++++++++------- drivers/storage/portworx/util/util.go | 34 ++++++++++ 3 files changed, 87 insertions(+), 44 deletions(-) diff --git a/drivers/storage/portworx/component/disruption_budget.go b/drivers/storage/portworx/component/disruption_budget.go index c3b75f5b0..a7f199507 100644 --- a/drivers/storage/portworx/component/disruption_budget.go +++ b/drivers/storage/portworx/component/disruption_budget.go @@ -34,11 +34,6 @@ const ( DefaultKVDBClusterSize = 3 ) -var ( - // ParallelUpgradePDBVersion is the portworx version from which parallel upgrades is supported - ParallelUpgradePDBVersion, _ = version.NewVersion("3.1.2") -) - type disruptionBudget struct { k8sClient client.Client sdkConn *grpc.ClientConn @@ -83,13 +78,17 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error { return err } // Create node PDB only if parallel upgrade is supported - clusterPXver := pxutil.GetPortworxVersion(cluster) - if clusterPXver.GreaterThanOrEqual(ParallelUpgradePDBVersion) { - if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef); err != nil { + var err error + c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, 
cluster.Namespace) + if err != nil { + return err + } + if pxutil.ClusterSupportsParallelUpgrade(cluster, c.sdkConn, c.k8sClient) { + if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, c.sdkConn); err != nil { return err } } else { - if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef); err != nil { + if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, c.sdkConn); err != nil { return err } } @@ -120,6 +119,7 @@ func (c *disruptionBudget) MarkDeleted() {} func (c *disruptionBudget) createPortworxPodDisruptionBudget( cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference, + sdkConn *grpc.ClientConn, ) error { userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster) if err != nil { @@ -128,12 +128,8 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget( } var minAvailable int - c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace) - if err != nil { - return err - } - storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient) + storageNodesCount, err := pxutil.CountStorageNodes(cluster, sdkConn, c.k8sClient) if err != nil { c.closeSdkConn() return err @@ -195,14 +191,9 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget( func (c *disruptionBudget) createPortworxNodePodDisruptionBudget( cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference, + sdkConn *grpc.ClientConn, ) error { - - var err error - c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace) - if err != nil { - return err - } - nodesNeedingPDB, err := pxutil.NodesNeedingPDB(cluster, c.sdkConn, c.k8sClient) + nodesNeedingPDB, err := pxutil.NodesNeedingPDB(cluster, sdkConn, c.k8sClient) if err != nil { return err } diff --git a/drivers/storage/portworx/components_test.go b/drivers/storage/portworx/components_test.go index 7381f016a..b31e19cc3 100644 --- a/drivers/storage/portworx/components_test.go +++ b/drivers/storage/portworx/components_test.go @@ -12508,7 +12508,7 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) { expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{ Nodes: []*osdapi.StorageNode{ - {SchedulerNodeName: "node1"}, + {SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, }, } @@ -12597,8 +12597,8 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) { // TestCase: Do not create storage PDB if total nodes with storage is less than 3 expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{ - {Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"}, - {Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"}, + {Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, + {Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, {Pools: []*osdapi.StoragePool{}}, {}, } @@ -12630,11 +12630,11 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) { // Also, ignore the annotation if the value is an invalid integer cluster.Annotations[pxutil.AnnotationStoragePodDisruptionBudget] = "invalid" expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{ - {Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"}, - {Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"}, + {Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: 
map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, + {Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, {Pools: []*osdapi.StoragePool{}}, {}, - {Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3"}, + {Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, } err = driver.PreInstall(cluster) @@ -12657,11 +12657,11 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) { // Also, ignore the annotation if the value is an invalid integer cluster.Annotations[pxutil.AnnotationStoragePodDisruptionBudget] = "still_invalid" expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{ - {Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"}, - {Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"}, - {Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3"}, - {Pools: []*osdapi.StoragePool{{ID: 4}}, SchedulerNodeName: "node4"}, - {Pools: []*osdapi.StoragePool{{ID: 5}}, SchedulerNodeName: "node5"}, + {Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, + {Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, + {Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, + {Pools: []*osdapi.StoragePool{{ID: 4}}, SchedulerNodeName: "node4", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, + {Pools: []*osdapi.StoragePool{{ID: 5}}, SchedulerNodeName: "node5", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, {Pools: []*osdapi.StoragePool{}}, {}, } @@ -13283,7 +13283,7 @@ func TestPodDisruptionBudgetWithErrors(t *testing.T) { mockNodeServer.EXPECT(). EnumerateWithFilters(gomock.Any(), gomock.Any()). Return(nil, fmt.Errorf("NodeEnumerate error")). - Times(1) + AnyTimes() cluster.Spec.Security.Enabled = false err = driver.PreInstall(cluster) @@ -13315,9 +13315,9 @@ func TestDisablePodDisruptionBudgets(t *testing.T) { expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{ Nodes: []*osdapi.StorageNode{ - {Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"}, - {Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"}, - {Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3"}, + {Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, + {Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, + {Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, }, } mockNodeServer.EXPECT(). 
@@ -13425,11 +13425,13 @@ func TestNodePodDisruptionBudget(t *testing.T) { testutil.SetupEtcHosts(t, sdkServerIP, pxutil.PortworxServiceName+".kube-test") defer testutil.RestoreEtcHosts(t) + + // PDB should not be created for non quorum members expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{ Nodes: []*osdapi.StorageNode{ - {SchedulerNodeName: "node1"}, - {SchedulerNodeName: "node2"}, - {NonQuorumMember: true, SchedulerNodeName: "node3"}, + {SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {NonQuorumMember: true, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, }, } mockNodeServer.EXPECT(). @@ -13481,21 +13483,37 @@ func TestNodePodDisruptionBudget(t *testing.T) { err = driver.PreInstall(cluster) require.NoError(t, err) + // PDB is created for 2 storage nodes and kvdb + pdbList := &policyv1.PodDisruptionBudgetList{} + err = testutil.List(k8sClient, pdbList) + require.NoError(t, err) + require.Len(t, pdbList.Items, 3) + storagePDB := &policyv1.PodDisruptionBudget{} err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node1", cluster.Namespace) require.NoError(t, err) require.Equal(t, 1, storagePDB.Spec.MinAvailable.IntValue()) require.Equal(t, "node1", storagePDB.Spec.Selector.MatchLabels[constants.OperatorLabelNodeNameKey]) - err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node2", cluster.Namespace) - require.NoError(t, err) - require.Equal(t, 1, storagePDB.Spec.MinAvailable.IntValue()) - require.Equal(t, "node2", storagePDB.Spec.Selector.MatchLabels[constants.OperatorLabelNodeNameKey]) - err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node3", cluster.Namespace) fmt.Println(err) require.Error(t, err, fmt.Errorf("poddisruptionbudgets.policy %s-node3 not found", component.StoragePodDisruptionBudgetName)) + // Testcase : PDB per node should not be created for any node even if 1 node is lesser than 3.1.2 + // Use cluster wide PDB in this case + expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{ + {SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}}, + } + err = driver.PreInstall(cluster) + require.NoError(t, err) + + storagePDB = &policyv1.PodDisruptionBudget{} + err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName, cluster.Namespace) + require.NoError(t, err) + require.Equal(t, 2, storagePDB.Spec.MinAvailable.IntValue()) + } func TestSCC(t *testing.T) { diff --git a/drivers/storage/portworx/util/util.go b/drivers/storage/portworx/util/util.go index e4c3a5b9d..6058e241e 100644 --- a/drivers/storage/portworx/util/util.go +++ b/drivers/storage/portworx/util/util.go @@ -294,6 +294,9 @@ var ( // ConfigMapNameRegex regex of configMap. 
ConfigMapNameRegex = regexp.MustCompile("[^a-zA-Z0-9]+") + + // ParallelUpgradePDBVersion is the portworx version from which parallel upgrades is supported + ParallelUpgradePDBVersion, _ = version.NewVersion("3.1.2") ) func getStrippedClusterName(cluster *corev1.StorageCluster) string { @@ -1591,3 +1594,34 @@ func NodesNeedingPDB(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k return nodesNeedingPDB, nil } + +func ClusterSupportsParallelUpgrade(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k8sClient client.Client) bool { + nodeClient := api.NewOpenStorageNodeClient(sdkConn) + ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient) + if err != nil { + return false + } + nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters( + ctx, + &api.SdkNodeEnumerateWithFiltersRequest{}, + ) + if err != nil { + return false + } + + for _, node := range nodeEnumerateResponse.Nodes { + if node.Status == api.Status_STATUS_DECOMMISSION { + continue + } + v := node.NodeLabels[NodeLabelPortworxVersion] + nodeVersion, err := version.NewVersion(v) + if err != nil { + logrus.Warnf("Failed to parse node version %s for node %s: %v", v, node.Id, err) + return false + } + if nodeVersion.LessThan(ParallelUpgradePDBVersion) { + return false + } + } + return true +} From 6c836b6008b06a93b576c9db279b0d496666f972 Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Fri, 5 Apr 2024 15:15:15 +0000 Subject: [PATCH 07/12] making requested changes Conflicts: drivers/storage/portworx/component/disruption_budget.go --- .../portworx/component/disruption_budget.go | 38 ++++++++--- drivers/storage/portworx/components_test.go | 14 +++-- drivers/storage/portworx/util/util.go | 63 +++---------------- 3 files changed, 45 insertions(+), 70 deletions(-) diff --git a/drivers/storage/portworx/component/disruption_budget.go b/drivers/storage/portworx/component/disruption_budget.go index a7f199507..aa76c9379 100644 --- a/drivers/storage/portworx/component/disruption_budget.go +++ b/drivers/storage/portworx/component/disruption_budget.go @@ -1,11 +1,13 @@ package component import ( + "context" "fmt" "math" "strconv" "github.com/hashicorp/go-version" + "github.com/libopenstorage/openstorage/api" "github.com/sirupsen/logrus" "google.golang.org/grpc" policyv1 "k8s.io/api/policy/v1" @@ -83,12 +85,27 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error { if err != nil { return err } - if pxutil.ClusterSupportsParallelUpgrade(cluster, c.sdkConn, c.k8sClient) { - if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, c.sdkConn); err != nil { + + // Get list of portworx storage nodes + nodeClient := api.NewOpenStorageNodeClient(c.sdkConn) + ctx, err := pxutil.SetupContextWithToken(context.Background(), cluster, c.k8sClient) + if err != nil { + return err + } + nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters( + ctx, + &api.SdkNodeEnumerateWithFiltersRequest{}, + ) + if err != nil { + return fmt.Errorf("failed to enumerate nodes: %v", err) + } + + if pxutil.ClusterSupportsParallelUpgrade(nodeEnumerateResponse) { + if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil { return err } } else { - if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, c.sdkConn); err != nil { + if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil { return err } } @@ -119,7 +136,7 @@ func (c *disruptionBudget) MarkDeleted() {} func (c *disruptionBudget) 
createPortworxPodDisruptionBudget( cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference, - sdkConn *grpc.ClientConn, + nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, ) error { userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster) if err != nil { @@ -129,7 +146,7 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget( var minAvailable int - storageNodesCount, err := pxutil.CountStorageNodes(cluster, sdkConn, c.k8sClient) + storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient, nodeEnumerateResponse) if err != nil { c.closeSdkConn() return err @@ -191,15 +208,15 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget( func (c *disruptionBudget) createPortworxNodePodDisruptionBudget( cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference, - sdkConn *grpc.ClientConn, + nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, ) error { - nodesNeedingPDB, err := pxutil.NodesNeedingPDB(cluster, sdkConn, c.k8sClient) + nodesNeedingPDB, err := pxutil.NodesNeedingPDB(c.k8sClient, nodeEnumerateResponse) if err != nil { return err } for _, node := range nodesNeedingPDB { minAvailable := intstr.FromInt(1) - PDBName := "px-storage-" + node + PDBName := "px-" + node pdb := &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: PDBName, @@ -217,6 +234,10 @@ func (c *disruptionBudget) createPortworxNodePodDisruptionBudget( }, } err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef) + if err != nil { + logrus.Warnf("Failed to create PDB for node %s: %v", node, err) + break + } } return err @@ -230,7 +251,6 @@ func (c *disruptionBudget) createKVDBPodDisruptionBudget( if cluster.Spec.Kvdb != nil && !cluster.Spec.Kvdb.Internal { return nil } - clusterSize := kvdbClusterSize(cluster) minAvailable := intstr.FromInt(clusterSize - 1) pdb := &policyv1.PodDisruptionBudget{ diff --git a/drivers/storage/portworx/components_test.go b/drivers/storage/portworx/components_test.go index b31e19cc3..c5281a090 100644 --- a/drivers/storage/portworx/components_test.go +++ b/drivers/storage/portworx/components_test.go @@ -13426,12 +13426,14 @@ func TestNodePodDisruptionBudget(t *testing.T) { testutil.SetupEtcHosts(t, sdkServerIP, pxutil.PortworxServiceName+".kube-test") defer testutil.RestoreEtcHosts(t) - // PDB should not be created for non quorum members + // PDB should not be created for non quorum members, nodes not part of current cluster and nodes in decommissioned state expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{ Nodes: []*osdapi.StorageNode{ {SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, {SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, {NonQuorumMember: true, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {SchedulerNodeName: "node4", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {NonQuorumMember: false, Status: osdapi.Status_STATUS_DECOMMISSION}, }, } mockNodeServer.EXPECT(). 
@@ -13454,6 +13456,7 @@ func TestNodePodDisruptionBudget(t *testing.T) { fakeK8sNodes := &v1.NodeList{Items: []v1.Node{ {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, {ObjectMeta: metav1.ObjectMeta{Name: "node2"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node3"}}, }, } @@ -13490,14 +13493,15 @@ func TestNodePodDisruptionBudget(t *testing.T) { require.Len(t, pdbList.Items, 3) storagePDB := &policyv1.PodDisruptionBudget{} - err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node1", cluster.Namespace) + err = testutil.Get(k8sClient, storagePDB, "px-node1", cluster.Namespace) require.NoError(t, err) require.Equal(t, 1, storagePDB.Spec.MinAvailable.IntValue()) require.Equal(t, "node1", storagePDB.Spec.Selector.MatchLabels[constants.OperatorLabelNodeNameKey]) - err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node3", cluster.Namespace) - fmt.Println(err) - require.Error(t, err, fmt.Errorf("poddisruptionbudgets.policy %s-node3 not found", component.StoragePodDisruptionBudgetName)) + err = testutil.Get(k8sClient, storagePDB, "px-node3", cluster.Namespace) + require.True(t, errors.IsNotFound(err)) + err = testutil.Get(k8sClient, storagePDB, "px-node4", cluster.Namespace) + require.True(t, errors.IsNotFound(err)) // Testcase : PDB per node should not be created for any node even if 1 node is lesser than 3.1.2 // Use cluster wide PDB in this case diff --git a/drivers/storage/portworx/util/util.go b/drivers/storage/portworx/util/util.go index 6058e241e..aba40f0aa 100644 --- a/drivers/storage/portworx/util/util.go +++ b/drivers/storage/portworx/util/util.go @@ -1114,6 +1114,7 @@ func CountStorageNodes( cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k8sClient client.Client, + nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, ) (int, error) { nodeClient := api.NewOpenStorageNodeClient(sdkConn) ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient, false) @@ -1127,14 +1128,6 @@ func CountStorageNodes( clusterDomains, err := clusterDomainClient.Enumerate(ctx, &api.SdkClusterDomainsEnumerateRequest{}) isDRSetup = err == nil && len(clusterDomains.ClusterDomainNames) > 1 - nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters( - ctx, - &api.SdkNodeEnumerateWithFiltersRequest{}, - ) - if err != nil { - return -1, fmt.Errorf("failed to enumerate nodes: %v", err) - } - // Use the quorum member flag from the node enumerate response if all the nodes are upgraded to the // newer version. The Enumerate response could be coming from any node and we want to make sure that // we are not talking to an old node when enumerating. 
@@ -1533,37 +1526,19 @@ func ShouldUseClusterDomain(node *api.StorageNode) (bool, error) { } // Get list of storagenodes that are a part of the current cluster that need a node PDB -func NodesNeedingPDB(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k8sClient client.Client) ([]string, error) { - // Get the list of storage nodes - nodeClient := api.NewOpenStorageNodeClient(sdkConn) - ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient) - if err != nil { - return nil, err - } - - nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters( - ctx, - &api.SdkNodeEnumerateWithFiltersRequest{}, - ) - if err != nil { - return nil, fmt.Errorf("failed to enumerate nodes: %v", err) - } +func NodesNeedingPDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) ([]string, error) { // Get the list of k8s nodes that are part of the current cluster k8sNodesStoragePodCouldRun := make(map[string]bool) k8sNodeList := &v1.NodeList{} - err = k8sClient.List(context.TODO(), k8sNodeList) + err := k8sClient.List(context.TODO(), k8sNodeList) if err != nil { return nil, err } + + // Get list of kubernetes nodes that are a part of the current cluster for _, node := range k8sNodeList.Items { - shouldRun, shouldContinueRunning, err := k8sutil.CheckPredicatesForStoragePod(&node, cluster, nil) - if err != nil { - return nil, err - } - if shouldRun || shouldContinueRunning { - k8sNodesStoragePodCouldRun[node.Name] = true - } + k8sNodesStoragePodCouldRun[node.Name] = true } // Create a list of nodes that are part of quorum to create node PDB for them @@ -1575,18 +1550,6 @@ func NodesNeedingPDB(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k continue } - if node.SchedulerNodeName == "" { - k8sNode, err := coreops.Instance().SearchNodeByAddresses( - []string{node.DataIp, node.MgmtIp, node.Hostname}, - ) - if err != nil { - // In Metro-DR setup, this could be expected. 
- logrus.Infof("Unable to find kubernetes node name for nodeID %v: %v", node.Id, err) - continue - } - node.SchedulerNodeName = k8sNode.Name - } - if _, ok := k8sNodesStoragePodCouldRun[node.SchedulerNodeName]; ok { nodesNeedingPDB = append(nodesNeedingPDB, node.SchedulerNodeName) } @@ -1595,19 +1558,7 @@ func NodesNeedingPDB(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k return nodesNeedingPDB, nil } -func ClusterSupportsParallelUpgrade(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k8sClient client.Client) bool { - nodeClient := api.NewOpenStorageNodeClient(sdkConn) - ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient) - if err != nil { - return false - } - nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters( - ctx, - &api.SdkNodeEnumerateWithFiltersRequest{}, - ) - if err != nil { - return false - } +func ClusterSupportsParallelUpgrade(nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) bool { for _, node := range nodeEnumerateResponse.Nodes { if node.Status == api.Status_STATUS_DECOMMISSION { From f5027ae33af7a38122523147154d83a185abfa11 Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Mon, 15 Apr 2024 05:52:08 +0000 Subject: [PATCH 08/12] aggregating errors --- drivers/storage/portworx/component/disruption_budget.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/drivers/storage/portworx/component/disruption_budget.go b/drivers/storage/portworx/component/disruption_budget.go index aa76c9379..8ddcfb063 100644 --- a/drivers/storage/portworx/component/disruption_budget.go +++ b/drivers/storage/portworx/component/disruption_budget.go @@ -13,6 +13,7 @@ import ( policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" @@ -214,6 +215,7 @@ func (c *disruptionBudget) createPortworxNodePodDisruptionBudget( if err != nil { return err } + errors := []error{} for _, node := range nodesNeedingPDB { minAvailable := intstr.FromInt(1) PDBName := "px-" + node @@ -236,10 +238,10 @@ func (c *disruptionBudget) createPortworxNodePodDisruptionBudget( err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef) if err != nil { logrus.Warnf("Failed to create PDB for node %s: %v", node, err) - break + errors = append(errors, err) } } - return err + return utilerrors.NewAggregate(errors) } From e4b3d8b1499b137ddcaa3b78c49f50579229118e Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Wed, 24 Apr 2024 14:20:12 +0000 Subject: [PATCH 09/12] adding logic to delete node PDB --- .../portworx/component/disruption_budget.go | 30 +++++ drivers/storage/portworx/components_test.go | 115 +++++++++++++++++- drivers/storage/portworx/util/util.go | 34 ++++++ 3 files changed, 178 insertions(+), 1 deletion(-) diff --git a/drivers/storage/portworx/component/disruption_budget.go b/drivers/storage/portworx/component/disruption_budget.go index 8ddcfb063..8235726a7 100644 --- a/drivers/storage/portworx/component/disruption_budget.go +++ b/drivers/storage/portworx/component/disruption_budget.go @@ -105,6 +105,9 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error { if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil { return err } + if err := c.deletePortworxNodePodDisruptionBudget(cluster,ownerRef, nodeEnumerateResponse); err != 
nil { + return err + } } else { if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil { return err @@ -245,6 +248,33 @@ func (c *disruptionBudget) createPortworxNodePodDisruptionBudget( } +// Delete node pod disruption budget when the kubernetes node is not part of the cluster or portworx does not run on it +func (c *disruptionBudget) deletePortworxNodePodDisruptionBudget( + cluster *corev1.StorageCluster, + ownerRef *metav1.OwnerReference, + nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, +) error { + nodesToDeletePDB, err := pxutil.NodesToDeletePDB(c.k8sClient, nodeEnumerateResponse) + if err != nil { + return err + } + errors := []error{} + + + for _, node := range nodesToDeletePDB { + PDBName := "px-" + node + err = k8sutil.DeletePodDisruptionBudget( + c.k8sClient, PDBName, + cluster.Namespace,*ownerRef, + ) + if err != nil { + logrus.Warnf("Failed to delete PDB for node %s: %v", node, err) + errors = append(errors, err) + } + } + return utilerrors.NewAggregate(errors) +} + func (c *disruptionBudget) createKVDBPodDisruptionBudget( cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference, diff --git a/drivers/storage/portworx/components_test.go b/drivers/storage/portworx/components_test.go index c5281a090..106501e35 100644 --- a/drivers/storage/portworx/components_test.go +++ b/drivers/storage/portworx/components_test.go @@ -13409,7 +13409,7 @@ func TestDisablePodDisruptionBudgets(t *testing.T) { require.True(t, errors.IsNotFound(err)) } -func TestNodePodDisruptionBudget(t *testing.T) { +func TestCreateNodePodDisruptionBudget(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -13520,6 +13520,119 @@ func TestNodePodDisruptionBudget(t *testing.T) { } +func TestDeleteNodePodDisruptionBudget(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockNodeServer := mock.NewMockOpenStorageNodeServer(mockCtrl) + sdkServerIP := "127.0.0.1" + sdkServerPort := 21883 + mockSdk := mock.NewSdkServer(mock.SdkServers{ + Node: mockNodeServer, + }) + err := mockSdk.StartOnAddress(sdkServerIP, strconv.Itoa(sdkServerPort)) + require.NoError(t, err) + defer mockSdk.Stop() + + testutil.SetupEtcHosts(t, sdkServerIP, pxutil.PortworxServiceName+".kube-test") + defer testutil.RestoreEtcHosts(t) + + expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{ + Nodes: []*osdapi.StorageNode{ + {SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {NonQuorumMember: true, SchedulerNodeName: "node4", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}}, + {NonQuorumMember: false, Status: osdapi.Status_STATUS_DECOMMISSION}, + }, + } + mockNodeServer.EXPECT(). + EnumerateWithFilters(gomock.Any(), gomock.Any()). + Return(expectedNodeEnumerateResp, nil). 
+ AnyTimes() + + cluster := &corev1.StorageCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "px-cluster", + Namespace: "kube-test", + }, + Spec: corev1.StorageClusterSpec{ + Image: "portworx/oci-monitor:3.1.2", + }, + Status: corev1.StorageClusterStatus{ + Phase: string(corev1.ClusterStateRunning), + }, + } + fakeK8sNodes := &v1.NodeList{Items: []v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node2"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node3"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node4"}}, + }, + } + + k8sClient := testutil.FakeK8sClient(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: pxutil.PortworxServiceName, + Namespace: cluster.Namespace, + }, + Spec: v1.ServiceSpec{ + ClusterIP: sdkServerIP, + Ports: []v1.ServicePort{ + { + Name: pxutil.PortworxSDKPortName, + Port: int32(sdkServerPort), + }, + }, + }, + }, fakeK8sNodes) + + coreops.SetInstance(coreops.New(fakek8sclient.NewSimpleClientset())) + component.DeregisterAllComponents() + component.RegisterDisruptionBudgetComponent() + + driver := portworx{} + err = driver.Init(k8sClient, runtime.NewScheme(), record.NewFakeRecorder(0)) + require.NoError(t, err) + err = driver.PreInstall(cluster) + require.NoError(t, err) + + pdbList := &policyv1.PodDisruptionBudgetList{} + err = testutil.List(k8sClient, pdbList) + require.NoError(t, err) + require.Len(t, pdbList.Items, 4) + + // Testcase: Removing kubernetes node2 from the cluster should delete node PDB + err = testutil.Delete(k8sClient, &fakeK8sNodes.Items[1]) + require.NoError(t, err) + + err = driver.PreInstall(cluster) + require.NoError(t, err) + + pdbList = &policyv1.PodDisruptionBudgetList{} + err = testutil.List(k8sClient, pdbList) + require.NoError(t, err) + require.Len(t, pdbList.Items, 3) + + err = testutil.Get(k8sClient, &policyv1.PodDisruptionBudget{}, "px-node2", cluster.Namespace) + require.True(t, errors.IsNotFound(err)) + + // Testcase: Making node 3 non quorum member should delete node PDB + expectedNodeEnumerateResp.Nodes[2].NonQuorumMember = true + + err = driver.PreInstall(cluster) + require.NoError(t, err) + + pdbList = &policyv1.PodDisruptionBudgetList{} + err = testutil.List(k8sClient, pdbList) + require.NoError(t, err) + require.Len(t, pdbList.Items, 2) + + err = testutil.Get(k8sClient, &policyv1.PodDisruptionBudget{}, "px-node3", cluster.Namespace) + require.True(t, errors.IsNotFound(err)) + +} + func TestSCC(t *testing.T) { coreops.SetInstance(coreops.New(fakek8sclient.NewSimpleClientset())) reregisterComponents() diff --git a/drivers/storage/portworx/util/util.go b/drivers/storage/portworx/util/util.go index aba40f0aa..bd9ee14cf 100644 --- a/drivers/storage/portworx/util/util.go +++ b/drivers/storage/portworx/util/util.go @@ -1558,6 +1558,40 @@ func NodesNeedingPDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNode return nodesNeedingPDB, nil } +// List of nodes that have an existing pdb but are no longer in k8s cluster or not a portworx storage node +func NodesToDeletePDB (k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) ([]string, error) { + // nodeCounts map is used to find the elements that are uncommon between list of k8s nodes in cluster + // and list of portworx storage nodes. 
Used to find nodes where PDB needs to be deleted + nodeCounts := make(map[string]int) + // Get the list of k8s nodes that are part of the current cluster and increase count of each node + k8sNodeList := &v1.NodeList{} + err := k8sClient.List(context.TODO(), k8sNodeList) + if err != nil { + return nil, err + } + for _, node := range k8sNodeList.Items { + nodeCounts[node.Name]++ + } + + // Get list of storage nodes that are part of quorum and increment count of each node + nodesToDeletePDB := make([]string, 0) + for _, node := range nodeEnumerateResponse.Nodes { + if node.Status == api.Status_STATUS_DECOMMISSION || node.NonQuorumMember { + continue + } + nodeCounts[node.SchedulerNodeName]++ + } + + // Nodes with count 1 might have a node PDB but should not, so delete the PDB for such nodes + for node, count := range nodeCounts { + if count == 1 { + nodesToDeletePDB = append(nodesToDeletePDB, node) + } + } + return nodesToDeletePDB, nil + +} + func ClusterSupportsParallelUpgrade(nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) bool { for _, node := range nodeEnumerateResponse.Nodes { From 0b6cd891018294e1f0dc3a49526e0fb9cab5ebd0 Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Thu, 2 May 2024 06:32:09 +0000 Subject: [PATCH 10/12] correcting go fmt errors --- drivers/storage/portworx/component/disruption_budget.go | 5 ++--- drivers/storage/portworx/util/util.go | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/drivers/storage/portworx/component/disruption_budget.go b/drivers/storage/portworx/component/disruption_budget.go index 8235726a7..d0816fe16 100644 --- a/drivers/storage/portworx/component/disruption_budget.go +++ b/drivers/storage/portworx/component/disruption_budget.go @@ -105,7 +105,7 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error { if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil { return err } - if err := c.deletePortworxNodePodDisruptionBudget(cluster,ownerRef, nodeEnumerateResponse); err != nil { + if err := c.deletePortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil { return err } } else { @@ -260,12 +260,11 @@ func (c *disruptionBudget) deletePortworxNodePodDisruptionBudget( } errors := []error{} - for _, node := range nodesToDeletePDB { PDBName := "px-" + node err = k8sutil.DeletePodDisruptionBudget( c.k8sClient, PDBName, - cluster.Namespace,*ownerRef, + cluster.Namespace, *ownerRef, ) if err != nil { logrus.Warnf("Failed to delete PDB for node %s: %v", node, err) diff --git a/drivers/storage/portworx/util/util.go b/drivers/storage/portworx/util/util.go index bd9ee14cf..77d5bc58b 100644 --- a/drivers/storage/portworx/util/util.go +++ b/drivers/storage/portworx/util/util.go @@ -1559,7 +1559,7 @@ func NodesNeedingPDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNode } // List of nodes that have an existing pdb but are no longer in k8s cluster or not a portworx storage node -func NodesToDeletePDB (k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) ([]string, error) { +func NodesToDeletePDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) ([]string, error) { // nodeCounts map is used to find the elements that are uncommon between list of k8s nodes in cluster // and list of portworx storage nodes. 
Used to find nodes where PDB needs to be deleted nodeCounts := make(map[string]int) @@ -1589,7 +1589,7 @@ func NodesToDeletePDB (k8sClient client.Client, nodeEnumerateResponse *api.SdkNo } } return nodesToDeletePDB, nil - + } func ClusterSupportsParallelUpgrade(nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) bool { From 7c12e56086cfb13e80d9179f9cd2441d72ff6a32 Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Mon, 13 May 2024 05:44:36 +0000 Subject: [PATCH 11/12] made requested changes --- .../portworx/component/disruption_budget.go | 16 ++++++++++---- drivers/storage/portworx/util/util.go | 21 +++++-------------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/drivers/storage/portworx/component/disruption_budget.go b/drivers/storage/portworx/component/disruption_budget.go index d0816fe16..d209ef695 100644 --- a/drivers/storage/portworx/component/disruption_budget.go +++ b/drivers/storage/portworx/component/disruption_budget.go @@ -102,10 +102,16 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error { } if pxutil.ClusterSupportsParallelUpgrade(nodeEnumerateResponse) { - if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil { + // Get the list of k8s nodes that are part of the current cluster + k8sNodeList := &v1.NodeList{} + err = c.k8sClient.List(context.TODO(), k8sNodeList) + if err != nil { + return err + } + if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil { return err } - if err := c.deletePortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil { + if err := c.deletePortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil { return err } } else { @@ -213,8 +219,9 @@ func (c *disruptionBudget) createPortworxNodePodDisruptionBudget( cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, + k8sNodeList *v1.NodeList, ) error { - nodesNeedingPDB, err := pxutil.NodesNeedingPDB(c.k8sClient, nodeEnumerateResponse) + nodesNeedingPDB, err := pxutil.NodesNeedingPDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList) if err != nil { return err } @@ -253,8 +260,9 @@ func (c *disruptionBudget) deletePortworxNodePodDisruptionBudget( cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, + k8sNodeList *v1.NodeList, ) error { - nodesToDeletePDB, err := pxutil.NodesToDeletePDB(c.k8sClient, nodeEnumerateResponse) + nodesToDeletePDB, err := pxutil.NodesToDeletePDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList) if err != nil { return err } diff --git a/drivers/storage/portworx/util/util.go b/drivers/storage/portworx/util/util.go index 77d5bc58b..40af534b2 100644 --- a/drivers/storage/portworx/util/util.go +++ b/drivers/storage/portworx/util/util.go @@ -1526,17 +1526,10 @@ func ShouldUseClusterDomain(node *api.StorageNode) (bool, error) { } // Get list of storagenodes that are a part of the current cluster that need a node PDB -func NodesNeedingPDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) ([]string, error) { - - // Get the list of k8s nodes that are part of the current cluster - k8sNodesStoragePodCouldRun := make(map[string]bool) - k8sNodeList := &v1.NodeList{} - err := k8sClient.List(context.TODO(), k8sNodeList) - if err != nil { - return nil, err - } +func 
NodesNeedingPDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, k8sNodeList *v1.NodeList) ([]string, error) { // Get list of kubernetes nodes that are a part of the current cluster + k8sNodesStoragePodCouldRun := make(map[string]bool) for _, node := range k8sNodeList.Items { k8sNodesStoragePodCouldRun[node.Name] = true } @@ -1559,16 +1552,12 @@ func NodesNeedingPDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNode } // List of nodes that have an existing pdb but are no longer in k8s cluster or not a portworx storage node -func NodesToDeletePDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) ([]string, error) { +func NodesToDeletePDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, k8sNodeList *v1.NodeList) ([]string, error) { // nodeCounts map is used to find the elements that are uncommon between list of k8s nodes in cluster // and list of portworx storage nodes. Used to find nodes where PDB needs to be deleted nodeCounts := make(map[string]int) - // Get the list of k8s nodes that are part of the current cluster and increase count of each node - k8sNodeList := &v1.NodeList{} - err := k8sClient.List(context.TODO(), k8sNodeList) - if err != nil { - return nil, err - } + + // Increase count of each node for _, node := range k8sNodeList.Items { nodeCounts[node.Name]++ } From 0d4a92cd5c6b5630f1db84cb1712860651f58da7 Mon Sep 17 00:00:00 2001 From: svijaykumar-px Date: Mon, 27 May 2024 05:48:21 +0000 Subject: [PATCH 12/12] correcting after merge conflict --- drivers/storage/portworx/component/disruption_budget.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/storage/portworx/component/disruption_budget.go b/drivers/storage/portworx/component/disruption_budget.go index d209ef695..c861c353a 100644 --- a/drivers/storage/portworx/component/disruption_budget.go +++ b/drivers/storage/portworx/component/disruption_budget.go @@ -89,7 +89,7 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error { // Get list of portworx storage nodes nodeClient := api.NewOpenStorageNodeClient(c.sdkConn) - ctx, err := pxutil.SetupContextWithToken(context.Background(), cluster, c.k8sClient) + ctx, err := pxutil.SetupContextWithToken(context.Background(), cluster, c.k8sClient, false) if err != nil { return err }
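
For reference, the snippet below is a minimal, self-contained sketch (not part of the patches above) of the per-node PodDisruptionBudget object that this series ends up creating for each quorum storage node. The node, cluster, and namespace names are placeholders, and the two label keys are assumptions standing in for constants.LabelKeyClusterName and constants.OperatorLabelNodeNameKey (assuming OperatorPrefix resolves to "operator.libopenstorage.org"); the operator itself builds the equivalent object inside createPortworxNodePodDisruptionBudget and applies it through k8sutil.CreateOrUpdatePodDisruptionBudget.

package main

import (
	"encoding/json"
	"fmt"

	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// Placeholder values; the operator derives these from the StorageCluster and the storage node.
	nodeName := "k8s-node-1"
	clusterName := "px-cluster"
	namespace := "kube-test"

	// Assumed label keys; the real keys come from pkg/constants in the operator.
	clusterNameLabelKey := "operator.libopenstorage.org/name"
	nodeNameLabelKey := "operator.libopenstorage.org/node-name"

	// minAvailable of 1 over a selector that matches only the Portworx pod on one node
	// blocks voluntary eviction of that pod; giving every storage node its own budget
	// lets disruptions be managed node by node instead of through a single cluster-wide count.
	minAvailable := intstr.FromInt(1)
	pdb := &policyv1.PodDisruptionBudget{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "policy/v1",
			Kind:       "PodDisruptionBudget",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "px-" + nodeName, // one PDB per storage node, named after the node
			Namespace: namespace,
		},
		Spec: policyv1.PodDisruptionBudgetSpec{
			MinAvailable: &minAvailable,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					clusterNameLabelKey: clusterName,
					nodeNameLabelKey:    nodeName,
				},
			},
		},
	}

	// Print the object so it can be inspected or applied with kubectl.
	out, err := json.MarshalIndent(pdb, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}

The series gates this per-node path on every quorum node reporting a Portworx version of at least 3.1.2 (ParallelUpgradePDBVersion); older clusters keep the single cluster-wide storage budget created by createPortworxPodDisruptionBudget.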