Skip to content

Commit

Permalink
[Cherry-pick into feature]: Cherry-pick of all tickets related to Sma…
Browse files Browse the repository at this point in the history
…rt and parallel upgrades merged into master (#1578)

* adding k8snode name label to px pods

* correcting go fmt error

* making requested changes

* Creating node PDB

* go fmt error

* Checking node version instead of cluster

* making requested changes

Conflicts:
	drivers/storage/portworx/component/disruption_budget.go

* aggregating errors

* adding logic to delete node PDB

* correcting go fmt errors

* made requested changes

* correcting after merge conflict
  • Loading branch information
svijaykumar-px authored Jun 17, 2024
1 parent 06436af commit ed8810d
Show file tree
Hide file tree
Showing 6 changed files with 439 additions and 30 deletions.
117 changes: 110 additions & 7 deletions drivers/storage/portworx/component/disruption_budget.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package component

import (
"context"
"fmt"
"math"
"strconv"

"github.com/hashicorp/go-version"
"github.com/libopenstorage/openstorage/api"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -77,9 +80,46 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error {
if err := c.createKVDBPodDisruptionBudget(cluster, ownerRef); err != nil {
return err
}
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef); err != nil {
// Create node PDB only if parallel upgrade is supported
var err error
c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
if err != nil {
return err
}

// Get list of portworx storage nodes
nodeClient := api.NewOpenStorageNodeClient(c.sdkConn)
ctx, err := pxutil.SetupContextWithToken(context.Background(), cluster, c.k8sClient, false)
if err != nil {
return err
}
nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
ctx,
&api.SdkNodeEnumerateWithFiltersRequest{},
)
if err != nil {
return fmt.Errorf("failed to enumerate nodes: %v", err)
}

if pxutil.ClusterSupportsParallelUpgrade(nodeEnumerateResponse) {
// Get the list of k8s nodes that are part of the current cluster
k8sNodeList := &v1.NodeList{}
err = c.k8sClient.List(context.TODO(), k8sNodeList)
if err != nil {
return err
}
if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
return err
}
if err := c.deletePortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
return err
}
} else {
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil {
return err
}
}

return nil
}

Expand All @@ -106,6 +146,7 @@ func (c *disruptionBudget) MarkDeleted() {}
func (c *disruptionBudget) createPortworxPodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
) error {
userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster)
if err != nil {
Expand All @@ -114,12 +155,8 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
}

var minAvailable int
c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
if err != nil {
return err
}

storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient)
storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient, nodeEnumerateResponse)
if err != nil {
c.closeSdkConn()
return err
Expand Down Expand Up @@ -178,6 +215,73 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
return err
}

func (c *disruptionBudget) createPortworxNodePodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
k8sNodeList *v1.NodeList,
) error {
nodesNeedingPDB, err := pxutil.NodesNeedingPDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
if err != nil {
return err
}
errors := []error{}
for _, node := range nodesNeedingPDB {
minAvailable := intstr.FromInt(1)
PDBName := "px-" + node
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: PDBName,
Namespace: cluster.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
constants.LabelKeyClusterName: cluster.Name,
constants.OperatorLabelNodeNameKey: node,
},
},
},
}
err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef)
if err != nil {
logrus.Warnf("Failed to create PDB for node %s: %v", node, err)
errors = append(errors, err)
}
}
return utilerrors.NewAggregate(errors)

}

// Delete node pod disruption budget when kubertetes is not part of cluster or portworx does not run on it
func (c *disruptionBudget) deletePortworxNodePodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
k8sNodeList *v1.NodeList,
) error {
nodesToDeletePDB, err := pxutil.NodesToDeletePDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
if err != nil {
return err
}
errors := []error{}

for _, node := range nodesToDeletePDB {
PDBName := "px-" + node
err = k8sutil.DeletePodDisruptionBudget(
c.k8sClient, PDBName,
cluster.Namespace, *ownerRef,
)
if err != nil {
logrus.Warnf("Failed to delete PDB for node %s: %v", node, err)
errors = append(errors, err)
}
}
return utilerrors.NewAggregate(errors)
}

func (c *disruptionBudget) createKVDBPodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
Expand All @@ -186,7 +290,6 @@ func (c *disruptionBudget) createKVDBPodDisruptionBudget(
if cluster.Spec.Kvdb != nil && !cluster.Spec.Kvdb.Internal {
return nil
}

clusterSize := kvdbClusterSize(cluster)
minAvailable := intstr.FromInt(clusterSize - 1)
pdb := &policyv1.PodDisruptionBudget{
Expand Down
Loading

0 comments on commit ed8810d

Please sign in to comment.