Skip to content

Commit

Permalink
chore: require Pods to have integration label
Browse files — browse the repository at this point in the history
  • Loading branch information
squakez committed Nov 30, 2023
1 parent e27f612 commit c62f02b
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 72 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID
selector := labels.NewSelector().Add(*hasIntegrationLabel)

selectors := map[ctrl.Object]cache.ByObject{
&corev1.Pod{}: {Label: selector},
&appsv1.Deployment{}: {Label: selector},
&batchv1.Job{}: {Label: selector},
&servingv1.Service{}: {Label: selector},
Expand Down
28 changes: 0 additions & 28 deletions pkg/controller/integration/integration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -458,33 +457,6 @@ func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Buil
}),
builder.WithPredicates(NonManagedObjectPredicate{}),
).
// We must watch also Revisions, since it's the object that really change when a Knative service scales up and down
Watches(&servingv1.Revision{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
revision, ok := a.(*servingv1.Revision)
if !ok {
log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve KnativeService Revision")
return []reconcile.Request{}
}
ksvc := &servingv1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: servingv1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: revision.Labels["serving.knative.dev/service"],
Namespace: revision.Namespace,
},
}
err := c.Get(ctx, ctrl.ObjectKeyFromObject(ksvc), ksvc)
if err != nil {
// The revision does not belong to any managed (owned or imported) KnativeService, just discard
return []reconcile.Request{}
}
return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelKnativeService{ksvc: ksvc})
}),
builder.WithPredicates(NonManagedObjectPredicate{}),
).
// Watch for the owned CronJobs
Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
}
Expand Down
36 changes: 19 additions & 17 deletions pkg/controller/integration/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"

