Call rebalance API once all pods are updated #625

Merged
155 changes: 124 additions & 31 deletions controllers/solr_cluster_ops_util.go
@@ -49,11 +49,18 @@ type SolrClusterOp struct {
type SolrClusterOperationType string

const (
ScaleDownLock SolrClusterOperationType = "ScalingDown"
ScaleUpLock SolrClusterOperationType = "ScalingUp"
UpdateLock SolrClusterOperationType = "RollingUpdate"
ScaleDownLock SolrClusterOperationType = "ScalingDown"
ScaleUpLock SolrClusterOperationType = "ScalingUp"
UpdateLock SolrClusterOperationType = "RollingUpdate"
BalanceReplicasLock SolrClusterOperationType = "BalanceReplicas"
)

// RollingUpdateMetadata contains metadata for rolling update cluster operations.
type RollingUpdateMetadata struct {
// Whether or not replicas will be migrated during this rolling upgrade
RequiresReplicaMigration bool `json:"requiresReplicaMigration"`
}

func clearClusterOpLock(statefulSet *appsv1.StatefulSet) {
delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
}
@@ -178,10 +185,10 @@ func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudRec
} else if scaleDownOpIsQueued {
// If the statefulSet and the solrCloud have the same number of pods configured, and the queued operation is a scaleDown,
// that means the scaleDown was reverted. So there's no reason to change the number of pods.
// However, a Replica Balancing should be done just in case, so do a ScaleUp, but don't change the number of pods.
// However, a Replica Balancing should be done just in case, so start it via a new ClusterOperation.
clusterOp = &SolrClusterOp{
Operation: ScaleUpLock,
Metadata: strconv.Itoa(desiredPods),
Operation: BalanceReplicasLock,
Metadata: "UndoFailedScaleDown",
}
}
return
@@ -244,9 +251,32 @@ func handleManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, in
return
}

// cleanupManagedCloudScaleDown does the logic of cleaning-up an incomplete scale down operation.
// This will remove any bad readinessConditions that the scaleDown might have set when trying to scaleDown pods.
func cleanupManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, podList []corev1.Pod, logger logr.Logger) (err error) {
// First though, the scaleDown op might have set some pods to be "unready" before deletion. Undo that.
// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
readinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
util.SolrIsNotStoppedReadinessCondition: {
reason: PodStarted,
message: "Pod is not being deleted, traffic to the pod must be restarted",
status: true,
},
}
for _, pod := range podList {
if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
err = e
return
} else {
pod = *updatedPod
}
}
return
}

// handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
// This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, nextClusterOperation *SolrClusterOp, err error) {
desiredPods, err := strconv.Atoi(clusterOp.Metadata)
if err != nil {
logger.Error(err, "Could not convert ScaleUp metadata to int, as it represents the number of nodes to scale to", "metadata", clusterOp.Metadata)
@@ -262,53 +292,69 @@ func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, inst
if err != nil {
logger.Error(err, "Error while patching StatefulSet to increase the number of pods for the ScaleUp")
}
// Return and wait for the pods to be created, which will call another reconcile
return false, false, 0, err
} else {
// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
readinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
util.SolrIsNotStoppedReadinessCondition: {
reason: PodStarted,
message: "Pod is not being deleted, traffic to the pod must be started",
status: true,
},
} else if len(podList) >= configuredPods {
nextClusterOperation = &SolrClusterOp{
Operation: BalanceReplicasLock,
Metadata: "ScaleUp",
}
for _, pod := range podList {
if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
err = e
return
} else {
pod = *updatedPod
operationComplete = true
}
return
}

// hasAnyEphemeralData returns true if any of the given pods uses ephemeral Data for Solr storage, and false if all pods use persistent storage.
func hasAnyEphemeralData(solrPods []corev1.Pod) bool {
for _, pod := range solrPods {
for _, cond := range pod.Status.Conditions {
if cond.Type == util.SolrReplicasNotEvictedReadinessCondition {
return true
}
}
if operationComplete, requestInProgress, err = util.BalanceReplicasForCluster(ctx, instance, statefulSet, "scaleUp", clusterOp.Metadata, logger); !operationComplete && err == nil {
// Retry after five seconds to check if the replica management commands have been completed
retryLaterDuration = time.Second * 5
}
}
return
return false
}

func determineRollingUpdateClusterOpLockIfNecessary(instance *solrv1beta1.SolrCloud, outOfDatePods util.OutOfDatePodSegmentation) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
if instance.Spec.UpdateStrategy.Method == solrv1beta1.ManagedUpdate && !outOfDatePods.IsEmpty() {
includesDataMigration := hasAnyEphemeralData(outOfDatePods.Running) || hasAnyEphemeralData(outOfDatePods.ScheduledForDeletion)
metadata := RollingUpdateMetadata{
RequiresReplicaMigration: includesDataMigration,
}
metaBytes, err := json.Marshal(metadata)
if err != nil {
return nil, 0, err
}
clusterOp = &SolrClusterOp{
Operation: UpdateLock,
Metadata: string(metaBytes),
}
}
return
}

// handleManagedCloudRollingUpdate does the logic of a managed and "locked" cloud rolling update operation.
// This will take many reconcile loops to complete, as it is deleting pods/moving replicas.
func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, outOfDatePods util.OutOfDatePodSegmentation, hasReadyPod bool, availableUpdatedPodCount int, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, outOfDatePods util.OutOfDatePodSegmentation, hasReadyPod bool, availableUpdatedPodCount int, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, nextClusterOp *SolrClusterOp, err error) {
// Manage the updating of out-of-spec pods, if the Managed UpdateStrategy has been specified.
updateLogger := logger.WithName("ManagedUpdateSelector")

// First check if all pods are up to date and ready. If so the rolling update is complete
configuredPods := int(*statefulSet.Spec.Replicas)
if configuredPods == availableUpdatedPodCount {
// The configured number of pods are all healthy and up to date. The operation is complete
updateMetadata := &RollingUpdateMetadata{}
if clusterOp.Metadata != "" {
if err = json.Unmarshal([]byte(clusterOp.Metadata), &updateMetadata); err != nil {
updateLogger.Error(err, "Could not unmarshal metadata for rolling update operation")
}
}
operationComplete = true
// Only do a re-balancing for rolling restarts that migrated replicas
if updateMetadata.RequiresReplicaMigration {
nextClusterOp = &SolrClusterOp{
Operation: BalanceReplicasLock,
Metadata: "RollingUpdateComplete",
}
}
return
} else if outOfDatePods.IsEmpty() {
// Just return and wait for the updated pods to come up healthy, these will call new reconciles, so there is nothing for us to do
@@ -327,7 +373,7 @@ func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler
// a restart to get a working pod config.
state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, hasReadyPod, logger)
if apiError != nil {
return false, true, 0, apiError
return false, true, 0, nil, apiError
} else if !retryLater {
// If the cluster status has been successfully fetched, then add the pods scheduled for deletion
// This requires the clusterState to be fetched successfully to ensure that we know if there
@@ -364,6 +410,38 @@ func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler
return
}

// cleanupManagedCloudRollingUpdate does the logic of cleaning-up an incomplete rolling update operation.
// This will remove any bad readinessConditions that the rolling update might have set when trying to restart or evict pods.
func cleanupManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, podList []corev1.Pod, logger logr.Logger) (err error) {
// First though, the rolling update op might have set some pods to be "unready" before deletion. Undo that.
// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
er := EvictingReplicas
readinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
util.SolrIsNotStoppedReadinessCondition: {
reason: PodStarted,
message: "Pod is not being deleted, traffic to the pod must be restarted",
status: true,
},
util.SolrReplicasNotEvictedReadinessCondition: {
// Only set this condition if the condition hasn't been changed since pod start
// We do not want to over-write future states later down the eviction pipeline
matchPreviousReason: &er,
reason: PodStarted,
message: "Pod is not being deleted, ephemeral data is no longer being evicted",
status: true,
},
}
for _, pod := range podList {
if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
err = e
return
} else {
pod = *updatedPod
}
}
return
}

// clearClusterOpLockWithPatch simply removes any clusterOp for the given statefulSet.
func clearClusterOpLockWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
originalStatefulSet := statefulSet.DeepCopy()
@@ -376,6 +454,21 @@ func clearClusterOpLockWithPatch(ctx context.Context, r *SolrCloudReconciler, st
return
}

// setNextClusterOpLockWithPatch removes the current clusterOp for the given statefulSet and sets the next clusterOp in its place.
func setNextClusterOpLockWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, nextClusterOp *SolrClusterOp, reason string, logger logr.Logger) (err error) {
originalStatefulSet := statefulSet.DeepCopy()
clearClusterOpLock(statefulSet)
if err = setClusterOpLock(statefulSet, *nextClusterOp); err != nil {
logger.Error(err, "Error while patching StatefulSet to set next clusterOpLock annotation after finishing previous clusterOp", "reason", reason)
}
if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
logger.Error(err, "Error while patching StatefulSet to set next clusterOpLock annotation after finishing previous clusterOp", "reason", reason)
} else {
logger.Info("Set next clusterOpLock annotation on statefulSet after finishing previous clusterOp", "reason", reason)
}
return
}

// enqueueCurrentClusterOpForRetryWithPatch adds the current clusterOp to the clusterOpRetryQueue, and clears the current cluster Op.
// This method will send the StatefulSet patch to the API Server.
func enqueueCurrentClusterOpForRetryWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
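Taken together, the changes in this file mean a rolling update now records whether replica migration is involved, and hands off to a BalanceReplicas operation when it is. The sketch below is not part of the PR; it mirrors that flow using only the types shown above, and the JSON field tags on SolrClusterOp are assumptions for illustration.

package main

import (
	"encoding/json"
	"fmt"
)

// Minimal stand-ins for the operator types visible in this diff.
type SolrClusterOperationType string

const (
	UpdateLock          SolrClusterOperationType = "RollingUpdate"
	BalanceReplicasLock SolrClusterOperationType = "BalanceReplicas"
)

type SolrClusterOp struct {
	Operation SolrClusterOperationType `json:"operation"` // field tags assumed for this sketch
	Metadata  string                   `json:"metadata"`
}

type RollingUpdateMetadata struct {
	RequiresReplicaMigration bool `json:"requiresReplicaMigration"`
}

func main() {
	// When the rolling-update lock is taken, record whether replicas must be
	// migrated (true when any out-of-date pod uses ephemeral data).
	metaBytes, _ := json.Marshal(RollingUpdateMetadata{RequiresReplicaMigration: true})
	updateOp := SolrClusterOp{Operation: UpdateLock, Metadata: string(metaBytes)}

	// Once every pod is updated and available, the stored metadata decides
	// whether a follow-on BalanceReplicas operation replaces the finished lock.
	parsed := RollingUpdateMetadata{}
	_ = json.Unmarshal([]byte(updateOp.Metadata), &parsed)

	var nextOp *SolrClusterOp
	if parsed.RequiresReplicaMigration {
		nextOp = &SolrClusterOp{Operation: BalanceReplicasLock, Metadata: "RollingUpdateComplete"}
	}
	fmt.Printf("current: %+v\nnext: %+v\n", updateOp, nextOp)
}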
2 changes: 1 addition & 1 deletion controllers/solr_pod_lifecycle_util.go
@@ -101,7 +101,7 @@ func DeletePodForUpdate(ctx context.Context, r *SolrCloudReconciler, instance *s

// Delete the pod
if deletePod {
logger.Error(err, "Deleting solr pod for update", "pod", pod.Name)
logger.Info("Deleting solr pod for update", "pod", pod.Name)
err = r.Delete(ctx, pod, client.Preconditions{
UID: &pod.UID,
})
28 changes: 23 additions & 5 deletions controllers/solrcloud_controller.go
@@ -460,17 +460,20 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
var retryLaterDuration time.Duration
if clusterOp, opErr := GetCurrentClusterOp(statefulSet); clusterOp != nil && opErr == nil {
var operationComplete, requestInProgress bool
var nextClusterOperation *SolrClusterOp
operationFound := true
shortTimeoutForRequeue := true
switch clusterOp.Operation {
case UpdateLock:
operationComplete, requestInProgress, retryLaterDuration, err = handleManagedCloudRollingUpdate(ctx, r, instance, statefulSet, outOfDatePods, hasReadyPod, availableUpdatedPodCount, logger)
operationComplete, requestInProgress, retryLaterDuration, nextClusterOperation, err = handleManagedCloudRollingUpdate(ctx, r, instance, statefulSet, clusterOp, outOfDatePods, hasReadyPod, availableUpdatedPodCount, logger)
// Rolling Updates should not be requeued quickly. The operation is expected to take a long time and thus should have a longTimeout if errors are not seen.
shortTimeoutForRequeue = false
case ScaleDownLock:
operationComplete, requestInProgress, retryLaterDuration, err = handleManagedCloudScaleDown(ctx, r, instance, statefulSet, clusterOp, podList, logger)
case ScaleUpLock:
operationComplete, requestInProgress, retryLaterDuration, err = handleManagedCloudScaleUp(ctx, r, instance, statefulSet, clusterOp, podList, logger)
operationComplete, nextClusterOperation, err = handleManagedCloudScaleUp(ctx, r, instance, statefulSet, clusterOp, podList, logger)
case BalanceReplicasLock:
operationComplete, requestInProgress, retryLaterDuration, err = util.BalanceReplicasForCluster(ctx, instance, statefulSet, clusterOp.Metadata, clusterOp.Metadata, logger)
default:
operationFound = false
// This shouldn't happen, but we don't want to be stuck if it does.
@@ -479,8 +482,13 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
if operationFound {
if operationComplete {
// Once the operation is complete, finish the cluster operation by deleting the statefulSet annotations
err = clearClusterOpLockWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" complete", logger)
if nextClusterOperation == nil {
// Once the operation is complete, finish the cluster operation by deleting the statefulSet annotations
err = clearClusterOpLockWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" complete", logger)
} else {
// Once the operation is complete, finish the cluster operation and start the next one by setting the statefulSet annotations
err = setNextClusterOpLockWithPatch(ctx, r, statefulSet, nextClusterOperation, string(clusterOp.Operation)+" complete", logger)
}

// TODO: Create event for the CRD.
} else if !requestInProgress {
@@ -490,6 +498,7 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// - the operation has a long timeout and has taken more than 10 minutes
// then continue the operation later.
// (it will likely immediately continue, since it is unlikely there is another operation to run)

clusterOpRuntime := time.Since(clusterOp.LastStartTime.Time)
queueForLaterReason := ""
if err != nil && clusterOpRuntime > time.Minute {
@@ -500,7 +509,16 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
queueForLaterReason = "timed out during operation (10 minutes)"
}
if queueForLaterReason != "" {
err = enqueueCurrentClusterOpForRetryWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" "+queueForLaterReason, logger)
// If the operation is being queued, first have the operation cleanup after itself
switch clusterOp.Operation {
case UpdateLock:
err = cleanupManagedCloudRollingUpdate(ctx, r, outOfDatePods.ScheduledForDeletion, logger)
case ScaleDownLock:
err = cleanupManagedCloudScaleDown(ctx, r, podList, logger)
}
if err == nil {
err = enqueueCurrentClusterOpForRetryWithPatch(ctx, r, statefulSet, string(clusterOp.Operation)+" "+queueForLaterReason, logger)
}

// TODO: Create event for the CRD.
}
6 changes: 5 additions & 1 deletion controllers/util/solr_scale_util.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/solr-operator/controllers/util/solr_api"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"time"
)

// BalanceReplicasForCluster takes a SolrCloud and balances all replicas across the Pods that are currently alive.
@@ -31,7 +32,7 @@ import (
// a successful status returned from the command. So if we delete the asyncStatus, and then something happens in the operator,
// and we lose our state, then we will need to retry the balanceReplicas command. This should be ok since calling
// balanceReplicas multiple times should not be bad when the replicas for the cluster are already balanced.
func BalanceReplicasForCluster(ctx context.Context, solrCloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, balanceReason string, balanceCmdUniqueId string, logger logr.Logger) (balanceComplete bool, requestInProgress bool, err error) {
func BalanceReplicasForCluster(ctx context.Context, solrCloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, balanceReason string, balanceCmdUniqueId string, logger logr.Logger) (balanceComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
logger = logger.WithValues("balanceReason", balanceReason)
// If the Cloud has 1 or zero pods, there is no reason to balance replicas.
if statefulSet.Spec.Replicas == nil || *statefulSet.Spec.Replicas < 1 {
@@ -96,5 +97,8 @@ func BalanceReplicasForCluster(ctx context.Context, solrCloud *solr.SolrCloud, s
}
}
}
if requestInProgress && !balanceComplete {
retryLaterDuration = time.Second * 5
}
return
}
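Because BalanceReplicasForCluster now returns a retryLaterDuration, callers can requeue with the suggested delay while the async balance command is still running. A rough caller-side sketch follows; it is not taken from the operator's code, and the import paths, which follow the repository layout shown in this diff, are assumptions here.

package controllers

import (
	"context"

	solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
	"github.com/apache/solr-operator/controllers/util"
	"github.com/go-logr/logr"
	appsv1 "k8s.io/api/apps/v1"
	ctrl "sigs.k8s.io/controller-runtime"
)

// requeueForBalance is a hypothetical helper showing the polling pattern:
// keep requeueing with the suggested delay until the async balance command finishes.
func requeueForBalance(ctx context.Context, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet,
	uniqueId string, logger logr.Logger) (ctrl.Result, error) {
	balanceComplete, requestInProgress, retryLater, err := util.BalanceReplicasForCluster(
		ctx, instance, statefulSet, uniqueId, uniqueId, logger)
	if err != nil {
		return ctrl.Result{}, err
	}
	if !balanceComplete && requestInProgress {
		// The PR sets retryLaterDuration to five seconds while the async command is still running.
		return ctrl.Result{RequeueAfter: retryLater}, nil
	}
	return ctrl.Result{}, nil
}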
2 changes: 2 additions & 0 deletions docs/solr-cloud/cluster-operations.md
@@ -30,6 +30,8 @@ That is why these operations must first obtain a lock on the SolrCloud before ex
- [Managed Rolling Updates](managed-updates.md)
- [Scaling Down with Replica Migrations](scaling.md#solr-pod-scale-down)
- [Scaling Up with Replica Migrations](scaling.md#solr-pod-scale-up)
- Balancing Replicas Across Pods
- This is started after a Rolling Update with Ephemeral Data or after a ScaleUp operation.

### How is the Lock Implemented?

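The lock referenced in this document is held as a JSON-serialized cluster operation stored in a StatefulSet annotation (util.ClusterOpsLockAnnotation in the operator code). The sketch below is only a rough illustration of that idea; the annotation key and field names used here are assumptions, not the operator's actual serialization.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Rough stand-in for the operator's cluster-op lock payload; field names are assumptions.
type SolrClusterOp struct {
	Operation     string `json:"operation"`
	Metadata      string `json:"metadata"`
	LastStartTime string `json:"lastStartTime"`
}

// Placeholder key: the real value lives in the operator's util package as ClusterOpsLockAnnotation.
const clusterOpsLockAnnotation = "solr.apache.org/clusterOpsLock"

func main() {
	op := SolrClusterOp{
		Operation:     "BalanceReplicas",
		Metadata:      "ScaleUp",
		LastStartTime: time.Now().UTC().Format(time.RFC3339),
	}
	lockValue, _ := json.Marshal(op)

	// The lock is an annotation on the SolrCloud's StatefulSet; it is cleared
	// (or replaced by the next operation) once the current operation completes.
	annotations := map[string]string{clusterOpsLockAnnotation: string(lockValue)}
	fmt.Println(annotations)
}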
7 changes: 7 additions & 0 deletions helm/solr-operator/Chart.yaml
@@ -128,6 +128,13 @@ annotations:
url: https://github.com/apache/solr-operator/issues/603
- name: Github PR
url: https://github.com/apache/solr-operator/pull/608
- kind: added
description: SolrClouds using ephemeral data will now have their replicas rebalanced after a rolling update.
links:
- name: Github Issue
url: https://github.com/apache/solr-operator/issues/615
- name: Github PR
url: https://github.com/apache/solr-operator/pull/625
artifacthub.io/images: |
- name: solr-operator
image: apache/solr-operator:v0.8.0-prerelease