From c6e267512211d8c26af9e17c95e8c3ea07d1620d Mon Sep 17 00:00:00 2001 From: Pasquale Congiusti Date: Fri, 1 Dec 2023 15:48:32 +0100 Subject: [PATCH] chore: synthetic Integration separate controller --- pkg/cmd/operator/operator.go | 3 + .../integration/integration_controller.go | 44 +-- .../integration_controller_import.go | 249 --------------- pkg/controller/integration/monitor.go | 1 - .../integration/monitor_synthetic.go | 18 -- pkg/controller/integration/predicate.go | 37 --- pkg/controller/pipe/pipe_controller.go | 2 +- pkg/controller/synthetic/synthetic.go | 302 ++++++++++++++++++ 8 files changed, 310 insertions(+), 346 deletions(-) delete mode 100644 pkg/controller/integration/integration_controller_import.go create mode 100644 pkg/controller/synthetic/synthetic.go diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index 04b5ea8b23..12edd7cc11 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -59,6 +59,7 @@ import ( v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/controller" + "github.com/apache/camel-k/v2/pkg/controller/synthetic" "github.com/apache/camel-k/v2/pkg/event" "github.com/apache/camel-k/v2/pkg/install" "github.com/apache/camel-k/v2/pkg/platform" @@ -231,6 +232,8 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, operatorNamespace, log) exitOnError(findOrCreateIntegrationPlatform(installCtx, bootstrapClient, operatorNamespace), "failed to create integration platform") + log.Info("Starting the synthetic Integration manager") + exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache(), mgr.GetAPIReader()), "synthetic Integration manager error") log.Info("Starting the manager") exitOnError(mgr.Start(ctx), "manager exited non-zero") } diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index 1979b9d4ac..742d956fc1 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -405,37 +405,13 @@ func watchIntegrationResources(c client.Client, b *builder.Builder) { }, } })). - // Watch for non managed Deployments (ie, imported) - Watches(&appsv1.Deployment{}, - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { - deploy, ok := a.(*appsv1.Deployment) - if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Deployment") - return []reconcile.Request{} - } - return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelDeployment{deploy: deploy}) - }), - builder.WithPredicates(NonManagedObjectPredicate{}), - ). // Watch for the owned Deployments Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})) } func watchCronJobResources(c client.Client, b *builder.Builder) { - // Watch for non managed Deployments (ie, imported) - b.Watches(&batchv1.CronJob{}, - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { - cron, ok := a.(*batchv1.CronJob) - if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve CronJob") - return []reconcile.Request{} - } - return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelCronjob{cron: cron}) - }), - builder.WithPredicates(NonManagedObjectPredicate{}), - ). - // Watch for the owned CronJobs - Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})) + // Watch for the owned CronJobs + b.Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})) } func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Builder) error { @@ -445,20 +421,8 @@ func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Buil if ok, err := kubernetes.CheckPermission(checkCtx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil { return err } else if ok { - // Watch for non managed Knative Service (ie, imported) - b.Watches(&servingv1.Service{}, - handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request { - ksvc, ok := a.(*servingv1.Service) - if !ok { - log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve KnativeService") - return []reconcile.Request{} - } - return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelKnativeService{ksvc: ksvc}) - }), - builder.WithPredicates(NonManagedObjectPredicate{}), - ). - // Watch for the owned CronJobs - Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{})) + // Watch for the owned Knative Services + b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{})) } return nil } diff --git a/pkg/controller/integration/integration_controller_import.go b/pkg/controller/integration/integration_controller_import.go deleted file mode 100644 index 4031855097..0000000000 --- a/pkg/controller/integration/integration_controller_import.go +++ /dev/null @@ -1,249 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package integration - -import ( - "context" - - v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" - "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" - "github.com/apache/camel-k/v2/pkg/client" - "github.com/apache/camel-k/v2/pkg/util/log" - "github.com/apache/camel-k/v2/pkg/util/patch" - appsv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - servingv1 "knative.dev/serving/pkg/apis/serving/v1" - ctrl "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -// nonManagedCamelAppEnqueueRequestsFromMapFunc represent the function to discover the Integration which has to be woke up: it creates a synthetic -// Integration if the Integration does not exist. This is used to import external Camel applications. -func nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx context.Context, c client.Client, adp NonManagedCamelApplicationAdapter) []reconcile.Request { - if adp.GetIntegrationName() == "" { - return []reconcile.Request{} - } - it := v1.NewIntegration(adp.GetIntegrationNameSpace(), adp.GetIntegrationName()) - err := c.Get(ctx, ctrl.ObjectKeyFromObject(&it), &it) - if err != nil { - if k8serrors.IsNotFound(err) { - // We must perform this check to make sure the resource is not being deleted. - // In such case it makes no sense to create an Integration after it. - err := c.Get(ctx, ctrl.ObjectKeyFromObject(adp.GetAppObj()), adp.GetAppObj()) - if err != nil { - if k8serrors.IsNotFound(err) { - return []reconcile.Request{} - } - log.Errorf(err, "Some error happened while trying to get %s %s resource", adp.GetName(), adp.GetKind()) - } - createSyntheticIntegration(&it, adp) - target, err := patch.ApplyPatch(&it) - if err == nil { - err = c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator")) - if err != nil { - log.Errorf(err, "Some error happened while creating a synthetic Integration after %s %s resource", adp.GetName(), adp.GetKind()) - return []reconcile.Request{} - } - log.Infof( - "Created a synthetic Integration %s after %s %s", - it.GetName(), - adp.GetName(), - adp.GetKind(), - ) - return []reconcile.Request{ - { - NamespacedName: types.NamespacedName{ - Namespace: it.Namespace, - Name: it.Name, - }, - }, - } - } - if err != nil { - log.Infof("Could not create Integration %s: %s", adp.GetIntegrationName(), err.Error()) - return []reconcile.Request{} - } - } - log.Errorf(err, "Could not get Integration %s", it.GetName()) - return []reconcile.Request{} - } - - return []reconcile.Request{ - { - NamespacedName: types.NamespacedName{ - Namespace: it.Namespace, - Name: it.Name, - }, - }, - } -} - -// createSyntheticIntegration set all required values for a synthetic Integration. -func createSyntheticIntegration(it *v1.Integration, adp NonManagedCamelApplicationAdapter) { - // We need to create a synthetic Integration - it.SetAnnotations(map[string]string{ - v1.IntegrationImportedNameLabel: adp.GetName(), - v1.IntegrationImportedKindLabel: adp.GetKind(), - v1.IntegrationSyntheticLabel: "true", - }) - it.Spec = v1.IntegrationSpec{ - Traits: adp.GetTraits(), - } -} - -// NonManagedCamelApplicationAdapter represents a Camel application built and deployed outside the operator lifecycle. -type NonManagedCamelApplicationAdapter interface { - // GetName returns the name of the Camel application. - GetName() string - // GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...). - GetKind() string - // GetTraits in used to retrieve the trait configuration. - GetTraits() v1.Traits - // GetIntegrationName return the name of the Integration which has to be imported. - GetIntegrationName() string - // GetIntegrationNameSpace return the namespace of the Integration which has to be imported. - GetIntegrationNameSpace() string - // GetAppObj return the object from which we're importing. - GetAppObj() ctrl.Object -} - -// NonManagedCamelDeployment represents a regular Camel application built and deployed outside the operator lifecycle. -type NonManagedCamelDeployment struct { - deploy *appsv1.Deployment -} - -// GetName returns the name of the Camel application. -func (app *NonManagedCamelDeployment) GetName() string { - return app.deploy.GetName() -} - -// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...). -func (app *NonManagedCamelDeployment) GetKind() string { - return "Deployment" -} - -// GetTraits in used to retrieve the trait configuration. -func (app *NonManagedCamelDeployment) GetTraits() v1.Traits { - return v1.Traits{ - Container: &trait.ContainerTrait{ - Name: app.getContainerNameFromDeployment(), - }, - } -} - -// GetAppObj return the object from which we're importing. -func (app *NonManagedCamelDeployment) GetAppObj() ctrl.Object { - return app.deploy -} - -// GetIntegrationName return the name of the Integration which has to be imported. -func (app *NonManagedCamelDeployment) GetIntegrationName() string { - return app.deploy.Labels[v1.IntegrationLabel] -} - -// GetIntegrationNameSpace return the namespace of the Integration which has to be imported. -func (app *NonManagedCamelDeployment) GetIntegrationNameSpace() string { - return app.deploy.Namespace -} - -// getContainerNameFromDeployment returns the container name which is running the Camel application. -func (app *NonManagedCamelDeployment) getContainerNameFromDeployment() string { - firstContainerName := "" - for _, ct := range app.deploy.Spec.Template.Spec.Containers { - // set as fallback if no container is named as the deployment - if firstContainerName == "" { - firstContainerName = app.deploy.Name - } - if ct.Name == app.deploy.Name { - return app.deploy.Name - } - } - return firstContainerName -} - -// NonManagedCamelCronjob represents a cron Camel application built and deployed outside the operator lifecycle. -type NonManagedCamelCronjob struct { - cron *batchv1.CronJob -} - -// GetName returns the name of the Camel application. -func (app *NonManagedCamelCronjob) GetName() string { - return app.cron.GetName() -} - -// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...). -func (app *NonManagedCamelCronjob) GetKind() string { - return "CronJob" -} - -// GetTraits in used to retrieve the trait configuration. -func (app *NonManagedCamelCronjob) GetTraits() v1.Traits { - return v1.Traits{} -} - -// GetIntegrationName return the name of the Integration which has to be imported. -func (app *NonManagedCamelCronjob) GetIntegrationName() string { - return app.cron.Labels[v1.IntegrationLabel] -} - -// GetIntegrationNameSpace return the namespace of the Integration which has to be imported. -func (app *NonManagedCamelCronjob) GetIntegrationNameSpace() string { - return app.cron.Namespace -} - -// GetAppObj return the object from which we're importing. -func (app *NonManagedCamelCronjob) GetAppObj() ctrl.Object { - return app.cron -} - -// NonManagedCamelKnativeService represents a Knative Service based Camel application built and deployed outside the operator lifecycle. -type NonManagedCamelKnativeService struct { - ksvc *servingv1.Service -} - -// GetName returns the name of the Camel application. -func (app *NonManagedCamelKnativeService) GetName() string { - return app.ksvc.GetName() -} - -// GetKind returns the kind of the Camel application (ie, Deployment, Cronjob, ...). -func (app *NonManagedCamelKnativeService) GetKind() string { - return "KnativeService" -} - -// GetTraits in used to retrieve the trait configuration. -func (app *NonManagedCamelKnativeService) GetTraits() v1.Traits { - return v1.Traits{} -} - -// GetIntegrationName return the name of the Integration which has to be imported. -func (app *NonManagedCamelKnativeService) GetIntegrationName() string { - return app.ksvc.Labels[v1.IntegrationLabel] -} - -// GetIntegrationNameSpace return the namespace of the Integration which has to be imported. -func (app *NonManagedCamelKnativeService) GetIntegrationNameSpace() string { - return app.ksvc.Namespace -} - -// GetAppObj return the object from which we're importing. -func (app *NonManagedCamelKnativeService) GetAppObj() ctrl.Object { - return app.ksvc -} diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go index e3c3d421fe..b11d19f14f 100644 --- a/pkg/controller/integration/monitor.go +++ b/pkg/controller/integration/monitor.go @@ -60,7 +60,6 @@ func (action *monitorAction) CanHandle(integration *v1.Integration) bool { return integration.Status.Phase == v1.IntegrationPhaseDeploying || integration.Status.Phase == v1.IntegrationPhaseRunning || integration.Status.Phase == v1.IntegrationPhaseError || - integration.Status.Phase == v1.IntegrationPhaseImportMissing || integration.Status.Phase == v1.IntegrationPhaseCannotMonitor } diff --git a/pkg/controller/integration/monitor_synthetic.go b/pkg/controller/integration/monitor_synthetic.go index a10a03debe..a1aa86a43d 100644 --- a/pkg/controller/integration/monitor_synthetic.go +++ b/pkg/controller/integration/monitor_synthetic.go @@ -19,10 +19,8 @@ package integration import ( "context" - "fmt" corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/trait" @@ -44,22 +42,6 @@ func (action *monitorSyntheticAction) Name() string { func (action *monitorSyntheticAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) { environment, err := trait.NewSyntheticEnvironment(ctx, action.client, integration, nil) if err != nil { - if k8serrors.IsNotFound(err) { - // Not an error: the resource from which we imported has been deleted, report in it status. - // It may be a temporary situation, for example, if the deployment from which the Integration is imported - // is being redeployed. For this reason we should keep the Integration instead of forcefully removing it. - message := fmt.Sprintf( - "import %s %s no longer available", - integration.Annotations[v1.IntegrationImportedKindLabel], - integration.Annotations[v1.IntegrationImportedNameLabel], - ) - action.L.Info(message) - integration.SetReadyConditionError(message) - zero := int32(0) - integration.Status.Phase = v1.IntegrationPhaseImportMissing - integration.Status.Replicas = &zero - return integration, nil - } // report the error integration.Status.Phase = v1.IntegrationPhaseError integration.SetReadyCondition(corev1.ConditionFalse, v1.IntegrationConditionImportingKindAvailableReason, err.Error()) diff --git a/pkg/controller/integration/predicate.go b/pkg/controller/integration/predicate.go index 0feb71fec3..79d61556a9 100644 --- a/pkg/controller/integration/predicate.go +++ b/pkg/controller/integration/predicate.go @@ -21,7 +21,6 @@ import ( "reflect" "k8s.io/apimachinery/pkg/api/equality" - ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" ) @@ -56,39 +55,3 @@ func (StatusChangedPredicate) Update(e event.UpdateEvent) bool { return !equality.Semantic.DeepDerivative(s1.Interface(), s2.Interface()) } - -// NonManagedObjectPredicate implements a generic update predicate function for managed object. -type NonManagedObjectPredicate struct { - predicate.Funcs -} - -// Create --. -func (NonManagedObjectPredicate) Create(e event.CreateEvent) bool { - return !isManagedObject(e.Object) -} - -// Update --. -func (NonManagedObjectPredicate) Update(e event.UpdateEvent) bool { - return !isManagedObject(e.ObjectNew) -} - -// Delete --. -func (NonManagedObjectPredicate) Delete(e event.DeleteEvent) bool { - return !isManagedObject(e.Object) -} - -// Generic --. -func (NonManagedObjectPredicate) Generic(e event.GenericEvent) bool { - return !isManagedObject(e.Object) -} - -// isManagedObject returns true if the object is managed by an Integration. -func isManagedObject(obj ctrl.Object) bool { - for _, mr := range obj.GetOwnerReferences() { - if mr.APIVersion == "camel.apache.org/v1" && - mr.Kind == "Integration" { - return true - } - } - return false -} diff --git a/pkg/controller/pipe/pipe_controller.go b/pkg/controller/pipe/pipe_controller.go index 36da7fca1a..5b174e435e 100644 --- a/pkg/controller/pipe/pipe_controller.go +++ b/pkg/controller/pipe/pipe_controller.go @@ -66,7 +66,7 @@ func newReconciler(mgr manager.Manager, c client.Client) reconcile.Reconciler { } func add(mgr manager.Manager, r reconcile.Reconciler) error { - c, err := controller.New("kamelet-binding-controller", mgr, controller.Options{Reconciler: r}) + c, err := controller.New("pipe-controller", mgr, controller.Options{Reconciler: r}) if err != nil { return err } diff --git a/pkg/controller/synthetic/synthetic.go b/pkg/controller/synthetic/synthetic.go new file mode 100644 index 0000000000..4777dc24f2 --- /dev/null +++ b/pkg/controller/synthetic/synthetic.go @@ -0,0 +1,302 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package synthetic + +import ( + "context" + "fmt" + "reflect" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" + "github.com/apache/camel-k/v2/pkg/client" + "github.com/apache/camel-k/v2/pkg/platform" + "github.com/apache/camel-k/v2/pkg/util/kubernetes" + "github.com/apache/camel-k/v2/pkg/util/log" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + clientgocache "k8s.io/client-go/tools/cache" + "knative.dev/serving/pkg/apis/serving" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" + "sigs.k8s.io/controller-runtime/pkg/cache" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" +) + +// ManageSyntheticIntegrations is the controller for synthetic Integrations. Consider that the lifecycle of the objects are driven +// by the way we are monitoring them. Since we're filtering by `camel.apache.org/integration` label in the cached clinet, +// you must consider an add, update or delete +// accordingly, ie, when the user label the resource, then it is considered as an add, when it removes the label, it is considered as a delete. +// We must filter only non managed objects in order to avoid to conflict with the reconciliation loop of managed objects (owned by an Integration). +func ManageSyntheticIntegrations(ctx context.Context, c client.Client, cache cache.Cache, reader ctrl.Reader) error { + informers, err := getInformers(ctx, c, cache) + if err != nil { + return err + } + for _, informer := range informers { + informer.AddEventHandler(clientgocache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ctrlObj, ok := obj.(ctrl.Object) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", obj), "Failed to retrieve Object on add event") + return + } + if !isManagedObject(ctrlObj) { + integrationName := ctrlObj.GetLabels()[v1.IntegrationLabel] + it, err := getSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName) + if err != nil { + if k8serrors.IsNotFound(err) { + adapter, err := nonManagedCamelApplicationFactory(ctrlObj) + if err != nil { + log.Errorf(err, "Some error happened while creating a Camel application adapter for %s", integrationName) + } + if err = createSyntheticIntegration(ctx, c, adapter.Integration()); err != nil { + log.Errorf(err, "Some error happened while creating a synthetic Integration %s", integrationName) + } + log.Infof("Created a synthetic Integration %s after %s resource object", it.GetName(), ctrlObj.GetName()) + } else { + log.Errorf(err, "Some error happened while loading a synthetic Integration %s", integrationName) + } + } else { + if it.Status.Phase == v1.IntegrationPhaseImportMissing { + // Update with proper phase (reconcilation will take care) + it.Status.Phase = v1.IntegrationPhaseNone + if err = updateSyntheticIntegration(ctx, c, it); err != nil { + log.Errorf(err, "Some error happened while updatinf a synthetic Integration %s", integrationName) + } + } else { + log.Infof("Synthetic Integration %s is in phase %s. Skipping.", integrationName, it.Status.Phase) + } + } + } + }, + DeleteFunc: func(obj interface{}) { + ctrlObj, ok := obj.(ctrl.Object) + if !ok { + log.Error(fmt.Errorf("type assertion failed: %v", obj), "Failed to retrieve Object on delete event") + return + } + if !isManagedObject(ctrlObj) { + integrationName := ctrlObj.GetLabels()[v1.IntegrationLabel] + // We must use a non caching client to understand if the object has been deleted from the cluster or only deleted from + // the cache (ie, user removed the importing label) + err := reader.Get(ctx, ctrl.ObjectKeyFromObject(ctrlObj), ctrlObj) + if err != nil { + if k8serrors.IsNotFound(err) { + // Object removed from the cluster + it, err := getSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName) + if err != nil { + log.Errorf(err, "Some error happened while loading a synthetic Integration %s", it.Name) + return + } + // The resource from which we imported has been deleted, report in it status. + // It may be a temporary situation, for example, if the deployment from which the Integration is imported + // is being redeployed. For this reason we should keep the Integration instead of forcefully removing it. + message := fmt.Sprintf( + "import %s %s no longer available", + it.Annotations[v1.IntegrationImportedKindLabel], + it.Annotations[v1.IntegrationImportedNameLabel], + ) + it.SetReadyConditionError(message) + zero := int32(0) + it.Status.Phase = v1.IntegrationPhaseImportMissing + it.Status.Replicas = &zero + if err = updateSyntheticIntegration(ctx, c, it); err != nil { + log.Errorf(err, "Some error happened while updating a synthetic Integration %s", it.Name) + } + log.Infof("Updated synthetic Integration %s with status %s", it.GetName(), it.Status.Phase) + } else { + log.Errorf(err, "Some error happened while loading object %s from the cluster", ctrlObj.GetName()) + return + } + } else { + // Importing label removed + if err = deleteSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName); err != nil { + log.Errorf(err, "Some error happened while deleting a synthetic Integration %s", integrationName) + } + log.Infof("Deleted synthetic Integration %s", integrationName) + } + } + }, + }) + } + + return nil +} + +func getInformers(ctx context.Context, cl client.Client, c cache.Cache) ([]cache.Informer, error) { + deploy, err := c.GetInformer(ctx, &appsv1.Deployment{}) + if err != nil { + return nil, err + } + informers := []cache.Informer{deploy} + // Watch for the CronJob conditionally + if ok, err := kubernetes.IsAPIResourceInstalled(cl, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil { + cron, err := c.GetInformer(ctx, &batchv1.CronJob{}) + if err != nil { + return nil, err + } + informers = append(informers, cron) + } + // Watch for the Knative Services conditionally + if ok, err := kubernetes.IsAPIResourceInstalled(cl, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); ok && err == nil { + if ok, err := kubernetes.CheckPermission(ctx, cl, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); ok && err == nil { + ksvc, err := c.GetInformer(ctx, &servingv1.Service{}) + if err != nil { + return nil, err + } + informers = append(informers, ksvc) + } + } + + return informers, nil +} + +func getSyntheticIntegration(ctx context.Context, c client.Client, namespace, name string) (*v1.Integration, error) { + it := v1.NewIntegration(namespace, name) + err := c.Get(ctx, ctrl.ObjectKeyFromObject(&it), &it) + return &it, err +} + +func createSyntheticIntegration(ctx context.Context, c client.Client, it *v1.Integration) error { + return c.Create(ctx, it, ctrl.FieldOwner("camel-k-operator")) +} + +func deleteSyntheticIntegration(ctx context.Context, c client.Client, namespace, name string) error { + // As the Integration label was removed, we don't know which is the Synthetic integration to remove + it := v1.NewIntegration(namespace, name) + return c.Delete(ctx, &it) +} + +func updateSyntheticIntegration(ctx context.Context, c client.Client, it *v1.Integration) error { + return c.Status().Update(ctx, it, ctrl.FieldOwner("camel-k-operator")) +} + +// isManagedObject returns true if the object is managed by an Integration. +func isManagedObject(obj ctrl.Object) bool { + for _, mr := range obj.GetOwnerReferences() { + if mr.APIVersion == "camel.apache.org/v1" && + mr.Kind == "Integration" { + return true + } + } + return false +} + +// hasIntegrationLabel returns true if an Integration label exists on the object. +func hasIntegrationLabel(obj ctrl.Object) bool { + return obj.GetLabels()[v1.IntegrationLabel] != "" +} + +// nonManagedCamelApplicationAdapter represents a Camel application built and deployed outside the operator lifecycle. +type nonManagedCamelApplicationAdapter interface { + // Integration return an Integration resource fed by the Camel application adapter. + Integration() *v1.Integration +} + +func nonManagedCamelApplicationFactory(obj ctrl.Object) (nonManagedCamelApplicationAdapter, error) { + deploy, ok := obj.(*appsv1.Deployment) + if ok { + return &nonManagedCamelDeployment{deploy: deploy}, nil + } + cronjob, ok := obj.(*batchv1.CronJob) + if ok { + return &NonManagedCamelCronjob{cron: cronjob}, nil + } + ksvc, ok := obj.(*servingv1.Service) + if ok { + return &NonManagedCamelKnativeService{ksvc: ksvc}, nil + } + return nil, fmt.Errorf("Unsupported %s object kind", obj) +} + +// NonManagedCamelDeployment represents a regular Camel application built and deployed outside the operator lifecycle. +type nonManagedCamelDeployment struct { + deploy *appsv1.Deployment +} + +// Integration return an Integration resource fed by the Camel application adapter. +func (app *nonManagedCamelDeployment) Integration() *v1.Integration { + it := v1.NewIntegration(app.deploy.Namespace, app.deploy.Labels[v1.IntegrationLabel]) + it.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: app.deploy.Name, + v1.IntegrationImportedKindLabel: "Deployment", + v1.IntegrationSyntheticLabel: "true", + }) + it.Spec = v1.IntegrationSpec{ + Traits: v1.Traits{ + Container: &trait.ContainerTrait{ + Name: app.getContainerNameFromDeployment(), + }, + }, + } + return &it +} + +// getContainerNameFromDeployment returns the container name which is running the Camel application. +func (app *nonManagedCamelDeployment) getContainerNameFromDeployment() string { + firstContainerName := "" + for _, ct := range app.deploy.Spec.Template.Spec.Containers { + // set as fallback if no container is named as the deployment + if firstContainerName == "" { + firstContainerName = app.deploy.Name + } + if ct.Name == app.deploy.Name { + return app.deploy.Name + } + } + return firstContainerName +} + +// NonManagedCamelCronjob represents a cron Camel application built and deployed outside the operator lifecycle. +type NonManagedCamelCronjob struct { + cron *batchv1.CronJob +} + +// Integration return an Integration resource fed by the Camel application adapter. +func (app *NonManagedCamelCronjob) Integration() *v1.Integration { + it := v1.NewIntegration(app.cron.Namespace, app.cron.Labels[v1.IntegrationLabel]) + it.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: app.cron.Name, + v1.IntegrationImportedKindLabel: "CronJob", + v1.IntegrationSyntheticLabel: "true", + }) + it.Spec = v1.IntegrationSpec{ + Traits: v1.Traits{}, + } + return &it +} + +// NonManagedCamelKnativeService represents a Knative Service based Camel application built and deployed outside the operator lifecycle. +type NonManagedCamelKnativeService struct { + ksvc *servingv1.Service +} + +// Integration return an Integration resource fed by the Camel application adapter. +func (app *NonManagedCamelKnativeService) Integration() *v1.Integration { + it := v1.NewIntegration(app.ksvc.Namespace, app.ksvc.Labels[v1.IntegrationLabel]) + it.SetAnnotations(map[string]string{ + v1.IntegrationImportedNameLabel: app.ksvc.Name, + v1.IntegrationImportedKindLabel: "KnativeService", + v1.IntegrationSyntheticLabel: "true", + }) + it.Spec = v1.IntegrationSpec{ + Traits: v1.Traits{}, + } + return &it +}