Skip to content

Commit

Permalink
chore: synthetic Integration separate controller
Browse files Browse the repository at this point in the history
  • Loading branch information
squakez committed Dec 1, 2023
1 parent c62f02b commit c6e2675
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 346 deletions.
3 changes: 3 additions & 0 deletions pkg/cmd/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/client"
"github.com/apache/camel-k/v2/pkg/controller"
"github.com/apache/camel-k/v2/pkg/controller/synthetic"
"github.com/apache/camel-k/v2/pkg/event"
"github.com/apache/camel-k/v2/pkg/install"
"github.com/apache/camel-k/v2/pkg/platform"
Expand Down Expand Up @@ -231,6 +232,8 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID
install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, operatorNamespace, log)
exitOnError(findOrCreateIntegrationPlatform(installCtx, bootstrapClient, operatorNamespace), "failed to create integration platform")

log.Info("Starting the synthetic Integration manager")
exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache(), mgr.GetAPIReader()), "synthetic Integration manager error")
log.Info("Starting the manager")
exitOnError(mgr.Start(ctx), "manager exited non-zero")
}
Expand Down
44 changes: 4 additions & 40 deletions pkg/controller/integration/integration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,37 +405,13 @@ func watchIntegrationResources(c client.Client, b *builder.Builder) {
},
}
})).
// Watch for non managed Deployments (ie, imported)
Watches(&appsv1.Deployment{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
deploy, ok := a.(*appsv1.Deployment)
if !ok {
log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve Deployment")
return []reconcile.Request{}
}
return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelDeployment{deploy: deploy})
}),
builder.WithPredicates(NonManagedObjectPredicate{}),
).
// Watch for the owned Deployments
Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{}))
}

func watchCronJobResources(c client.Client, b *builder.Builder) {
// Watch for non managed Deployments (ie, imported)
b.Watches(&batchv1.CronJob{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
cron, ok := a.(*batchv1.CronJob)
if !ok {
log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve CronJob")
return []reconcile.Request{}
}
return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelCronjob{cron: cron})
}),
builder.WithPredicates(NonManagedObjectPredicate{}),
).
// Watch for the owned CronJobs
Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{}))
// Watch for the owned CronJobs
b.Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{}))
}

func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Builder) error {
Expand All @@ -445,20 +421,8 @@ func watchKnativeResources(ctx context.Context, c client.Client, b *builder.Buil
if ok, err := kubernetes.CheckPermission(checkCtx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil {
return err
} else if ok {
// Watch for non managed Knative Service (ie, imported)
b.Watches(&servingv1.Service{},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a ctrl.Object) []reconcile.Request {
ksvc, ok := a.(*servingv1.Service)
if !ok {
log.Error(fmt.Errorf("type assertion failed: %v", a), "Failed to retrieve to retrieve KnativeService")
return []reconcile.Request{}
}
return nonManagedCamelAppEnqueueRequestsFromMapFunc(ctx, c, &NonManagedCamelKnativeService{ksvc: ksvc})
}),
builder.WithPredicates(NonManagedObjectPredicate{}),
).
// Watch for the owned CronJobs
Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
// Watch for the owned Knative Services
b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{}))
}
return nil
}
Expand Down
249 changes: 0 additions & 249 deletions pkg/controller/integration/integration_controller_import.go

This file was deleted.

1 change: 0 additions & 1 deletion pkg/controller/integration/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func (action *monitorAction) CanHandle(integration *v1.Integration) bool {
return integration.Status.Phase == v1.IntegrationPhaseDeploying ||
integration.Status.Phase == v1.IntegrationPhaseRunning ||
integration.Status.Phase == v1.IntegrationPhaseError ||
integration.Status.Phase == v1.IntegrationPhaseImportMissing ||
integration.Status.Phase == v1.IntegrationPhaseCannotMonitor
}

Expand Down
18 changes: 0 additions & 18 deletions pkg/controller/integration/monitor_synthetic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package integration

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"

v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/trait"
Expand All @@ -44,22 +42,6 @@ func (action *monitorSyntheticAction) Name() string {
func (action *monitorSyntheticAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
environment, err := trait.NewSyntheticEnvironment(ctx, action.client, integration, nil)
if err != nil {
if k8serrors.IsNotFound(err) {
// Not an error: the resource from which we imported has been deleted, report in it status.
// It may be a temporary situation, for example, if the deployment from which the Integration is imported
// is being redeployed. For this reason we should keep the Integration instead of forcefully removing it.
message := fmt.Sprintf(
"import %s %s no longer available",
integration.Annotations[v1.IntegrationImportedKindLabel],
integration.Annotations[v1.IntegrationImportedNameLabel],
)
action.L.Info(message)
integration.SetReadyConditionError(message)
zero := int32(0)
integration.Status.Phase = v1.IntegrationPhaseImportMissing
integration.Status.Replicas = &zero
return integration, nil
}
// report the error
integration.Status.Phase = v1.IntegrationPhaseError
integration.SetReadyCondition(corev1.ConditionFalse, v1.IntegrationConditionImportingKindAvailableReason, err.Error())
Expand Down
Loading

0 comments on commit c6e2675

Please sign in to comment.