Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new metrics for CassandraDatacenter.Status fields #678

Merged
merged 6 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti
## unreleased

* [FEATURE] [#263](https://github.com/k8ssandra/cass-operator/issues/263) Allow increasing the size of CassandraDataVolumeClaimSpec if the selected StorageClass supports it. This feature is currently behind an opt-in feature flag and requires an annotation ``cassandra.datastax.com/allow-storage-changes: true`` to be set in the CassandraDatacenter.
* [ENHANCEMENT] [#648](https://github.com/k8ssandra/cass-operator/issues/648) Make MinReadySeconds configurable value in the Spec.
* [FEATURE] [#646](https://github.com/k8ssandra/cass-operator/issues/646) Allow starting multiple parallel pods if they have already previously bootstrapped and not planned for replacement. Set annotation ``cassandra.datastax.com/allow-parallel-starts: true`` to enable this feature.
* [ENHANCEMENT] [#648](https://github.com/k8ssandra/cass-operator/issues/648) Make MinReadySeconds configurable value in the Spec.
* [ENHANCEMENT] [#349](https://github.com/k8ssandra/cass-operator/issues/349) Expose CassandraDatacenter.Status fields (conditions and operator progress) as metrics

## v1.21.1

Expand Down
67 changes: 66 additions & 1 deletion pkg/monitoring/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package monitoring

import (
"fmt"
"strings"

api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
Expand Down Expand Up @@ -53,7 +54,9 @@ func getPodStatus(pod *corev1.Pod) PodStatus {
}

var (
PodStatusVec *prometheus.GaugeVec
PodStatusVec *prometheus.GaugeVec
DatacenterStatusVec *prometheus.GaugeVec
DatacenterOperatorStatusVec *prometheus.GaugeVec
)

func init() {
Expand All @@ -64,8 +67,26 @@ func init() {
Help: "Cassandra pod statuses",
}, []string{"namespace", "cluster", "datacenter", "rack", "pod", "status"})

datacenterConditionVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "cass_operator",
Subsystem: "datacenter",
Name: "status",
Help: "CassandraDatacenter conditions",
}, []string{"namespace", "cluster", "datacenter", "condition"})

datacenterOperatorStatusVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "cass_operator",
Subsystem: "datacenter",
Name: "progress",
Help: "CassandraDatacenter progress state",
}, []string{"namespace", "cluster", "datacenter", "progress"})

metrics.Registry.MustRegister(podVec)
metrics.Registry.MustRegister(datacenterConditionVec)
metrics.Registry.MustRegister(datacenterOperatorStatusVec)
PodStatusVec = podVec
DatacenterStatusVec = datacenterConditionVec
DatacenterOperatorStatusVec = datacenterOperatorStatusVec
}

func UpdatePodStatusMetric(pod *corev1.Pod) {
Expand All @@ -85,3 +106,47 @@ func RemovePodStatusMetric(pod *corev1.Pod) {
func RemoveDatacenterPods(namespace, cluster, datacenter string) {
PodStatusVec.DeletePartialMatch(prometheus.Labels{"namespace": namespace, "cluster": cluster, "datacenter": datacenter})
}

// SetDatacenterConditionMetric exports the given CassandraDatacenter condition
// as a gauge: 1 when the condition status is True, 0 otherwise.
func SetDatacenterConditionMetric(dc *api.CassandraDatacenter, conditionType api.DatacenterConditionType, status corev1.ConditionStatus) {
	var value float64
	if status == corev1.ConditionTrue {
		value = 1
	}

	DatacenterStatusVec.
		WithLabelValues(dc.Namespace, dc.Spec.ClusterName, dc.DatacenterName(), string(conditionType)).
		Set(value)
}

// UpdateOperatorDatacenterProgressStatusMetric records the operator's current
// progress state for dc, first deleting any previously recorded state so that
// exactly one progress series per datacenter carries a non-zero value.
func UpdateOperatorDatacenterProgressStatusMetric(dc *api.CassandraDatacenter, state api.ProgressState) {
	dcLabels := prometheus.Labels{
		"namespace":  dc.Namespace,
		"cluster":    dc.Spec.ClusterName,
		"datacenter": dc.DatacenterName(),
	}

	// Clear every stale progress series for this datacenter...
	DatacenterOperatorStatusVec.DeletePartialMatch(dcLabels)

	// ...then set only the one that is active now.
	DatacenterOperatorStatusVec.WithLabelValues(dc.Namespace, dc.Spec.ClusterName, dc.DatacenterName(), string(state)).Set(1)
}

// Add CassandraTask status also (how many pods done etc) per task
// Add podnames to the CassandraTask status that are done? Or waiting?

// GetMetricValue gathers the controller-runtime metrics registry and returns
// the current value of the first gauge in the family called name whose labels
// agree with every key/value pair in labels. A metric carrying extra labels
// beyond those requested still matches; it is skipped only when a requested
// label is present with a different value.
func GetMetricValue(name string, labels map[string]string) (float64, error) {
	families, err := metrics.Registry.Gather()
	if err != nil {
		return 0, err
	}

	for _, fam := range families {
		// GetName is nil-safe, unlike dereferencing fam.Name directly.
		if fam.GetName() != name {
			continue
		}
	Metric:
		for _, m := range fam.Metric {
			for _, label := range m.Label {
				if want, ok := labels[label.GetName()]; ok && want != label.GetValue() {
					// A requested label exists on this metric but with the wrong value.
					continue Metric
				}
			}
			if m.Gauge == nil {
				// Same-named family of a different metric type; don't deref nil.
				continue
			}
			return m.Gauge.GetValue(), nil
		}
	}
	return 0, fmt.Errorf("no metric %q found matching labels %v", name, labels)
}
102 changes: 101 additions & 1 deletion pkg/monitoring/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestMetricAdder(t *testing.T) {
require.Error(err)
}

func TestNamespaceSeparatation(t *testing.T) {
func TestNamespaceSeparation(t *testing.T) {
require := require.New(t)
pods := make([]*corev1.Pod, 2)
for i := 0; i < len(pods); i++ {
Expand Down Expand Up @@ -150,3 +150,103 @@ func getCurrentPodStatus(podName string) (string, error) {
}
return "", fmt.Errorf("No pod status found")
}

func TestOperatorStateMetrics(t *testing.T) {
	require := require.New(t)

	dc := &api.CassandraDatacenter{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "dc1",
			Namespace: "ns",
		},
		Spec: api.CassandraDatacenterSpec{
			ClusterName: "cluster1",
		},
		Status: api.CassandraDatacenterStatus{},
	}

	// Each update must leave exactly one active progress series behind.
	for _, tc := range []struct {
		state api.ProgressState
		want  string
	}{
		{api.ProgressUpdating, "Updating"},
		{api.ProgressReady, "Ready"},
	} {
		UpdateOperatorDatacenterProgressStatusMetric(dc, tc.state)

		status, err := getCurrentDatacenterStatus("dc1")
		require.NoError(err)
		require.Equal(tc.want, status)
	}
}

// getCurrentDatacenterStatus returns the "progress" label value of the
// cass_operator_datacenter_progress series that is currently non-zero for the
// named datacenter. UpdateOperatorDatacenterProgressStatusMetric deletes stale
// series before setting the active one, so at most one series should match.
func getCurrentDatacenterStatus(dcName string) (string, error) {
	families, err := metrics.Registry.Gather()
	if err != nil {
		return "", err
	}

	for _, fam := range families {
		if fam.GetName() != "cass_operator_datacenter_progress" {
			continue
		}
	Metric:
		for _, m := range fam.Metric {
			progress := ""
			for _, label := range m.Label {
				if label.GetName() == "datacenter" && label.GetValue() != dcName {
					// Series belongs to a different datacenter.
					continue Metric
				}
				if label.GetName() == "progress" {
					progress = label.GetValue()
				}
			}
			// Guard against a nil Gauge before dereferencing its value.
			if m.Gauge != nil && m.Gauge.GetValue() > 0 {
				return progress, nil
			}
		}
	}
	return "", fmt.Errorf("no datacenter status found for %s", dcName)
}

func TestDatacenterConditionMetrics(t *testing.T) {
	require := require.New(t)

	dc := &api.CassandraDatacenter{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "dc1",
			Namespace: "ns",
		},
		Spec: api.CassandraDatacenterSpec{
			ClusterName: "cluster1",
		},
		Status: api.CassandraDatacenterStatus{
			Conditions: []api.DatacenterCondition{
				{
					Type:   api.DatacenterReady,
					Status: corev1.ConditionTrue,
				},
			},
		},
	}

	// A True condition is exported as gauge value 1.
	SetDatacenterConditionMetric(dc, api.DatacenterReady, corev1.ConditionTrue)

	value, err := getCurrentDatacenterCondition("dc1", api.DatacenterReady)
	require.NoError(err)
	require.Equal(float64(1), value)

	// Flipping Ready to False must not disturb other condition series.
	SetDatacenterConditionMetric(dc, api.DatacenterInitialized, corev1.ConditionTrue)
	SetDatacenterConditionMetric(dc, api.DatacenterReady, corev1.ConditionFalse)

	value, err = getCurrentDatacenterCondition("dc1", api.DatacenterReady)
	require.NoError(err)
	require.Equal(float64(0), value)

	value, err = getCurrentDatacenterCondition("dc1", api.DatacenterInitialized)
	require.NoError(err)
	require.Equal(float64(1), value)
}

// getCurrentDatacenterCondition reads back the gauge value of the named
// condition for the given datacenter from the registered status metrics.
func getCurrentDatacenterCondition(dcName string, conditionType api.DatacenterConditionType) (float64, error) {
	selector := map[string]string{
		"datacenter": dcName,
		"condition":  string(conditionType),
	}
	return GetMetricValue("cass_operator_datacenter_status", selector)
}
18 changes: 13 additions & 5 deletions pkg/reconciliation/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"

api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
"github.com/k8ssandra/cass-operator/pkg/monitoring"
"github.com/k8ssandra/cass-operator/pkg/oplabels"
"github.com/k8ssandra/cass-operator/pkg/utils"

Expand Down Expand Up @@ -70,6 +71,8 @@ func setOperatorProgressStatus(rc *ReconciliationContext, newState api.ProgressS
return err
}

monitoring.UpdateOperatorDatacenterProgressStatusMetric(rc.Datacenter, newState)

// The allow-upgrade=once annotation is temporary and should be removed after first successful reconcile
if metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.UpdateAllowedAnnotation) && rc.Datacenter.Annotations[api.UpdateAllowedAnnotation] == string(api.AllowUpdateOnce) {
// remove the annotation
Expand All @@ -85,11 +88,16 @@ func setOperatorProgressStatus(rc *ReconciliationContext, newState api.ProgressS
}

func setDatacenterStatus(rc *ReconciliationContext) error {
patch := client.MergeFrom(rc.Datacenter.DeepCopy())
rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation
rc.setCondition(api.NewDatacenterCondition(api.DatacenterRequiresUpdate, corev1.ConditionFalse))
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil {
rc.ReqLogger.Error(err, "error updating the Cassandra Operator Progress state")
if rc.Datacenter.Status.ObservedGeneration != rc.Datacenter.Generation {
patch := client.MergeFrom(rc.Datacenter.DeepCopy())
rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil {
rc.ReqLogger.Error(err, "error updating the Cassandra Operator Progress state")
return err
}
}

if err := rc.setConditionStatus(api.DatacenterRequiresUpdate, corev1.ConditionFalse); err != nil {
return err
}

Expand Down
47 changes: 8 additions & 39 deletions pkg/reconciliation/decommission_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/k8ssandra/cass-operator/pkg/events"
"github.com/k8ssandra/cass-operator/pkg/httphelper"
"github.com/k8ssandra/cass-operator/pkg/monitoring"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
)

Expand Down Expand Up @@ -82,19 +83,8 @@ func (rc *ReconciliationContext) DecommissionNodes(epData httphelper.CassMetadat
if maxReplicas > desiredNodeCount {
logger.V(1).Info("reconcile_racks::DecommissionNodes::scaleDownRack", "Rack", rackInfo.RackName, "maxReplicas", maxReplicas, "desiredNodeCount", desiredNodeCount)

dcPatch := client.MergeFrom(dc.DeepCopy())
updated := false

updated = rc.setCondition(
api.NewDatacenterCondition(
api.DatacenterScalingDown, corev1.ConditionTrue)) || updated

if updated {
err := rc.Client.Status().Patch(rc.Ctx, dc, dcPatch)
if err != nil {
logger.Error(err, "error patching datacenter status for scaling down rack started")
return result.Error(err)
}
if err := rc.setConditionStatus(api.DatacenterScalingDown, corev1.ConditionTrue); err != nil {
return result.Error(err)
}

rc.ReqLogger.Info(
Expand Down Expand Up @@ -219,21 +209,8 @@ func (rc *ReconciliationContext) CheckDecommissioningNodes(epData httphelper.Cas
}
}

dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
updated := false

updated = rc.setCondition(
api.NewDatacenterCondition(
api.DatacenterScalingDown, corev1.ConditionFalse)) || updated

if updated {
err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch)
if err != nil {
rc.ReqLogger.Error(err, "error patching datacenter status for scaling down finished")
return result.Error(err)
}
// Requeue after updating to ensure we verify previous steps with the new size
return result.RequeueSoon(0)
if err := rc.setConditionStatus(api.DatacenterScalingDown, corev1.ConditionFalse); err != nil {
return result.Error(err)
}

return result.Continue()
Expand Down Expand Up @@ -424,20 +401,12 @@ func (rc *ReconciliationContext) EnsurePodsCanAbsorbDecommData(decommPod *corev1
rc.ReqLogger.Error(fmt.Errorf(msg), msg)
rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeWarning, events.InvalidDatacenterSpec, msg)

dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
updated := rc.setCondition(
if err := rc.setCondition(
api.NewDatacenterConditionWithReason(api.DatacenterValid,
corev1.ConditionFalse, "notEnoughSpaceToScaleDown", msg,
),
)

if updated {
patchErr := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch)
if patchErr != nil {
msg := "error patching condition Valid for failed scale down."
rc.ReqLogger.Error(patchErr, msg)
return patchErr
}
); err != nil {
return errors.Wrap(err, msg)
}

return fmt.Errorf(msg)
Expand Down
9 changes: 2 additions & 7 deletions pkg/reconciliation/reconcile_datacenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,8 @@ func (rc *ReconciliationContext) ProcessDeletion() result.ReconcileResult {
}

if len(dcs) > 1 {
dcPatch := client.MergeFrom(rc.Datacenter.DeepCopy())
if updated := rc.setCondition(api.NewDatacenterCondition(api.DatacenterDecommission, corev1.ConditionTrue)); updated {
err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, dcPatch)
if err != nil {
rc.ReqLogger.Error(err, "error patching datacenter status for decommissiong started")
return result.Error(err)
}
if err := rc.setConditionStatus(api.DatacenterDecommission, corev1.ConditionTrue); err != nil {
return result.Error(err)
}

rc.ReqLogger.V(1).Info("Decommissioning the datacenter to 0 nodes first before deletion")
Expand Down
Loading
Loading