fix: clear endpointStatuses for podMonitoring without endpoints
hsmatulisgoogle committed Dec 16, 2024
1 parent 54c5c97 commit 35d70b2
Showing 2 changed files with 172 additions and 57 deletions.
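
In short, updateTargetStatus now receives the full list of PodMonitoring and ClusterPodMonitoring resources and explicitly resets EndpointStatuses on any resource whose scrape job produced no target results, instead of only patching the resources that appear in the polled data. Below is a minimal, self-contained sketch of that idea using placeholder types, not the real monitoringv1 API:

// A minimal sketch of the clearing logic, with the operator's types reduced
// to placeholders (not the real monitoringv1 API).
package main

import "fmt"

type EndpointStatus struct{ Name string }

type PodMonitoring struct {
	Name             string
	EndpointStatuses []EndpointStatus
}

// syncStatuses patches each PodMonitoring from the polled results and, for
// resources whose job reported nothing, resets the status to an empty slice
// rather than leaving stale entries behind.
func syncStatuses(all []*PodMonitoring, polled map[string][]EndpointStatus) {
	for _, pm := range all {
		if statuses, ok := polled[pm.Name]; ok {
			pm.EndpointStatuses = statuses
			continue
		}
		pm.EndpointStatuses = []EndpointStatus{} // the fix: clear, don't skip
	}
}

func main() {
	pm := &PodMonitoring{
		Name:             "prom-example-1",
		EndpointStatuses: []EndpointStatus{{Name: "stale-endpoint"}},
	}
	syncStatuses([]*PodMonitoring{pm}, map[string][]EndpointStatus{})
	fmt.Println(len(pm.EndpointStatuses)) // 0: stale statuses were cleared
}
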
61 changes: 45 additions & 16 deletions pkg/operator/target_status.go
@@ -128,7 +128,27 @@ func setupTargetStatusPoller(op *Operator, registry prometheus.Registerer, httpC
return nil
}

// shouldPoll verifies if polling collectors is configured or necessary.
// fetchAllPodMonitorings fetches all ClusterPodMonitoring and PodMonitoring CRs deployed in the cluster. This excludes ClusterNodeMonitoring CRs.
func fetchAllPodMonitorings(ctx context.Context, kubeClient client.Client) ([]monitoringv1.PodMonitoringCRD, error) {
var combinedList []monitoringv1.PodMonitoringCRD
var podMonitoringList monitoringv1.PodMonitoringList
if err := kubeClient.List(ctx, &podMonitoringList); err != nil {
return nil, err
}
for _, pm := range podMonitoringList.Items {
combinedList = append(combinedList, &pm)
}
var clusterPodMonitoringList monitoringv1.ClusterPodMonitoringList
if err := kubeClient.List(ctx, &clusterPodMonitoringList); err != nil {
return nil, err
}
for _, pm := range clusterPodMonitoringList.Items {
combinedList = append(combinedList, &pm)
}
return combinedList, nil
}

// shouldPoll verifies if polling collectors is configured.
func shouldPoll(ctx context.Context, cfgNamespacedName types.NamespacedName, kubeClient client.Client) (bool, error) {
// Check if target status is enabled.
var config monitoringv1.OperatorConfig
@@ -141,19 +161,6 @@ func shouldPoll(ctx context.Context, cfgNamespacedName types.NamespacedName, kub
if !config.Features.TargetStatus.Enabled {
return false, nil
}

// No need to poll if there's no PodMonitorings.
var podMonitoringList monitoringv1.PodMonitoringList
if err := kubeClient.List(ctx, &podMonitoringList); err != nil {
return false, err
} else if len(podMonitoringList.Items) == 0 {
var clusterPodMonitoringList monitoringv1.ClusterPodMonitoringList
if err := kubeClient.List(ctx, &clusterPodMonitoringList); err != nil {
return false, err
} else if len(clusterPodMonitoringList.Items) == 0 {
return false, nil
}
}
return true, nil
}

@@ -196,12 +203,20 @@ func (r *targetStatusReconciler) Reconcile(ctx context.Context, _ reconcile.Requ

// pollAndUpdate fetches and updates the target status in each collector pod.
func pollAndUpdate(ctx context.Context, logger logr.Logger, opts Options, httpClient *http.Client, getTarget getTargetFn, kubeClient client.Client) error {
podMonitorings, err := fetchAllPodMonitorings(ctx, kubeClient)
if err != nil {
return err
}
if len(podMonitorings) == 0 {
// Nothing to update.
return nil
}
targets, err := fetchTargets(ctx, logger, opts, httpClient, getTarget, kubeClient)
if err != nil {
return err
}

return updateTargetStatus(ctx, logger, kubeClient, targets)
return updateTargetStatus(ctx, logger, kubeClient, targets, podMonitorings)
}

// fetchTargets retrieves the Prometheus targets using the given target function
@@ -307,13 +322,14 @@ func patchPodMonitoringStatus(ctx context.Context, kubeClient client.Client, obj

// updateTargetStatus populates the status object of each pod using the given
// Prometheus targets.
func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient client.Client, targets []*prometheusv1.TargetsResult) error {
func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient client.Client, targets []*prometheusv1.TargetsResult, podMonitorings []monitoringv1.PodMonitoringCRD) error {
endpointMap, err := buildEndpointStatuses(targets)
if err != nil {
return err
}

var errs []error
withStatuses := map[string]bool{}
for job, endpointStatuses := range endpointMap {
pm, err := getObjectByScrapeJobKey(job)
if err != nil {
@@ -324,6 +340,7 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
// Skip hard-coded jobs which we do not patch.
continue
}
withStatuses[pm.GetName()] = true
pm.GetPodMonitoringStatus().EndpointStatuses = endpointStatuses

if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
@@ -335,6 +352,18 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
}
}

// Any pod monitorings that exist but don't have endpoints should also be updated.
for _, pm := range podMonitorings {
if _, exists := withStatuses[pm.GetName()]; !exists {
pm.GetPodMonitoringStatus().EndpointStatuses = []monitoringv1.ScrapeEndpointStatus{}
if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
// Same reasoning as above for error handling.
errs = append(errs, err)
logger.Error(err, "patching empty status", "pm", pm.GetName(), "gvk", pm.GetObjectKind().GroupVersionKind())
}
}
}

return errors.Join(errs...)
}

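Taken together, the target_status.go changes also shift a responsibility: shouldPoll now only checks the OperatorConfig feature flag, while pollAndUpdate fetches the full PodMonitoring list up front, exits early when there is nothing to update, and otherwise passes the list through so stale statuses can be cleared. A rough sketch of that reordered flow, with the fetch and update steps stubbed out (the parameters are hypothetical stand-ins for fetchAllPodMonitorings, fetchTargets, and updateTargetStatus):

// Sketch of the reordered polling flow; the function parameters stand in for
// fetchAllPodMonitorings, fetchTargets, and updateTargetStatus.
package main

import "fmt"

func pollAndUpdate(
	fetchAll func() []string,
	fetchTargets func() []string,
	update func(targets, monitorings []string) error,
) error {
	monitorings := fetchAll()
	if len(monitorings) == 0 {
		return nil // nothing deployed, so nothing to patch or clear
	}
	targets := fetchTargets()
	// The full list rides along so resources without targets get their
	// endpoint statuses reset instead of being skipped.
	return update(targets, monitorings)
}

func main() {
	err := pollAndUpdate(
		func() []string { return []string{"prom-example-1"} },
		func() []string { return nil },
		func(targets, monitorings []string) error {
			fmt.Printf("updating %d monitorings against %d targets\n", len(monitorings), len(targets))
			return nil
		},
	)
	fmt.Println("err:", err)
}
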
168 changes: 127 additions & 41 deletions pkg/operator/target_status_test.go
@@ -45,11 +45,26 @@ import (
)

type updateTargetStatusTestCase struct {
desc string
targets []*prometheusv1.TargetsResult
podMonitorings []monitoringv1.PodMonitoring
clusterPodMonitorings []monitoringv1.ClusterPodMonitoring
expErr func(err error) bool
desc string
targets []*prometheusv1.TargetsResult
podMonitorings []monitoringv1.PodMonitoring
initializeStatus []monitoringv1.PodMonitoringStatus
clusterPodMonitorings []monitoringv1.ClusterPodMonitoring
initializeClusterStatus []monitoringv1.PodMonitoringStatus
expErr func(err error) bool
}

func (tc *updateTargetStatusTestCase) getPodMonitoringCRDs() []monitoringv1.PodMonitoringCRD {
var combinedList []monitoringv1.PodMonitoringCRD

for _, pm := range tc.podMonitorings {
combinedList = append(combinedList, &pm)
}

for _, pm := range tc.clusterPodMonitorings {
combinedList = append(combinedList, &pm)
}
return combinedList
}

// Given a list of test cases on PodMonitoring, creates a new list containing
Expand Down Expand Up @@ -84,6 +99,7 @@ func expand(testCases []updateTargetStatusTestCase) []updateTargetStatusTestCase
}
clusterTargets = append(clusterTargets, targetClusterPodMonitoring)
}

for _, pm := range tc.podMonitorings {
pmCopy := pm.DeepCopy()
cpm := monitoringv1.ClusterPodMonitoring{
@@ -103,26 +119,41 @@ func expand(testCases []updateTargetStatusTestCase) []updateTargetStatusTestCase
}
clusterPodMonitorings = append(clusterPodMonitorings, cpm)
}

initializeClusterStatus := make([]monitoringv1.PodMonitoringStatus, 0, len(tc.initializeStatus))
for _, status := range tc.initializeStatus {
statusCopy := status.DeepCopy()

for idx, status := range statusCopy.EndpointStatuses {
statusCopy.EndpointStatuses[idx].Name = podMonitoringScrapePoolToClusterPodMonitoringScrapePool(status.Name)
}
initializeClusterStatus = append(initializeClusterStatus, *statusCopy)
}

dataPodMonitorings := updateTargetStatusTestCase{
desc: tc.desc + "-pod-monitoring",
targets: tc.targets,
podMonitorings: tc.podMonitorings,
expErr: tc.expErr,
desc: tc.desc + "-pod-monitoring",
targets: tc.targets,
podMonitorings: tc.podMonitorings,
initializeStatus: tc.initializeStatus,
expErr: tc.expErr,
}
dataFinal = append(dataFinal, dataPodMonitorings)
dataClusterPodMonitorings := updateTargetStatusTestCase{
desc: tc.desc + "-cluster-pod-monitoring",
targets: clusterTargets,
clusterPodMonitorings: clusterPodMonitorings,
expErr: tc.expErr,
desc: tc.desc + "-cluster-pod-monitoring",
targets: clusterTargets,
clusterPodMonitorings: clusterPodMonitorings,
initializeClusterStatus: initializeClusterStatus,
expErr: tc.expErr,
}
prometheusTargetsBoth := append(tc.targets, clusterTargets...)
dataBoth := updateTargetStatusTestCase{
desc: tc.desc + "-both",
targets: prometheusTargetsBoth,
podMonitorings: tc.podMonitorings,
clusterPodMonitorings: clusterPodMonitorings,
expErr: tc.expErr,
desc: tc.desc + "-both",
targets: prometheusTargetsBoth,
podMonitorings: tc.podMonitorings,
initializeStatus: tc.initializeStatus,
clusterPodMonitorings: clusterPodMonitorings,
initializeClusterStatus: initializeClusterStatus,
expErr: tc.expErr,
}
dataFinal = append(dataFinal, dataClusterPodMonitorings)
dataFinal = append(dataFinal, dataBoth)
@@ -1225,27 +1256,100 @@ func TestUpdateTargetStatus(t *testing.T) {
return err.Error() == "unknown scrape kind \"unknown\""
},
},
// No targets, with PodMonitoring config.
{
desc: "no-targets-no-match",
podMonitorings: []monitoringv1.PodMonitoring{
{
ObjectMeta: metav1.ObjectMeta{Name: "prom-example-1", Namespace: "gmp-test"},
Spec: monitoringv1.PodMonitoringSpec{},

Status: monitoringv1.PodMonitoringStatus{
MonitoringStatus: monitoringv1.MonitoringStatus{
ObservedGeneration: 2,
Conditions: []monitoringv1.MonitoringCondition{{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionTrue,
LastUpdateTime: metav1.Time{},
LastTransitionTime: metav1.Time{},
Reason: "",
Message: "",
}},
},
},
},
},
initializeStatus: []monitoringv1.PodMonitoringStatus{
{
MonitoringStatus: monitoringv1.MonitoringStatus{
ObservedGeneration: 2,
Conditions: []monitoringv1.MonitoringCondition{{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionTrue,
LastUpdateTime: metav1.Time{},
LastTransitionTime: metav1.Time{},
Reason: "",
Message: "",
}},
},
EndpointStatuses: []monitoringv1.ScrapeEndpointStatus{
{
Name: "PodMonitoring/gmp-test/prom-example-1/metrics",
ActiveTargets: 1,
UnhealthyTargets: 0,
LastUpdateTime: date,
SampleGroups: []monitoringv1.SampleGroup{
{
SampleTargets: []monitoringv1.SampleTarget{
{
Health: "up",
Labels: map[model.LabelName]model.LabelValue{
"instance": "a",
},
LastScrapeDurationSeconds: "1.2",
},
},
Count: ptr.To(int32(1)),
},
},
CollectorsFraction: "1",
},
},
},
},
},
})

for _, testCase := range testCases {
t.Run(fmt.Sprintf("target-status-conversion-%s", testCase.desc), func(t *testing.T) {
clientBuilder := newFakeClientBuilder()
for _, podMonitoring := range testCase.podMonitorings {
for i, podMonitoring := range testCase.podMonitorings {
pmCopy := podMonitoring.DeepCopy()
pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
if len(testCase.initializeStatus) > 0 {
pmCopy.Status = testCase.initializeStatus[i]
} else {
pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
}
clientBuilder.WithObjects(pmCopy)
}
for _, clusterPodMonitoring := range testCase.clusterPodMonitorings {
for i, clusterPodMonitoring := range testCase.clusterPodMonitorings {
pmCopy := clusterPodMonitoring.DeepCopy()
pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
if len(testCase.initializeClusterStatus) > 0 {
pmCopy.Status = testCase.initializeClusterStatus[i]
} else {
pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
}
clientBuilder.WithObjects(pmCopy)
}

kubeClient := clientBuilder.Build()

err := updateTargetStatus(context.Background(), testr.New(t), kubeClient, testCase.targets)
// fetchTargets(ctx, logger, opts, nil, targetFetchFromMap(prometheusTargetMap), kubeClient)
err := updateTargetStatus(context.Background(), testr.New(t), kubeClient, testCase.targets, testCase.getPodMonitoringCRDs())
if err != nil && (testCase.expErr == nil || !testCase.expErr(err)) {
t.Fatalf("unexpected error updating target status: %s", err)
} else if err == nil && (testCase.expErr != nil) {
t.Fatalf("expected error missing when updating target status")
}

for _, podMonitoring := range testCase.podMonitorings {
@@ -1602,24 +1706,6 @@ func TestShouldPoll(t *testing.T) {
should: false,
expErr: true,
},
{
desc: "should not poll targets - no podmonitorings",
objs: []client.Object{
&monitoringv1.OperatorConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "config",
Namespace: "gmp-public",
},
Features: monitoringv1.OperatorFeatures{
TargetStatus: monitoringv1.TargetStatusSpec{
Enabled: true,
},
},
},
},
should: false,
expErr: false,
},
{
desc: "should not poll targets - disabled",
objs: []client.Object{