Expand Down Expand Up @@ -136,26 +135,30 @@ func (action *monitorAction) monitorPods(ctx context.Context, environment *trait
if err != nil {
return nil, err
}
if controller.isEmptySelector() {

// In order to simplify the monitoring and have a minor resource requirement, we will watch only those Pods
// which are labeled with `camel.apache.org/integration`. This is a design choice that requires the user to
// voluntarily add a label to their Pods (via template, possibly) in order to monitor the non managed Camel applications.

if !controller.hasTemplateIntegrationLabel() {
// This is happening when the Deployment, CronJob, etc resources
// have no selector or labels to identify sibling Pods.
// miss the Integration label, required to identify sibling Pods.
integration.Status.Phase = v1.IntegrationPhaseCannotMonitor
integration.Status.SetConditions(
v1.IntegrationCondition{
Type: v1.IntegrationConditionMonitoringPodsAvailable,
Status: corev1.ConditionFalse,
Reason: v1.IntegrationConditionMonitoringPodsAvailableReason,
Message: fmt.Sprintf("Could not find any selector for %s. Make sure to include any label in the template and the Pods generated to inherit such label for monitoring purposes.", controller.getControllerName()),
Type: v1.IntegrationConditionMonitoringPodsAvailable,
Status: corev1.ConditionFalse,
Reason: v1.IntegrationConditionMonitoringPodsAvailableReason,
Message: fmt.Sprintf(
"Could not find `camel.apache.org/integration: %s` label in the %s template. Make sure to include this label in the template for Pod monitoring purposes.",
integration.GetName(),
controller.getControllerName(),
),
},
)
return integration, nil
}

controllerSelector := controller.getSelector()
selector, err := metav1.LabelSelectorAsSelector(&controllerSelector)
if err != nil {
return nil, err
}
integration.Status.SetConditions(
v1.IntegrationCondition{
Type: v1.IntegrationConditionMonitoringPodsAvailable,
Expand All @@ -166,21 +169,21 @@ func (action *monitorAction) monitorPods(ctx context.Context, environment *trait
// Enforce the scale sub-resource label selector.
// It is used by the HPA that queries the scale sub-resource endpoint,
// to list the pods owned by the integration.
integration.Status.Selector = selector.String()
integration.Status.Selector = v1.IntegrationLabel + "=" + integration.Name

// Update the replicas count
pendingPods := &corev1.PodList{}
err = action.client.List(ctx, pendingPods,
ctrl.InNamespace(integration.Namespace),
&ctrl.ListOptions{LabelSelector: selector},
ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
ctrl.MatchingFields{"status.phase": string(corev1.PodPending)})
if err != nil {
return nil, err
}
runningPods := &corev1.PodList{}
err = action.client.List(ctx, runningPods,
ctrl.InNamespace(integration.Namespace),
&ctrl.ListOptions{LabelSelector: selector},
ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
ctrl.MatchingFields{"status.phase": string(corev1.PodRunning)})
if err != nil {
return nil, err
Expand Down Expand Up @@ -294,8 +297,7 @@ type controller interface {
checkReadyCondition(ctx context.Context) (bool, error)
getPodSpec() corev1.PodSpec
updateReadyCondition(readyPods []corev1.Pod) bool
getSelector() metav1.LabelSelector
isEmptySelector() bool
hasTemplateIntegrationLabel() bool
getControllerName() string
}

Expand Down
12 changes: 2 additions & 10 deletions pkg/controller/integration/monitor_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

ctrl "sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -112,15 +111,8 @@ func (c *cronJobController) updateReadyCondition(readyPods []corev1.Pod) bool {
return false
}

// getSelector builds a label selector from the labels declared on the
// CronJob's job pod template. Those labels are propagated to the Pods the
// CronJob spawns, so matching on all of them identifies the sibling Pods.
func (c *cronJobController) getSelector() metav1.LabelSelector {
	templateLabels := c.obj.Spec.JobTemplate.Spec.Template.Labels
	return metav1.LabelSelector{MatchLabels: templateLabels}
}

func (c *cronJobController) isEmptySelector() bool {
return c.obj.Spec.JobTemplate.Spec.Template.Labels == nil
// hasTemplateIntegrationLabel reports whether the CronJob's job pod template
// carries a non-empty Integration label, which is required in order to
// identify the Pods belonging to this Integration for monitoring purposes.
func (c *cronJobController) hasTemplateIntegrationLabel() bool {
	templateLabels := c.obj.Spec.JobTemplate.Spec.Template.Labels
	return templateLabels[v1.IntegrationLabel] != ""
}

func (c *cronJobController) getControllerName() string {
Expand Down
9 changes: 2 additions & 7 deletions pkg/controller/integration/monitor_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
Expand Down Expand Up @@ -86,12 +85,8 @@ func (c *deploymentController) updateReadyCondition(readyPods []corev1.Pod) bool
return false
}

// getSelector returns a copy of the label selector declared on the
// Deployment, i.e. the selector the Deployment itself uses to own its Pods.
func (c *deploymentController) getSelector() metav1.LabelSelector {
	selector := c.obj.Spec.Selector
	return *selector
}

func (c *deploymentController) isEmptySelector() bool {
return c.obj.Spec.Selector.MatchExpressions == nil && c.obj.Spec.Selector.MatchLabels == nil
// hasTemplateIntegrationLabel reports whether the Deployment's pod template
// carries a non-empty Integration label, which is required in order to
// identify the Pods belonging to this Integration for monitoring purposes.
func (c *deploymentController) hasTemplateIntegrationLabel() bool {
	templateLabels := c.obj.Spec.Template.Labels
	return templateLabels[v1.IntegrationLabel] != ""
}

func (c *deploymentController) getControllerName() string {
Expand Down
12 changes: 2 additions & 10 deletions pkg/controller/integration/monitor_knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

servingv1 "knative.dev/serving/pkg/apis/serving/v1"

Expand Down Expand Up @@ -66,15 +65,8 @@ func (c *knativeServiceController) updateReadyCondition(readyPods []corev1.Pod)
return false
}

// getSelector builds a label selector from the labels declared on the
// Knative Service's revision template. Those labels are propagated to the
// Pods backing the service, so matching on all of them identifies the
// sibling Pods.
func (c *knativeServiceController) getSelector() metav1.LabelSelector {
	templateLabels := c.obj.Spec.Template.Labels
	return metav1.LabelSelector{MatchLabels: templateLabels}
}

func (c *knativeServiceController) isEmptySelector() bool {
return c.obj.Spec.Template.Labels == nil
// hasTemplateIntegrationLabel reports whether the Knative Service's revision
// template carries a non-empty Integration label, which is required in order
// to identify the Pods belonging to this Integration for monitoring purposes.
func (c *knativeServiceController) hasTemplateIntegrationLabel() bool {
	templateLabels := c.obj.Spec.Template.Labels
	return templateLabels[v1.IntegrationLabel] != ""
}

func (c *knativeServiceController) getControllerName() string {
Expand Down

0 comments on commit c62f02b

Please sign in to comment.