diff --git a/e2e/common/traits/health_test.go b/e2e/common/traits/health_test.go
index d17e5a98dc..d3679b3a90 100644
--- a/e2e/common/traits/health_test.go
+++ b/e2e/common/traits/health_test.go
@@ -99,7 +99,7 @@ func TestHealthTrait(t *testing.T) {
 			//
 			Eventually(IntegrationCondition(ns, "java", v1.IntegrationConditionReady), TestTimeoutMedium).Should(And(
 				WithTransform(IntegrationConditionReason, Equal(v1.IntegrationConditionRuntimeNotReadyReason)),
-				WithTransform(IntegrationConditionMessage, HavePrefix(fmt.Sprintf("[Pod %s runtime is not ready: map[consumer:route1:DOWN context:UP", pod.Name))),
+				WithTransform(IntegrationConditionMessage, HavePrefix(fmt.Sprintf("[Pod %s runtime is not ready: map[route.context.name:camel-1 route.id:route1 route.status:Stopped]", pod.Name))),
 			))
 			// Check the Integration is still in running phase
 			Eventually(IntegrationPhase(ns, "java"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning))
diff --git a/pkg/controller/integration/health.go b/pkg/controller/integration/health.go
index e56b10add5..731e2eacb4 100644
--- a/pkg/controller/integration/health.go
+++ b/pkg/controller/integration/health.go
@@ -19,6 +19,7 @@ package integration
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"strconv"
 	"strings"
@@ -37,16 +38,26 @@ const (
 )
 
 type HealthCheck struct {
-	Status HealthCheckState      `json:"state,omitempty"`
+	Status HealthCheckState      `json:"status,omitempty"`
 	Checks []HealthCheckResponse `json:"checks,omitempty"`
 }
 
 type HealthCheckResponse struct {
 	Name   string                 `json:"name,omitempty"`
-	Status HealthCheckState       `json:"state,omitempty"`
+	Status HealthCheckState       `json:"status,omitempty"`
 	Data   map[string]interface{} `json:"data,omitempty"`
 }
 
+func NewHealthCheck(body []byte) (*HealthCheck, error) {
+	health := HealthCheck{}
+	err := json.Unmarshal(body, &health)
+	if err != nil {
+		return nil, err
+	}
+
+	return &health, nil
+}
+
 func proxyGetHTTPProbe(ctx context.Context, c kubernetes.Interface, p *corev1.Probe, pod *corev1.Pod, container *corev1.Container) ([]byte, error) {
 	if p.HTTPGet == nil {
 		return nil, fmt.Errorf("missing probe handler for %s/%s", pod.Namespace, pod.Name)
diff --git a/pkg/controller/integration/health_test.go b/pkg/controller/integration/health_test.go
new file mode 100644
index 0000000000..ab7c2fc899
--- /dev/null
+++ b/pkg/controller/integration/health_test.go
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNewHealthCheck(t *testing.T) {
+	body := []byte(`
+	{
+		"status": "DOWN",
+		"checks": [
+			{
+				"name": "camel-routes",
+				"status": "DOWN",
+				"data": {
+					"route.id": "route1",
+					"route.context.name": "camel-1",
+					"route.status": "Stopped"
+				}
+			},
+			{
+				"name": "context",
+				"status": "UP",
+				"data": {
+					"context.name": "camel-1",
+					"context.version": "3.16.0",
+					"context.status": "Started"
+				}
+			},
+			{
+				"name": "camel-consumers",
+				"status": "DOWN",
+				"data": {
+					"route.id": "route1",
+					"route.context.name": "camel-1",
+					"route.status": "Stopped"
+				}
+			}
+		]
+	}
+	`)
+	health, err := NewHealthCheck(body)
+	assert.NoError(t, err)
+	assert.Equal(t, HealthCheckStateDown, health.Status)
+	assert.Len(t, health.Checks, 3)
+	assert.Equal(t, "camel-routes", health.Checks[0].Name)
+	assert.Equal(t, HealthCheckStateDown, health.Checks[0].Status)
+	assert.True(t, reflect.DeepEqual(health.Checks[0].Data, map[string]interface{}{
+		"route.id":           "route1",
+		"route.context.name": "camel-1",
+		"route.status":       "Stopped",
+	}))
+	assert.Equal(t, "context", health.Checks[1].Name)
+	assert.Equal(t, HealthCheckStateUp, health.Checks[1].Status)
+	assert.True(t, reflect.DeepEqual(health.Checks[1].Data, map[string]interface{}{
+		"context.name":    "camel-1",
+		"context.version": "3.16.0",
+		"context.status":  "Started",
+	}))
+	assert.Equal(t, "camel-consumers", health.Checks[2].Name)
+	assert.Equal(t, HealthCheckStateDown, health.Checks[2].Status)
+	assert.True(t, reflect.DeepEqual(health.Checks[2].Data, map[string]interface{}{
+		"route.id":           "route1",
+		"route.context.name": "camel-1",
+		"route.status":       "Stopped",
+	}))
+}
diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go
index e0877e734d..91ea73f638 100644
--- a/pkg/controller/integration/monitor.go
+++ b/pkg/controller/integration/monitor.go
@@ -19,14 +19,12 @@ package integration
 
 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"reflect"
 	"strconv"
 
 	appsv1 "k8s.io/api/apps/v1"
-	batchv1 "k8s.io/api/batch/v1"
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
@@ -164,85 +162,87 @@ func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integra
 	return integration, nil
 }
 
-func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
-	var controller ctrl.Object
-	var lastCompletedJob *batchv1.Job
-	var podSpec corev1.PodSpec
+type controller interface {
+	checkReadyCondition() (bool, error)
+	getPodSpec() corev1.PodSpec
+	updateReadyCondition(readyPods []corev1.Pod) bool
+}
 
+func (action *monitorAction) newController(ctx context.Context, env *trait.Environment, integration *v1.Integration) (controller, error) {
+	var controller controller
+	var obj ctrl.Object
 	switch {
 	case isConditionTrue(integration, v1.IntegrationConditionDeploymentAvailable):
-		controller = &appsv1.Deployment{}
+		obj = getUpdatedController(env, &appsv1.Deployment{})
+		controller = &deploymentController{
+			obj:         obj.(*appsv1.Deployment),
+			integration: integration,
+		}
 
 	case isConditionTrue(integration, v1.IntegrationConditionKnativeServiceAvailable):
-		controller = &servingv1.Service{}
+		obj = getUpdatedController(env, &servingv1.Service{})
+		controller = &knativeServiceController{
+			obj:         obj.(*servingv1.Service),
+			integration: integration,
+		}
 
 	case isConditionTrue(integration, v1.IntegrationConditionCronJobAvailable):
-		controller = &batchv1beta1.CronJob{}
+		obj = getUpdatedController(env, &batchv1beta1.CronJob{})
+		controller = &cronJobController{
+			obj:         obj.(*batchv1beta1.CronJob),
+			integration: integration,
+			client:      action.client,
+			context:     ctx,
+		}
 
 	default:
-		return fmt.Errorf("unsupported controller for integration %s", integration.Name)
+		return nil, fmt.Errorf("unsupported controller for integration %s", integration.Name)
+	}
+
+	if obj == nil {
+		return nil, fmt.Errorf("unable to retrieve controller for integration %s", integration.Name)
 	}
 
-	// Retrieve the controller updated from the deployer trait execution
-	controller = environment.Resources.GetController(func(object ctrl.Object) bool {
-		return reflect.TypeOf(controller) == reflect.TypeOf(object)
+	return controller, nil
+}
+
+// getUpdatedController retrieves the controller updated from the deployer trait execution.
+func getUpdatedController(env *trait.Environment, obj ctrl.Object) ctrl.Object {
+	return env.Resources.GetController(func(object ctrl.Object) bool {
+		return reflect.TypeOf(obj) == reflect.TypeOf(object)
 	})
-	if controller == nil {
-		return fmt.Errorf("unable to retrieve controller for integration %s", integration.Name)
+}
+
+func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context.Context, environment *trait.Environment, integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) error {
+	controller, err := action.newController(ctx, environment, integration)
+	if err != nil {
+		return err
 	}
 
-	switch c := controller.(type) {
-	case *appsv1.Deployment:
-		// Check the Deployment progression
-		if progressing := kubernetes.GetDeploymentCondition(*c, appsv1.DeploymentProgressing); progressing != nil && progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" {
-			integration.Status.Phase = v1.IntegrationPhaseError
-			setReadyConditionError(integration, progressing.Message)
-			return nil
-		}
-		podSpec = c.Spec.Template.Spec
+	if done, err := controller.checkReadyCondition(); done || err != nil {
+		return err
+	}
+	if done := checkPodStatuses(integration, pendingPods, runningPods); done {
+		return nil
+	}
+	integration.Status.Phase = v1.IntegrationPhaseRunning
 
-	case *servingv1.Service:
-		// Check the KnativeService conditions
-		if ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady); ready.IsFalse() && ready.GetReason() == "RevisionFailed" {
-			integration.Status.Phase = v1.IntegrationPhaseError
-			setReadyConditionError(integration, ready.Message)
-			return nil
-		}
-		podSpec = c.Spec.Template.Spec.PodSpec
-
-	case *batchv1beta1.CronJob:
-		// Check latest job result
-		if lastScheduleTime := c.Status.LastScheduleTime; lastScheduleTime != nil && len(c.Status.Active) == 0 {
-			jobs := batchv1.JobList{}
-			if err := action.client.List(ctx, &jobs,
-				ctrl.InNamespace(integration.Namespace),
-				ctrl.MatchingLabels{v1.IntegrationLabel: integration.Name},
-			); err != nil {
-				return err
-			}
-			t := lastScheduleTime.Time
-			for i, job := range jobs.Items {
-				if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) {
-					continue
-				}
-				lastCompletedJob = &jobs.Items[i]
-				t = lastCompletedJob.CreationTimestamp.Time
-			}
-			if lastCompletedJob != nil {
-				if failed := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue {
-					setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", lastCompletedJob.Name, failed.Message))
-					integration.Status.Phase = v1.IntegrationPhaseError
-					return nil
-				}
-			}
-		}
-		podSpec = c.Spec.JobTemplate.Spec.Template.Spec
+	readyPods, unreadyPods := filterPodsByReadyStatus(runningPods, controller.getPodSpec())
+	if done := controller.updateReadyCondition(readyPods); done {
+		return nil
 	}
+	if err := action.probeReadiness(ctx, environment, integration, unreadyPods); err != nil {
+		return err
+	}
+
+	return nil
+}
 
+func checkPodStatuses(integration *v1.Integration, pendingPods []corev1.Pod, runningPods []corev1.Pod) bool {
 	// Check Pods statuses
 	for _, pod := range pendingPods {
 		// Check the scheduled condition
 		if scheduled := kubernetes.GetPodCondition(pod, corev1.PodScheduled); scheduled != nil && scheduled.Status == corev1.ConditionFalse && scheduled.Reason == "Unschedulable" {
 			integration.Status.Phase = v1.IntegrationPhaseError
 			setReadyConditionError(integration, scheduled.Message)
-			return nil
+			return true
 		}
 	}
 	// Check pending container statuses
@@ -255,7 +255,7 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "ImagePullBackOff" {
 				integration.Status.Phase = v1.IntegrationPhaseError
 				setReadyConditionError(integration, waiting.Message)
-				return nil
+				return true
 			}
 		}
 	}
@@ -272,18 +272,20 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			if waiting := container.State.Waiting; waiting != nil && waiting.Reason == "CrashLoopBackOff" {
 				integration.Status.Phase = v1.IntegrationPhaseError
 				setReadyConditionError(integration, waiting.Message)
-				return nil
+				return true
 			}
 			if terminated := container.State.Terminated; terminated != nil && terminated.Reason == "Error" {
 				integration.Status.Phase = v1.IntegrationPhaseError
 				setReadyConditionError(integration, terminated.Message)
-				return nil
+				return true
 			}
 		}
 	}
 
