chore(controller): refactor integration monitor
(cherry picked from commit apache/camel-k@204af03e7)
tadayosi authored and squakez committed May 31, 2022
1 parent d25ba13 commit d5e8d3b
Showing 4 changed files with 312 additions and 123 deletions.
194 changes: 71 additions & 123 deletions pkg/controller/integration/monitor.go
@@ -26,7 +26,6 @@ import (
"strconv"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -164,85 +163,87 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
return integration, nil
}

func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
var controller ctrl.Object
var lastCompletedJob *batchv1.Job
var podSpec corev1.PodSpec
type controller interface {
checkReadyCondition() (bool, error)
getPodSpec() corev1.PodSpec
updateReadyCondition(readyPods []corev1.Pod) bool
}

func (action *monitorAction) newController(ctx context.Context, env *trait.Environment, integration *v1.Integration) (controller, error) {
var controller controller
var obj ctrl.Object
switch {
case isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable):
controller = &appsv1.Deployment{}
obj = getUpdatedController(env, &appsv1.Deployment{})
controller = &deploymentController{
obj: obj.(*appsv1.Deployment),
integration: integration,
}
case isConditionTrue(integration, v1.IntegrationConditionKnativeServiceAvailable):
controller = &servingv1.Service{}
obj = getUpdatedController(env, &servingv1.Service{})
controller = &knativeServiceController{
obj: obj.(*servingv1.Service),
integration: integration,
}
case isConditionTrue(integration, v1.IntegrationConditionCronJobAvailable):
controller = &batchv1beta1.CronJob{}
obj = getUpdatedController(env, &batchv1beta1.CronJob{})
controller = &cronJobController{
obj: obj.(*batchv1beta1.CronJob),
integration: integration,
client: action.client,
context: ctx,
}
default:
return fmt.Errorf("unsupported controller for integration %s", integration.Name)
return nil, fmt.Errorf("unsupported controller for integration %s", integration.Name)
}

// Retrieve the controller updated from the deployer trait execution
controller = environment.Resources.GetController(func(object ctrl.Object) bool {
return reflect.TypeOf(controller) == reflect.TypeOf(object)
if obj == nil {
return nil, fmt.Errorf("unable to retrieve controller for integration %s", integration.Name)
}

return controller, nil
}

// getUpdatedController retrieves the controller updated from the deployer trait execution.
func getUpdatedController(env *trait.Environment, obj ctrl.Object) ctrl.Object {
return env.Resources.GetController(func(object ctrl.Object) bool {
return reflect.TypeOf(obj) == reflect.TypeOf(object)
})
if controller == nil {
return fmt.Errorf("unable to retrieve controller for integration %s", integration.Name)
}

func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
controller, err := action.newController(ctx, environment, integration)
if err != nil {
return err
}

switch c := controller.(type) {
case *appsv1.Deployment:
// Check the Deployment progression
if progressing := kubernetes.GetDeploymentCondition(*c, appsv1.DeploymentProgressing); progressing != nil && progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" {
integration.Status.Phase = v1.IntegrationPhaseError
setReadyConditionError(integration, progressing.Message)
return nil
}
podSpec = c.Spec.Template.Spec
if done, err := controller.checkReadyCondition(); done || err != nil {
return err
}
if done := checkPodStatuses(integration, pendingPods, runningPods); done {
return nil
}
integration.Status.Phase = v1.IntegrationPhaseRunning

case *servingv1.Service:
// Check the KnativeService conditions
if ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady); ready.IsFalse() && ready.GetReason() == "RevisionFailed" {
integration.Status.Phase = v1.IntegrationPhaseError
setReadyConditionError(integration, ready.Message)
return nil
}
podSpec = c.Spec.Template.Spec.PodSpec

case *batchv1beta1.CronJob:
// Check latest job result
if lastScheduleTime := c.Status.LastScheduleTime; lastScheduleTime != nil && len(c.Status.Active) == 0 {
jobs := batchv1.JobList{}
if err := action.client.List(ctx, &jobs,
ctrl.InNamespace(integration.Namespace),
ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
); err != nil {
return err
}
t := lastScheduleTime.Time
for i, job := range jobs.Items {
if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) {
continue
}
lastCompletedJob = &jobs.Items[i]
t = lastCompletedJob.CreationTimestamp.Time
}
if lastCompletedJob != nil {
if failed := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue {
setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", lastCompletedJob.Name, failed.Message))
integration.Status.Phase = v1.IntegrationPhaseError
return nil
}
}
}
podSpec = c.Spec.JobTemplate.Spec.Template.Spec
readyPods, unreadyPods := filterPodsByReadyStatus(runningPods, controller.getPodSpec())
if done := controller.updateReadyCondition(readyPods); done {
return nil
}
if err := action.probeReadiness(ctx, environment, integration, unreadyPods); err != nil {
return err
}

return nil
}

func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) bool {
// Check Pods statuses
for _, pod := range pendingPods {
// Check the scheduled condition
if scheduled := kubernetes.GetPodCondition(pod, corev1.PodScheduled); scheduled != nil && scheduled.Status == corev1.ConditionFalse && scheduled.Reason == "Unschedulable" {
integration.Status.Phase = v1.IntegrationPhaseError
setReadyConditionError(integration, scheduled.Message)
return nil
return true
}
}
// Check pending container statuses
@@ -255,7 +256,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "ImagePullBackOff" {
integration.Status.Phase = v1.IntegrationPhaseError
setReadyConditionError(integration, waiting.Message)
return nil
return true
}
}
}
@@ -272,18 +273,20 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "CrashLoopBackOff" {
integration.Status.Phase = v1.IntegrationPhaseError
setReadyConditionError(integration, waiting.Message)
return nil
return true
}
if terminated := container.State.Terminated; terminated != nil && terminated.Reason == "Error" {
integration.Status.Phase = v1.IntegrationPhaseError
setReadyConditionError(integration, terminated.Message)
return nil
return true
}
}
}

integration.Status.Phase = v1.IntegrationPhaseRunning
return false
}

func filterPodsByReadyStatus(runningPods []corev1.Pod, podSpec corev1.PodSpec) ([]corev1.Pod, []corev1.Pod) {
var readyPods []corev1.Pod
var unreadyPods []corev1.Pod
for _, pod := range runningPods {
@@ -308,66 +311,11 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
}
}

switch c := controller.(type) {
case *appsv1.Deployment:
replicas := int32(1)
if r := integration.Spec.Replicas; r != nil {
replicas = *r
}
// The Deployment status reports updated and ready replicas separately,
// so that the number of ready replicas also accounts for older versions.
readyReplicas := int32(len(readyPods))
switch {
case readyReplicas >= replicas:
// The Integration is considered ready when the number of replicas
// reported to be ready is larger than or equal to the specified number
// of replicas. This avoids reporting a falsy readiness condition
// when the Integration is being down-scaled.
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
return nil

case c.Status.UpdatedReplicas < replicas:
setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas))

default:
setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
}

case *servingv1.Service:
ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady)
if ready.IsTrue() {
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceReadyReason, "")
return nil
}
setReadyCondition(integration, corev1.ConditionFalse, ready.GetReason(), ready.GetMessage())

case *batchv1beta1.CronJob:
switch {
case c.Status.LastScheduleTime == nil:
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
return nil

case len(c.Status.Active) > 0:
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
return nil

case c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0:
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
return nil

case lastCompletedJob != nil:
if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name))
return nil
}

default:
integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "")
}
}
return readyPods, unreadyPods
}

// Finally, call the readiness probes of the non-ready Pods directly,
// to retrieve insights from the Camel runtime.
// probeReadiness calls the readiness probes of the non-ready Pods directly to retrieve insights from the Camel runtime.
func (action *monitorAction) probeReadiness(ctx context.Context, environment *trait.Environment, integration *v1.Integration, unreadyPods []corev1.Pod) error {
var runtimeNotReadyMessages []string
for i := range unreadyPods {
pod := &unreadyPods[i]
104 changes: 104 additions & 0 deletions pkg/controller/integration/monitor_cronjob.go
@@ -0,0 +1,104 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"context"
"fmt"

batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"

ctrl "sigs.k8s.io/controller-runtime/pkg/client"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/util/kubernetes"
)

type cronJobController struct {
obj *batchv1beta1.CronJob
integration *v1.Integration
client client.Client
context context.Context
lastCompletedJob *batchv1.Job
}

var _ controller = &cronJobController{}

func (c *cronJobController) checkReadyCondition() (bool, error) {
// Check latest job result
if lastScheduleTime := c.obj.Status.LastScheduleTime; lastScheduleTime != nil && len(c.obj.Status.Active) == 0 {
jobs := batchv1.JobList{}
if err := c.client.List(c.context, &jobs,
ctrl.InNamespace(c.integration.Namespace),
ctrl.MatchingLabels{v1.IntegrationLabel: c.integration.Name},
); err != nil {
return true, err
}
t := lastScheduleTime.Time
for i, job := range jobs.Items {
if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) {
continue
}
c.lastCompletedJob = &jobs.Items[i]
t = c.lastCompletedJob.CreationTimestamp.Time
}
if c.lastCompletedJob != nil {
if failed := kubernetes.GetJobCondition(*c.lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue {
setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", c.lastCompletedJob.Name, failed.Message))
c.integration.Status.Phase = v1.IntegrationPhaseError
return true, nil
}
}
}

return false, nil
}

func (c *cronJobController) getPodSpec() corev1.PodSpec {
return c.obj.Spec.JobTemplate.Spec.Template.Spec
}

func (c *cronJobController) updateReadyCondition(readyPods []corev1.Pod) bool {
switch {
case c.obj.Status.LastScheduleTime == nil:
setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
return true

case len(c.obj.Status.Active) > 0:
setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
return true

case c.obj.Spec.SuccessfulJobsHistoryLimit != nil && *c.obj.Spec.SuccessfulJobsHistoryLimit == 0 && c.obj.Spec.FailedJobsHistoryLimit != nil && *c.obj.Spec.FailedJobsHistoryLimit == 0:
setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
return true

case c.lastCompletedJob != nil:
if complete := kubernetes.GetJobCondition(*c.lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", c.lastCompletedJob.Name))
return true
}

default:
setReadyCondition(c.integration, corev1.ConditionUnknown, "", "")
}

return false
}
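
Only two of the four changed files are rendered above. The deploymentController and knativeServiceController types that newController instantiates live in the remaining files of this commit, which are not shown here. As an illustration of how a Deployment-based implementation plausibly satisfies the new controller interface, the sketch below reconstructs it from the Deployment-specific logic removed from monitor.go in this commit; it is a hypothetical reconstruction, not the file actually added by the commit, and any detail beyond what the diff shows is an assumption.

// Hypothetical sketch of a deploymentController implementing the controller interface,
// reconstructed from the Deployment logic removed from monitor.go in this commit.
// The actual file added by the commit is not shown in this view and may differ.
package integration

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"

	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
	"github.com/apache/camel-k/pkg/util/kubernetes"
)

type deploymentController struct {
	obj         *appsv1.Deployment
	integration *v1.Integration
}

var _ controller = &deploymentController{}

func (c *deploymentController) checkReadyCondition() (bool, error) {
	// Check the Deployment progression: a missed progress deadline puts the Integration in error.
	if progressing := kubernetes.GetDeploymentCondition(*c.obj, appsv1.DeploymentProgressing); progressing != nil &&
		progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" {
		c.integration.Status.Phase = v1.IntegrationPhaseError
		setReadyConditionError(c.integration, progressing.Message)
		return true, nil
	}
	return false, nil
}

func (c *deploymentController) getPodSpec() corev1.PodSpec {
	return c.obj.Spec.Template.Spec
}

func (c *deploymentController) updateReadyCondition(readyPods []corev1.Pod) bool {
	replicas := int32(1)
	if r := c.integration.Spec.Replicas; r != nil {
		replicas = *r
	}
	// The Deployment status reports updated and ready replicas separately,
	// so the number of ready Pods also accounts for older versions.
	readyReplicas := int32(len(readyPods))
	switch {
	case readyReplicas >= replicas:
		// Consider the Integration ready as soon as enough replicas report ready,
		// to avoid a false readiness condition while the Integration is being down-scaled.
		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
		return true
	case c.obj.Status.UpdatedReplicas < replicas:
		setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.obj.Status.UpdatedReplicas, replicas))
	default:
		setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
	}
	return false
}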
