[Cherry-pick into feature]: Cherry-pick of all tickets related to Smart and parallel upgrades merged into master #1578

Merged
12 commits merged on Jun 17, 2024
drivers/storage/portworx/component/disruption_budget.go (117 changes: 110 additions & 7 deletions)
@@ -1,16 +1,19 @@
 package component
 
 import (
+	"context"
 	"fmt"
 	"math"
 	"strconv"
 
 	"github.com/hashicorp/go-version"
+	"github.com/libopenstorage/openstorage/api"
 	"github.com/sirupsen/logrus"
 	"google.golang.org/grpc"
 	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/tools/record"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -77,9 +80,46 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error {
 	if err := c.createKVDBPodDisruptionBudget(cluster, ownerRef); err != nil {
 		return err
 	}
-	if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef); err != nil {
+	// Create node PDB only if parallel upgrade is supported
+	var err error
+	c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
+	if err != nil {
 		return err
 	}
+
+	// Get list of portworx storage nodes
+	nodeClient := api.NewOpenStorageNodeClient(c.sdkConn)
+	ctx, err := pxutil.SetupContextWithToken(context.Background(), cluster, c.k8sClient, false)
+	if err != nil {
+		return err
+	}
+	nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
+		ctx,
+		&api.SdkNodeEnumerateWithFiltersRequest{},
+	)
+	if err != nil {
+		return fmt.Errorf("failed to enumerate nodes: %v", err)
+	}
+
+	if pxutil.ClusterSupportsParallelUpgrade(nodeEnumerateResponse) {
+		// Get the list of k8s nodes that are part of the current cluster
+		k8sNodeList := &v1.NodeList{}
+		err = c.k8sClient.List(context.TODO(), k8sNodeList)
+		if err != nil {
+			return err
+		}
+		if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
+			return err
+		}
+		if err := c.deletePortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
+			return err
+		}
+	} else {
+		if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil {
+			return err
+		}
+	}
 
 	return nil
 }

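A note on the gate in the hunk above: `pxutil.ClusterSupportsParallelUpgrade` is defined outside this file, so the sketch below only illustrates the kind of check the reconcile loop relies on, namely that every enumerated storage node reports a PX version at or above some parallel-upgrade minimum. The `storageNode` type, its `PxVersion` field, and the "3.1.0" threshold are assumptions for the sketch, not the real pxutil implementation.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-version"
)

// storageNode is a stand-in for the SDK node object; the real check
// operates on *api.SdkNodeEnumerateWithFiltersResponse.
type storageNode struct {
	ID        string
	PxVersion string // assumed field name, illustrative only
}

// clusterSupportsParallelUpgrade sketches the gate: parallel upgrades are
// only enabled once every node in the cluster runs a recent-enough PX.
func clusterSupportsParallelUpgrade(nodes []storageNode, minVer string) (bool, error) {
	min, err := version.NewVersion(minVer)
	if err != nil {
		return false, err
	}
	for _, n := range nodes {
		v, err := version.NewVersion(n.PxVersion)
		if err != nil {
			return false, fmt.Errorf("node %s reports unparseable version %q: %v", n.ID, n.PxVersion, err)
		}
		if v.LessThan(min) {
			return false, nil // one old node disables parallel upgrades cluster-wide
		}
	}
	return true, nil
}

func main() {
	nodes := []storageNode{{ID: "n1", PxVersion: "3.1.2"}, {ID: "n2", PxVersion: "3.0.0"}}
	ok, _ := clusterSupportsParallelUpgrade(nodes, "3.1.0") // made-up threshold
	fmt.Println(ok)                                         // false: n2 is below the assumed minimum
}
```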
@@ -106,6 +146,7 @@ func (c *disruptionBudget) MarkDeleted() {}
 func (c *disruptionBudget) createPortworxPodDisruptionBudget(
 	cluster *corev1.StorageCluster,
 	ownerRef *metav1.OwnerReference,
+	nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
 ) error {
 	userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster)
 	if err != nil {
@@ -114,12 +155,8 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
 	}
 
 	var minAvailable int
-	c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
-	if err != nil {
-		return err
-	}
 
-	storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient)
+	storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient, nodeEnumerateResponse)
 	if err != nil {
 		c.closeSdkConn()
 		return err
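When `userProvidedMinValue` is unset, the operator derives `minAvailable` from `storageNodesCount`; the exact formula is outside this hunk. Below is a minimal sketch of the conventional quorum arithmetic such a default would use — an assumption for illustration, not code copied from the operator.

```go
package main

import "fmt"

// defaultMinAvailable sketches an assumed fallback PDB value: keep at least
// a quorum of storage nodes running. The real operator code also honors a
// user-provided override (userProvidedMinValue above).
func defaultMinAvailable(storageNodes int) int {
	return storageNodes/2 + 1 // quorum: floor(n/2) + 1
}

func main() {
	for _, n := range []int{3, 5, 6} {
		fmt.Printf("%d storage nodes -> minAvailable %d\n", n, defaultMinAvailable(n))
	}
	// 3 -> 2, 5 -> 3, 6 -> 4
}
```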
@@ -178,6 +215,73 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
 	return err
 }
 
+func (c *disruptionBudget) createPortworxNodePodDisruptionBudget(
+	cluster *corev1.StorageCluster,
+	ownerRef *metav1.OwnerReference,
+	nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
+	k8sNodeList *v1.NodeList,
+) error {
+	nodesNeedingPDB, err := pxutil.NodesNeedingPDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
+	if err != nil {
+		return err
+	}
+	errors := []error{}
+	for _, node := range nodesNeedingPDB {
+		minAvailable := intstr.FromInt(1)
+		PDBName := "px-" + node
+		pdb := &policyv1.PodDisruptionBudget{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            PDBName,
+				Namespace:       cluster.Namespace,
+				OwnerReferences: []metav1.OwnerReference{*ownerRef},
+			},
+			Spec: policyv1.PodDisruptionBudgetSpec{
+				MinAvailable: &minAvailable,
+				Selector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{
+						constants.LabelKeyClusterName:      cluster.Name,
+						constants.OperatorLabelNodeNameKey: node,
+					},
+				},
+			},
+		}
+		err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef)
+		if err != nil {
+			logrus.Warnf("Failed to create PDB for node %s: %v", node, err)
+			errors = append(errors, err)
+		}
+	}
+	return utilerrors.NewAggregate(errors)
+}
+
+// Delete the node pod disruption budget when the Kubernetes node is no
+// longer part of the cluster or Portworx does not run on it
+func (c *disruptionBudget) deletePortworxNodePodDisruptionBudget(
+	cluster *corev1.StorageCluster,
+	ownerRef *metav1.OwnerReference,
+	nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
+	k8sNodeList *v1.NodeList,
+) error {
+	nodesToDeletePDB, err := pxutil.NodesToDeletePDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
+	if err != nil {
+		return err
+	}
+	errors := []error{}
+
+	for _, node := range nodesToDeletePDB {
+		PDBName := "px-" + node
+		err = k8sutil.DeletePodDisruptionBudget(
+			c.k8sClient, PDBName,
+			cluster.Namespace, *ownerRef,
+		)
+		if err != nil {
+			logrus.Warnf("Failed to delete PDB for node %s: %v", node, err)
+			errors = append(errors, err)
+		}
+	}
+	return utilerrors.NewAggregate(errors)
+}
+
 func (c *disruptionBudget) createKVDBPodDisruptionBudget(
 	cluster *corev1.StorageCluster,
 	ownerRef *metav1.OwnerReference,
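The two new functions above implement a simple reconciliation contract: a `px-<node>` PDB should exist exactly for nodes that both run Portworx storage and are present in Kubernetes, and be deleted otherwise. `pxutil.NodesNeedingPDB` and `pxutil.NodesToDeletePDB` are not shown in this diff, so the set-difference sketch below is an assumed illustration of that contract, not their real implementation.

```go
package main

import "fmt"

// partitionNodes sketches the assumed contract of pxutil.NodesNeedingPDB and
// pxutil.NodesToDeletePDB: PDBs should exist exactly for the intersection of
// Portworx storage nodes and current Kubernetes nodes.
func partitionNodes(pxStorageNodes, k8sNodes, nodesWithPDB []string) (need, remove []string) {
	k8s := make(map[string]bool, len(k8sNodes))
	for _, n := range k8sNodes {
		k8s[n] = true
	}
	desired := make(map[string]bool)
	for _, n := range pxStorageNodes {
		if k8s[n] {
			desired[n] = true
			need = append(need, n) // node should have a px-<node> PDB
		}
	}
	for _, n := range nodesWithPDB {
		if !desired[n] {
			remove = append(remove, n) // node left k8s or no longer runs Portworx
		}
	}
	return need, remove
}

func main() {
	need, remove := partitionNodes(
		[]string{"node-a", "node-b"},           // Portworx storage nodes
		[]string{"node-a", "node-b", "node-c"}, // Kubernetes nodes
		[]string{"node-b", "node-d"},           // nodes with an existing PDB
	)
	fmt.Println(need, remove) // [node-a node-b] [node-d]
}
```

Re-creating a PDB that already exists (node-b above) is harmless because the create path goes through `k8sutil.CreateOrUpdatePodDisruptionBudget`, which is idempotent.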
@@ -186,7 +290,6 @@ func (c *disruptionBudget) createKVDBPodDisruptionBudget(
 	if cluster.Spec.Kvdb != nil && !cluster.Spec.Kvdb.Internal {
 		return nil
 	}
-
 	clusterSize := kvdbClusterSize(cluster)
 	minAvailable := intstr.FromInt(clusterSize - 1)
 	pdb := &policyv1.PodDisruptionBudget{