-	integration.Status.Phase = v1.IntegrationPhaseRunning
+	return false
+}
 
+func filterPodsByReadyStatus(runningPods []corev1.Pod, podSpec corev1.PodSpec) ([]corev1.Pod, []corev1.Pod) {
 	var readyPods []corev1.Pod
 	var unreadyPods []corev1.Pod
 	for _, pod := range runningPods {
@@ -308,66 +310,11 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 		}
 	}
 
-	switch c := controller.(type) {
-	case *appsv1.Deployment:
-		replicas := int32(1)
-		if r := integration.Spec.Replicas; r != nil {
-			replicas = *r
-		}
-		// The Deployment status reports updated and ready replicas separately,
-		// so that the number of ready replicas also accounts for older versions.
-		readyReplicas := int32(len(readyPods))
-		switch {
-		case readyReplicas >= replicas:
-			// The Integration is considered ready when the number of replicas
-			// reported to be ready is larger than or equal to the specified number
-			// of replicas. This avoids reporting a falsy readiness condition
-			// when the Integration is being down-scaled.
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
-			return nil
-
-		case c.Status.UpdatedReplicas < replicas:
-			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.Status.UpdatedReplicas, replicas))
-
-		default:
-			setReadyCondition(integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
-		}
-
-	case *servingv1.Service:
-		ready := kubernetes.GetKnativeServiceCondition(*c, servingv1.ServiceConditionReady)
-		if ready.IsTrue() {
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceReadyReason, "")
-			return nil
-		}
-		setReadyCondition(integration, corev1.ConditionFalse, ready.GetReason(), ready.GetMessage())
-
-	case *batchv1beta1.CronJob:
-		switch {
-		case c.Status.LastScheduleTime == nil:
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
-			return nil
-
-		case len(c.Status.Active) > 0:
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
-			return nil
-
-		case c.Spec.SuccessfulJobsHistoryLimit != nil && *c.Spec.SuccessfulJobsHistoryLimit == 0 && c.Spec.FailedJobsHistoryLimit != nil && *c.Spec.FailedJobsHistoryLimit == 0:
-			setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
-			return nil
-
-		case lastCompletedJob != nil:
-			if complete := kubernetes.GetJobCondition(*lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
-				setReadyCondition(integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", lastCompletedJob.Name))
-				return nil
-			}
-
-		default:
-			integration.Status.SetCondition(v1.IntegrationConditionReady, corev1.ConditionUnknown, "", "")
-		}
-	}
+	return readyPods, unreadyPods
+}
 
