Skip to content

Commit

Permalink
chore: syntetic Integration ownership
Browse files Browse the repository at this point in the history
  • Loading branch information
squakez committed Jan 3, 2024
1 parent 8951504 commit 8910f11
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 81 deletions.
6 changes: 3 additions & 3 deletions docs/modules/ROOT/pages/running/import.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ The operator immediately creates a synthetic Integration:
```
$ kubectl get it
NAMESPACE NAME PHASE RUNTIME PROVIDER RUNTIME VERSION KIT REPLICAS
test-79c385c3-d58e-4c28-826d-b14b6245f908 my-it Cannot Monitor Pods
test-79c385c3-d58e-4c28-826d-b14b6245f908 my-it Running
```
You can see it will be in `Cannot Monitor Pods` status phase. This is expected because the way Camel K operator monitor Pods. It requires that the same label applied to the Deployment is inherited by the generated Pods. For this reason, beside labelling the Deployment, we need to add a label in the Deployment template.
You can see it will be in `Running` status phase. However, checking the conditions you will be able to see that the Integration is not yet able to be fully monitored. This is expected because the way Camel K operator monitor Pods. It requires that the same label applied to the Deployment is inherited by the generated Pods. For this reason, beside labelling the Deployment, we need to add a label in the Deployment template.
```
$ kubectl patch deployment my-camel-sb-svc --patch '{"spec": {"template": {"metadata": {"labels": {"camel.apache.org/integration": "my-it"}}}}}'
```
Also this operator can be performed manually or automated in the deployment procedure. We can see now that the operator will be able to monitor accordingly the status of the Pods:
Also this operation can be performed manually or automated in the deployment procedure. We can see now that the operator will be able to monitor accordingly the status of the Pods:
```
$ kubectl get it
NAMESPACE NAME PHASE RUNTIME PROVIDER RUNTIME VERSION KIT REPLICAS
Expand Down
10 changes: 4 additions & 6 deletions e2e/commonwithcustominstall/synthetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ func TestSyntheticIntegrationFromDeployment(t *testing.T) {

// Label the deployment --> Verify the Integration is created (cannot still monitor)
ExpectExecSucceed(t, Kubectl("label", "deploy", "my-camel-sb-svc", "camel.apache.org/integration=my-it", "-n", ns))
Eventually(IntegrationPhase(ns, "my-it"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseCannotMonitor))
Eventually(IntegrationPhase(ns, "my-it"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseRunning))
Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue))
Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionMonitoringPodsAvailable), TestTimeoutShort).Should(Equal(corev1.ConditionFalse))

// Label the deployment template --> Verify the Integration is monitored
ExpectExecSucceed(t, Kubectl("patch", "deployment", "my-camel-sb-svc", "--patch", `{"spec": {"template": {"metadata": {"labels": {"camel.apache.org/integration": "my-it"}}}}}`, "-n", ns))
Expand All @@ -63,12 +64,9 @@ func TestSyntheticIntegrationFromDeployment(t *testing.T) {
one := int32(1)
Eventually(IntegrationStatusReplicas(ns, "my-it"), TestTimeoutShort).Should(Equal(&one))

// Delete the deployment --> Verify the Integration is in missing status
// Delete the deployment --> Verify the Integration is eventually garbage collected
ExpectExecSucceed(t, Kubectl("delete", "deploy", "my-camel-sb-svc", "-n", ns))
Eventually(IntegrationPhase(ns, "my-it"), TestTimeoutShort).Should(Equal(v1.IntegrationPhaseImportMissing))
Eventually(IntegrationConditionStatus(ns, "my-it", v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionFalse))
zero := int32(0)
Eventually(IntegrationStatusReplicas(ns, "my-it"), TestTimeoutShort).Should(Equal(&zero))
Eventually(Integration(ns, "my-it"), TestTimeoutShort).Should(BeNil())

// Recreate the deployment and label --> Verify the Integration is monitored
ExpectExecSucceed(t, Kubectl("apply", "-f", "files/deploy.yaml", "-n", ns))
Expand Down
4 changes: 0 additions & 4 deletions pkg/apis/camel/v1/integration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,6 @@ const (
IntegrationPhaseRunning IntegrationPhase = "Running"
// IntegrationPhaseError --.
IntegrationPhaseError IntegrationPhase = "Error"
// IntegrationPhaseImportMissing used when the application from which the Integration is imported has been deleted.
IntegrationPhaseImportMissing IntegrationPhase = "Application Missing"
// IntegrationPhaseCannotMonitor used when the application from which the Integration has not enough information to monitor its pods.
IntegrationPhaseCannotMonitor IntegrationPhase = "Cannot Monitor Pods"

// IntegrationConditionReady --.
IntegrationConditionReady IntegrationConditionType = "Ready"
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID
synthEnvVal, synth := os.LookupEnv("CAMEL_K_SYNTHETIC_INTEGRATIONS")
if synth && synthEnvVal == "true" {
log.Info("Starting the synthetic Integration manager")
exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache(), mgr.GetAPIReader()), "synthetic Integration manager error")
exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache()), "synthetic Integration manager error")
} else {
log.Info("Synthetic Integration manager not configured, skipping")
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/integration/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ func (action *monitorAction) Name() string {
func (action *monitorAction) CanHandle(integration *v1.Integration) bool {
return integration.Status.Phase == v1.IntegrationPhaseDeploying ||
integration.Status.Phase == v1.IntegrationPhaseRunning ||
integration.Status.Phase == v1.IntegrationPhaseError ||
integration.Status.Phase == v1.IntegrationPhaseCannotMonitor
integration.Status.Phase == v1.IntegrationPhaseError
}

func (action *monitorAction) Handle(ctx context.Context, integration *v1.Integration) (*v1.Integration, error) {
Expand Down Expand Up @@ -142,7 +141,6 @@ func (action *monitorAction) monitorPods(ctx context.Context, environment *trait
if !controller.hasTemplateIntegrationLabel() {
// This is happening when the Deployment, CronJob, etc resources
// miss the Integration label, required to identify sibling Pods.
integration.Status.Phase = v1.IntegrationPhaseCannotMonitor
integration.Status.SetConditions(
v1.IntegrationCondition{
Type: v1.IntegrationConditionMonitoringPodsAvailable,
Expand Down
15 changes: 2 additions & 13 deletions pkg/controller/integration/monitor_synthetic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package integration

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"

Expand All @@ -46,18 +45,8 @@ func (action *monitorSyntheticAction) Handle(ctx context.Context, integration *v
if err != nil {
// Importing application no longer available
if k8serrors.IsNotFound(err) {
// It could be a normal condition, don't report as an error
integration.Status.Phase = v1.IntegrationPhaseImportMissing
message := fmt.Sprintf(
"import %s %s no longer available",
integration.Annotations[v1.IntegrationImportedKindLabel],
integration.Annotations[v1.IntegrationImportedNameLabel],
)
integration.SetReadyConditionError(message)
zero := int32(0)
integration.Status.Phase = v1.IntegrationPhaseImportMissing
integration.Status.Replicas = &zero
return integration, nil
// Application was deleted. The GC will take care of
return nil, nil
}
// other reasons, likely some error to report
integration.Status.Phase = v1.IntegrationPhaseError
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/integration/monitor_synthetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func TestMonitorSyntheticIntegrationCannotMonitorPods(t *testing.T) {
assert.True(t, a.CanHandle(importedIt))
handledIt, err := a.Handle(context.TODO(), importedIt)
assert.Nil(t, err)
assert.Equal(t, v1.IntegrationPhaseCannotMonitor, handledIt.Status.Phase)
// Ready condition should be still true
assert.Equal(t, corev1.ConditionTrue, handledIt.Status.GetCondition(v1.IntegrationConditionReady).Status)
// Check monitoring pods condition
Expand Down
94 changes: 46 additions & 48 deletions pkg/controller/synthetic/synthetic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,25 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientgocache "k8s.io/client-go/tools/cache"
"knative.dev/serving/pkg/apis/serving"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
"sigs.k8s.io/controller-runtime/pkg/cache"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
)

var (
controller = true
blockOwnerDeletion = true
)

// ManageSyntheticIntegrations is the controller for synthetic Integrations. Consider that the lifecycle of the objects are driven
// by the way we are monitoring them. Since we're filtering by `camel.apache.org/integration` label in the cached clinet,
// by the way we are monitoring them. Since we're filtering by `camel.apache.org/integration` label in the cached client,
// you must consider an add, update or delete
// accordingly, ie, when the user label the resource, then it is considered as an add, when it removes the label, it is considered as a delete.
// We must filter only non managed objects in order to avoid to conflict with the reconciliation loop of managed objects (owned by an Integration).
func ManageSyntheticIntegrations(ctx context.Context, c client.Client, cache cache.Cache, reader ctrl.Reader) error {
func ManageSyntheticIntegrations(ctx context.Context, c client.Client, cache cache.Cache) error {
informers, err := getInformers(ctx, c, cache)
if err != nil {
return err
Expand Down Expand Up @@ -73,15 +79,7 @@ func ManageSyntheticIntegrations(ctx context.Context, c client.Client, cache cac
log.Errorf(err, "Some error happened while loading a synthetic Integration %s", integrationName)
}
} else {
if it.Status.Phase == v1.IntegrationPhaseImportMissing {
// Update with proper phase (reconciliation will take care)
it.Status.Phase = v1.IntegrationPhaseNone
if err = updateSyntheticIntegration(ctx, c, it); err != nil {
log.Errorf(err, "Some error happened while updatinf a synthetic Integration %s", integrationName)
}
} else {
log.Infof("Synthetic Integration %s is in phase %s. Skipping.", integrationName, it.Status.Phase)
}
log.Infof("Synthetic Integration %s is in phase %s. Skipping.", integrationName, it.Status.Phase)
}
}
},
Expand All @@ -93,44 +91,11 @@ func ManageSyntheticIntegrations(ctx context.Context, c client.Client, cache cac
}
if !isManagedObject(ctrlObj) {
integrationName := ctrlObj.GetLabels()[v1.IntegrationLabel]
// We must use a non caching client to understand if the object has been deleted from the cluster or only deleted from
// the cache (ie, user removed the importing label)
err := reader.Get(ctx, ctrl.ObjectKeyFromObject(ctrlObj), ctrlObj)
if err != nil {
if k8serrors.IsNotFound(err) {
// Object removed from the cluster
it, err := getSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName)
if err != nil {
log.Errorf(err, "Some error happened while loading a synthetic Integration %s", it.Name)
return
}
// The resource from which we imported has been deleted, report in it status.
// It may be a temporary situation, for example, if the deployment from which the Integration is imported
// is being redeployed. For this reason we should keep the Integration instead of forcefully removing it.
message := fmt.Sprintf(
"import %s %s no longer available",
it.Annotations[v1.IntegrationImportedKindLabel],
it.Annotations[v1.IntegrationImportedNameLabel],
)
it.SetReadyConditionError(message)
zero := int32(0)
it.Status.Phase = v1.IntegrationPhaseImportMissing
it.Status.Replicas = &zero
if err = updateSyntheticIntegration(ctx, c, it); err != nil {
log.Errorf(err, "Some error happened while updating a synthetic Integration %s", it.Name)
}
log.Infof("Updated synthetic Integration %s with status %s", it.GetName(), it.Status.Phase)
} else {
log.Errorf(err, "Some error happened while loading object %s from the cluster", ctrlObj.GetName())
return
}
} else {
// Importing label removed
if err = deleteSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName); err != nil {
log.Errorf(err, "Some error happened while deleting a synthetic Integration %s", integrationName)
}
log.Infof("Deleted synthetic Integration %s", integrationName)
// Importing label removed
if err = deleteSyntheticIntegration(ctx, c, ctrlObj.GetNamespace(), integrationName); err != nil {
log.Errorf(err, "Some error happened while deleting a synthetic Integration %s", integrationName)
}
log.Infof("Deleted synthetic Integration %s", integrationName)
}
},
})
Expand Down Expand Up @@ -243,6 +208,17 @@ func (app *nonManagedCamelDeployment) Integration() *v1.Integration {
},
},
}
references := []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: app.deploy.Name,
UID: app.deploy.UID,
Controller: &controller,
BlockOwnerDeletion: &blockOwnerDeletion,
},
}
it.SetOwnerReferences(references)
return &it
}

Expand Down Expand Up @@ -277,6 +253,17 @@ func (app *NonManagedCamelCronjob) Integration() *v1.Integration {
it.Spec = v1.IntegrationSpec{
Traits: v1.Traits{},
}
references := []metav1.OwnerReference{
{
APIVersion: "batch/v1",
Kind: "CronJob",
Name: app.cron.Name,
UID: app.cron.UID,
Controller: &controller,
BlockOwnerDeletion: &blockOwnerDeletion,
},
}
it.SetOwnerReferences(references)
return &it
}

Expand All @@ -296,5 +283,16 @@ func (app *NonManagedCamelKnativeService) Integration() *v1.Integration {
it.Spec = v1.IntegrationSpec{
Traits: v1.Traits{},
}
references := []metav1.OwnerReference{
{
APIVersion: servingv1.SchemeGroupVersion.String(),
Kind: "Service",
Name: app.ksvc.Name,
UID: app.ksvc.UID,
Controller: &controller,
BlockOwnerDeletion: &blockOwnerDeletion,
},
}
it.SetOwnerReferences(references)
return &it
}
36 changes: 34 additions & 2 deletions pkg/controller/synthetic/synthetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ func TestNonManagedDeployment(t *testing.T) {
},
},
}
references := []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: deploy.Name,
UID: deploy.UID,
Controller: &controller,
BlockOwnerDeletion: &blockOwnerDeletion,
},
}
expectedIt.SetOwnerReferences(references)

deploymentAdapter, err := nonManagedCamelApplicationFactory(deploy)
assert.Nil(t, err)
Expand Down Expand Up @@ -164,7 +175,17 @@ func TestNonManagedCronJob(t *testing.T) {
v1.IntegrationImportedKindLabel: "CronJob",
v1.IntegrationSyntheticLabel: "true",
})

references := []metav1.OwnerReference{
{
APIVersion: "batch/v1",
Kind: "CronJob",
Name: cron.Name,
UID: cron.UID,
Controller: &controller,
BlockOwnerDeletion: &blockOwnerDeletion,
},
}
expectedIt.SetOwnerReferences(references)
cronJobAdapter, err := nonManagedCamelApplicationFactory(cron)
assert.Nil(t, err)
assert.NotNil(t, cronJobAdapter)
Expand All @@ -174,7 +195,7 @@ func TestNonManagedCronJob(t *testing.T) {
func TestNonManagedKnativeService(t *testing.T) {
ksvc := &servingv1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: appsv1.SchemeGroupVersion.String(),
APIVersion: servingv1.SchemeGroupVersion.String(),
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -213,6 +234,17 @@ func TestNonManagedKnativeService(t *testing.T) {
v1.IntegrationImportedKindLabel: "KnativeService",
v1.IntegrationSyntheticLabel: "true",
})
references := []metav1.OwnerReference{
{
APIVersion: servingv1.SchemeGroupVersion.String(),
Kind: "Service",
Name: ksvc.Name,
UID: ksvc.UID,
Controller: &controller,
BlockOwnerDeletion: &blockOwnerDeletion,
},
}
expectedIt.SetOwnerReferences(references)

knativeServiceAdapter, err := nonManagedCamelApplicationFactory(ksvc)
assert.Nil(t, err)
Expand Down

0 comments on commit 8910f11

Please sign in to comment.