diff --git a/cmd/e2e/upgrade_test.go b/cmd/e2e/upgrade_test.go index da3d4b8..99663bc 100644 --- a/cmd/e2e/upgrade_test.go +++ b/cmd/e2e/upgrade_test.go @@ -13,7 +13,10 @@ func TestEDSUpgradingEDS(t *testing.T) { eds := verifyEDS(t, edsName, edsSpec, edsSpec.Replicas) eds.Spec.Template.Labels["new-label"] = "hello" - err := updateEDS(edsName, eds) + var err error + eds, err = waitForEDS(t, edsName) + require.NoError(t, err) + err = updateEDS(edsName, eds) require.NoError(t, err) verifyEDS(t, edsName, eds.Spec, eds.Spec.Replicas) diff --git a/operator/elasticsearch.go b/operator/elasticsearch.go index 5714a9a..1a65531 100644 --- a/operator/elasticsearch.go +++ b/operator/elasticsearch.go @@ -491,6 +491,33 @@ func (r *EDSResource) OnStableReplicasHook(ctx context.Context) error { return r.esClient.Cleanup(ctx) } +// UpdateStatus updates the status of the EDS to set the current replicas from +// StatefulSet and updating the observedGeneration. +func (r *EDSResource) UpdateStatus(sts *appsv1.StatefulSet) error { + observedGeneration := int64(0) + if r.eds.Status.ObservedGeneration != nil { + observedGeneration = *r.eds.Status.ObservedGeneration + } + + replicas := int32(0) + if sts.Spec.Replicas != nil { + replicas = *sts.Spec.Replicas + } + + if r.eds.Generation != observedGeneration || + r.eds.Status.Replicas != replicas { + r.eds.Status.Replicas = replicas + r.eds.Status.ObservedGeneration = &r.eds.Generation + var err error + r.eds, err = r.kube.ZalandoV1().ElasticsearchDataSets(r.eds.Namespace).UpdateStatus(r.eds) + if err != nil { + return err + } + } + + return nil +} + func (r *EDSResource) applyScalingOperation() error { operation, err := edsScalingOperation(r.eds) if err != nil { @@ -781,7 +808,7 @@ func (o *ElasticsearchOperator) scaleEDS(eds *zv1.ElasticsearchDataSet, es *ESRe return err } - if scalingOperation.ScalingDirection != NONE { + if scalingOperation.ScalingDirection != NONE { eds.Annotations[esScalingOperationKey] = string(jsonBytes) // persist changes of EDS diff --git a/operator/operator.go b/operator/operator.go index e015201..8258227 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -68,6 +68,11 @@ type StatefulResource interface { // EnsureResources EnsureResources() error + // UpdateStatus updates the status of the StatefulResource. The + // statefulset is parsed to provide additional information like + // replicas to the status. + UpdateStatus(sts *appsv1.StatefulSet) error + // PreScaleDownHook is triggered when a scaledown is to be performed. // It's ensured that the hook will be triggered at least once, but it // may trigger multiple times e.g. if the scaledown fails at a later @@ -127,6 +132,11 @@ func (o *Operator) operate(ctx context.Context, sr StatefulResource) error { return fmt.Errorf("failed to reconcile StatefulSet: %v", err) } + err = sr.UpdateStatus(sts) + if err != nil { + return fmt.Errorf("failed to update status: %v", err) + } + err = o.operateNextPod(ctx, sts, sr) return err } diff --git a/operator/operator_test.go b/operator/operator_test.go index de58bce..1a51db6 100644 --- a/operator/operator_test.go +++ b/operator/operator_test.go @@ -45,6 +45,7 @@ func (r *mockResource) VolumeClaimTemplates() []v1.PersistentVolumeClaim { } func (r *mockResource) Self() runtime.Object { return r.eds } func (r *mockResource) EnsureResources() error { return nil } +func (r *mockResource) UpdateStatus(sts *appsv1.StatefulSet) error { return nil } func (r *mockResource) PreScaleDownHook(ctx context.Context) error { return nil } func (r *mockResource) OnStableReplicasHook(ctx context.Context) error { return nil } func (r *mockResource) Drain(ctx context.Context, pod *v1.Pod) error { return nil } diff --git a/pkg/apis/zalando.org/v1/types.go b/pkg/apis/zalando.org/v1/types.go index 32af039..d203cfd 100644 --- a/pkg/apis/zalando.org/v1/types.go +++ b/pkg/apis/zalando.org/v1/types.go @@ -73,6 +73,8 @@ type ElasticsearchDataSetStatus struct { // generation, which is updated on mutation by the API Server. // +optional ObservedGeneration *int64 `json:"observedGeneration,omitempty" protobuf:"varint,1,opt,name=observedGeneration"` + // Replicas is the number of Pods by the underlying StatefulSet. + Replicas int32 `json:"replicas" protobuf:"varint,2,opt,name=replicas"` LastScaleUpStarted *metav1.Time `json:"lastScaleUpStarted,omitempty"` LastScaleUpEnded *metav1.Time `json:"lastScaleUpEnded,omitempty"`