-	// Finally, call the readiness probes of the non-ready Pods directly,
-	// to retrieve insights from the Camel runtime.
+// probeReadiness calls the readiness probes of the non-ready Pods directly to retrieve insights from the Camel runtime.
+func (action *monitorAction) probeReadiness(ctx context.Context, environment *trait.Environment, integration *v1.Integration, unreadyPods []corev1.Pod) error {
 	var runtimeNotReadyMessages []string
 	for i := range unreadyPods {
 		pod := &unreadyPods[i]
@@ -391,15 +338,11 @@ func (action *monitorAction) updateIntegrationPhaseAndReadyCondition(ctx context
 			runtimeNotReadyMessages = append(runtimeNotReadyMessages, fmt.Sprintf("readiness probe failed for Pod %s/%s: %s", pod.Namespace, pod.Name, err.Error()))
 			continue
 		}
-		health := HealthCheck{}
-		err = json.Unmarshal(body, &health)
+		health, err := NewHealthCheck(body)
 		if err != nil {
 			return err
 		}
 		for _, check := range health.Checks {
-			if check.Name != "camel-readiness-checks" {
-				continue
-			}
 			if check.Status == HealthCheckStateUp {
 				continue
 			}
diff --git a/pkg/controller/integration/monitor_cronjob.go b/pkg/controller/integration/monitor_cronjob.go
new file mode 100644
index 0000000000..8a024df169
--- /dev/null
+++ b/pkg/controller/integration/monitor_cronjob.go
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"context"
+	"fmt"
+
+	batchv1 "k8s.io/api/batch/v1"
+	batchv1beta1 "k8s.io/api/batch/v1beta1"
+	corev1 "k8s.io/api/core/v1"
+
+	ctrl "sigs.k8s.io/controller-runtime/pkg/client"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/client"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+type cronJobController struct {
+	obj              *batchv1beta1.CronJob
+	integration      *v1.Integration
+	client           client.Client
+	context          context.Context
+	lastCompletedJob *batchv1.Job
+}
+
+var _ controller = &cronJobController{}
+
+func (c *cronJobController) checkReadyCondition() (bool, error) {
+	// Check latest job result
+	if lastScheduleTime := c.obj.Status.LastScheduleTime; lastScheduleTime != nil && len(c.obj.Status.Active) == 0 {
+		jobs := batchv1.JobList{}
+		if err := c.client.List(c.context, &jobs,
+			ctrl.InNamespace(c.integration.Namespace),
+			ctrl.MatchingLabels{v1.IntegrationLabel: c.integration.Name},
+		); err != nil {
+			return true, err
+		}
+		t := lastScheduleTime.Time
+		for i, job := range jobs.Items {
+			if job.Status.Active == 0 && job.CreationTimestamp.Time.Before(t) {
+				continue
+			}
+			c.lastCompletedJob = &jobs.Items[i]
+			t = c.lastCompletedJob.CreationTimestamp.Time
+		}
+		if c.lastCompletedJob != nil {
+			if failed := kubernetes.GetJobCondition(*c.lastCompletedJob, batchv1.JobFailed); failed != nil && failed.Status == corev1.ConditionTrue {
+				setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionLastJobFailedReason, fmt.Sprintf("last job %s failed: %s", c.lastCompletedJob.Name, failed.Message))
+				c.integration.Status.Phase = v1.IntegrationPhaseError
+				return true, nil
+			}
+		}
+	}
+
+	return false, nil
+}
+
+func (c *cronJobController) getPodSpec() corev1.PodSpec {
+	return c.obj.Spec.JobTemplate.Spec.Template.Spec
+}
+
+func (c *cronJobController) updateReadyCondition(readyPods []corev1.Pod) bool {
+	switch {
+	case c.obj.Status.LastScheduleTime == nil:
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "cronjob created")
+		return true
+
+	case len(c.obj.Status.Active) > 0:
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobActiveReason, "cronjob active")
+		return true
+
+	case c.obj.Spec.SuccessfulJobsHistoryLimit != nil && *c.obj.Spec.SuccessfulJobsHistoryLimit == 0 && c.obj.Spec.FailedJobsHistoryLimit != nil && *c.obj.Spec.FailedJobsHistoryLimit == 0:
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionCronJobCreatedReason, "no jobs history available")
+		return true
+
+	case c.lastCompletedJob != nil:
+		if complete := kubernetes.GetJobCondition(*c.lastCompletedJob, batchv1.JobComplete); complete != nil && complete.Status == corev1.ConditionTrue {
+			setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionLastJobSucceededReason, fmt.Sprintf("last job %s completed successfully", c.lastCompletedJob.Name))
+			return true
+		}
+
+	default:
+		setReadyCondition(c.integration, corev1.ConditionUnknown, "", "")
+	}
+
+	return false
+}
diff --git a/pkg/controller/integration/monitor_deployment.go b/pkg/controller/integration/monitor_deployment.go
new file mode 100644
index 0000000000..9cf748ff60
--- /dev/null
+++ b/pkg/controller/integration/monitor_deployment.go
@@ -0,0 +1,77 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	"fmt"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+type deploymentController struct {
+	obj         *appsv1.Deployment
+	integration *v1.Integration
+}
+
+var _ controller = &deploymentController{}
+
+func (c *deploymentController) checkReadyCondition() (bool, error) {
+	// Check the Deployment progression
+	if progressing := kubernetes.GetDeploymentCondition(*c.obj, appsv1.DeploymentProgressing); progressing != nil && progressing.Status == corev1.ConditionFalse && progressing.Reason == "ProgressDeadlineExceeded" {
+		c.integration.Status.Phase = v1.IntegrationPhaseError
+		setReadyConditionError(c.integration, progressing.Message)
+		return true, nil
+	}
+
+	return false, nil
+}
+
+func (c *deploymentController) getPodSpec() corev1.PodSpec {
+	return c.obj.Spec.Template.Spec
+}
+
+func (c *deploymentController) updateReadyCondition(readyPods []corev1.Pod) bool {
+	replicas := int32(1)
+	if r := c.integration.Spec.Replicas; r != nil {
+		replicas = *r
+	}
+	// The Deployment status reports updated and ready replicas separately,
+	// so that the number of ready replicas also accounts for older versions.
+	readyReplicas := int32(len(readyPods))
+	switch {
+	case readyReplicas >= replicas:
+		// The Integration is considered ready when the number of replicas
+		// reported to be ready is larger than or equal to the specified number
+		// of replicas. This avoids reporting a falsy readiness condition
+		// when the Integration is being down-scaled.
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionDeploymentReadyReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
+		return true
+
+	case c.obj.Status.UpdatedReplicas < replicas:
+		setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d updated replicas", c.obj.Status.UpdatedReplicas, replicas))
+
+	default:
+		setReadyCondition(c.integration, corev1.ConditionFalse, v1.IntegrationConditionDeploymentProgressingReason, fmt.Sprintf("%d/%d ready replicas", readyReplicas, replicas))
+	}
+
+	return false
+}
diff --git a/pkg/controller/integration/monitor_knative.go b/pkg/controller/integration/monitor_knative.go
new file mode 100644
index 0000000000..cf8d098607
--- /dev/null
+++ b/pkg/controller/integration/monitor_knative.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package integration
+
+import (
+	corev1 "k8s.io/api/core/v1"
+
+	servingv1 "knative.dev/serving/pkg/apis/serving/v1"
+
+	v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
+	"github.com/apache/camel-k/pkg/util/kubernetes"
+)
+
+type knativeServiceController struct {
+	obj         *servingv1.Service
+	integration *v1.Integration
+}
+
+var _ controller = &knativeServiceController{}
+
+func (c *knativeServiceController) checkReadyCondition() (bool, error) {
+	// Check the KnativeService conditions
+	if ready := kubernetes.GetKnativeServiceCondition(*c.obj, servingv1.ServiceConditionReady); ready.IsFalse() && ready.GetReason() == "RevisionFailed" {
+		c.integration.Status.Phase = v1.IntegrationPhaseError
+		setReadyConditionError(c.integration, ready.Message)
+		return true, nil
+	}
+
+	return false, nil
+}
+
+func (c *knativeServiceController) getPodSpec() corev1.PodSpec {
+	return c.obj.Spec.Template.Spec.PodSpec
+}
+
+func (c *knativeServiceController) updateReadyCondition(readyPods []corev1.Pod) bool {
+	ready := kubernetes.GetKnativeServiceCondition(*c.obj, servingv1.ServiceConditionReady)
+	if ready.IsTrue() {
+		setReadyCondition(c.integration, corev1.ConditionTrue, v1.IntegrationConditionKnativeServiceReadyReason, "")
+		return true
+	}
+	setReadyCondition(c.integration, corev1.ConditionFalse, ready.GetReason(), ready.GetMessage())
+
+	return false
+}