diff --git a/pkg/controller/integration/platform_setup.go b/pkg/controller/integration/platform_setup.go index 2c578276de..e1851f7696 100644 --- a/pkg/controller/integration/platform_setup.go +++ b/pkg/controller/integration/platform_setup.go @@ -97,7 +97,7 @@ func determineBestTraitProfile(c client.Client, integration *v1.Integration, p * // Use platform spec profile if set return p.Spec.Profile, nil } - if ok, err := knative.IsServingInstalled(c); err != nil { + if ok, err := knative.IsInstalled(c); err != nil { return "", err } else if ok { return v1.TraitProfileKnative, nil diff --git a/pkg/controller/kameletbinding/integration.go b/pkg/controller/kameletbinding/integration.go index 1f46302777..b28780de88 100644 --- a/pkg/controller/kameletbinding/integration.go +++ b/pkg/controller/kameletbinding/integration.go @@ -248,7 +248,7 @@ func determineTraitProfile(ctx context.Context, c client.Client, binding *v1alph return pl.Spec.Profile, nil } } - if ok, err := knative.IsServingInstalled(c); err != nil { + if ok, err := knative.IsInstalled(c); err != nil { return "", err } else if ok { return v1.TraitProfileKnative, nil diff --git a/pkg/controller/pipe/integration.go b/pkg/controller/pipe/integration.go index 80b06d662a..aa2a9c4be4 100644 --- a/pkg/controller/pipe/integration.go +++ b/pkg/controller/pipe/integration.go @@ -248,7 +248,7 @@ func determineTraitProfile(ctx context.Context, c client.Client, binding *v1.Pip return pl.Spec.Profile, nil } } - if ok, err := knative.IsServingInstalled(c); err != nil { + if ok, err := knative.IsInstalled(c); err != nil { return "", err } else if ok { return v1.TraitProfileKnative, nil diff --git a/pkg/trait/gc.go b/pkg/trait/gc.go index e3e5e79e85..2670de3fd6 100644 --- a/pkg/trait/gc.go +++ b/pkg/trait/gc.go @@ -47,6 +47,7 @@ import ( v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" "github.com/apache/camel-k/v2/pkg/util" + "github.com/apache/camel-k/v2/pkg/util/knative" "github.com/apache/camel-k/v2/pkg/util/log" ) @@ -86,20 +87,6 @@ var ( Version: batchv1.SchemeGroupVersion.Version, }: {}, } - deletableTypesByProfile = map[v1.TraitProfile]map[schema.GroupVersionKind]struct{}{ - v1.TraitProfileKnative: { - schema.GroupVersionKind{ - Kind: "Service", - Group: "serving.knative.dev", - Version: "v1", - }: {}, - schema.GroupVersionKind{ - Kind: "Trigger", - Group: "eventing.knative.dev", - Version: "v1", - }: {}, - }, - } ) type gcTrait struct { @@ -160,12 +147,30 @@ func (t *gcTrait) garbageCollectResources(e *Environment) error { } profile := e.DetermineProfile() - if profileDeletableTypes, ok := deletableTypesByProfile[profile]; ok { - // copy profile related deletable types if not already present - for key, value := range profileDeletableTypes { - if _, found := deletableGVKs[key]; !found { - deletableGVKs[key] = value - } + deletableTypesByProfile := map[schema.GroupVersionKind]struct{}{} + + if profile == v1.TraitProfileKnative { + if ok, _ := knative.IsServingInstalled(e.Client); ok { + deletableTypesByProfile[schema.GroupVersionKind{ + Kind: "Service", + Group: "serving.knative.dev", + Version: "v1", + }] = struct{}{} + } + + if ok, _ := knative.IsEventingInstalled(e.Client); ok { + deletableTypesByProfile[schema.GroupVersionKind{ + Kind: "Trigger", + Group: "eventing.knative.dev", + Version: "v1", + }] = struct{}{} + } + } + + // copy profile related deletable types if not already present + for key, value := range deletableTypesByProfile { + if _, found := deletableGVKs[key]; !found { + deletableGVKs[key] = value } } diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index 3328ec6501..a6a032a9da 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -331,7 +331,9 @@ func (t *knativeTrait) configureEvents(e *Environment, env *knativeapi.CamelEnvi serviceName = "default" } servicePath := fmt.Sprintf("/events/%s", eventType) - t.createTrigger(e, ref, eventType, servicePath) + if triggerErr := t.createTrigger(e, ref, eventType, servicePath); triggerErr != nil { + return triggerErr + } if !env.ContainsService(serviceName, knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEvent, ref.APIVersion, ref.Kind) { svc := knativeapi.CamelServiceDefinition{ @@ -475,7 +477,7 @@ func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.Came return err } -func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string, path string) { +func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string, path string) error { // TODO extend to additional filters too, to filter them at source and not at destination found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) bool { return trigger.Spec.Broker == ref.Name && @@ -486,9 +488,32 @@ func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference if ref.Namespace == "" { ref.Namespace = e.Integration.Namespace } - trigger := knativeutil.CreateTrigger(*ref, e.Integration.Name, eventType, path) + + controllerStrategy, err := e.DetermineControllerStrategy() + if err != nil { + return err + } + + var trigger *eventing.Trigger + switch controllerStrategy { + case ControllerStrategyKnativeService: + trigger, err = knativeutil.CreateKnativeServiceTrigger(*ref, e.Integration.Name, eventType, path) + if err != nil { + return err + } + case ControllerStrategyDeployment: + trigger, err = knativeutil.CreateServiceTrigger(*ref, e.Integration.Name, eventType, path) + if err != nil { + return err + } + default: + return fmt.Errorf("failed to create Knative trigger: unsupported controller strategy %s", controllerStrategy) + } + e.Resources.Add(trigger) } + + return nil } func (t *knativeTrait) ifServiceMissingDo( diff --git a/pkg/trait/knative_service.go b/pkg/trait/knative_service.go index a8e53a779e..0fee5f9da7 100644 --- a/pkg/trait/knative_service.go +++ b/pkg/trait/knative_service.go @@ -30,6 +30,7 @@ import ( v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" "github.com/apache/camel-k/v2/pkg/metadata" + "github.com/apache/camel-k/v2/pkg/util/knative" "github.com/apache/camel-k/v2/pkg/util/kubernetes" ) @@ -149,6 +150,11 @@ func (t *knativeServiceTrait) SelectControllerStrategy(e *Environment) (*Control return nil, nil } + // Knative serving is required + if ok, _ := knative.IsServingInstalled(e.Client); !ok { + return nil, nil + } + var sources []v1.SourceSpec var err error if sources, err = kubernetes.ResolveIntegrationSources(e.Ctx, t.Client, e.Integration, e.Resources); err != nil { diff --git a/pkg/trait/knative_service_test.go b/pkg/trait/knative_service_test.go index a4e79f7dce..658b3101e7 100644 --- a/pkg/trait/knative_service_test.go +++ b/pkg/trait/knative_service_test.go @@ -414,6 +414,81 @@ func TestKnativeServiceNotApplicable(t *testing.T) { })) } +func TestKnativeServiceNoServingAvailable(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + client, _ := test.NewFakeClient() + fakeClient := client.(*test.FakeClient) //nolint + fakeClient.DisableKnativeServing() + + traitCatalog := NewCatalog(nil) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: client, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: KnativeServiceTestName, + Namespace: KnativeServiceTestNamespace, + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "routes.js", + Content: `from("platform-http:test").log("hello")`, + }, + Language: v1.LanguageJavaScript, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: kubernetes.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.Nil(t, environment.GetTrait("knative-service")) + + assert.Nil(t, environment.Resources.GetKnativeService(func(service *serving.Service) bool { + return service.Name == KnativeServiceTestName + })) + + assert.NotNil(t, environment.Resources.GetDeployment(func(deployment *appsv1.Deployment) bool { + return deployment.Name == KnativeServiceTestName + })) +} + func TestKnativeServiceWithRollout(t *testing.T) { environment := createKnativeServiceTestEnvironment(t, &traitv1.KnativeServiceTrait{RolloutDuration: "60s"}) assert.NotEmpty(t, environment.ExecutedTraits) diff --git a/pkg/trait/knative_test.go b/pkg/trait/knative_test.go index 75d0756209..454331ecfe 100644 --- a/pkg/trait/knative_test.go +++ b/pkg/trait/knative_test.go @@ -147,6 +147,14 @@ func TestKnativeEnvConfigurationFromTrait(t *testing.T) { eEventSink := ne.FindService("default", knativeapi.CamelEndpointKindSink, knativeapi.CamelServiceTypeEvent, "eventing.knative.dev/v1", "Broker") assert.NotNil(t, eEventSink) assert.Equal(t, "http://broker-default.host/", eEventSink.URL) + + assert.NotNil(t, environment.Resources.GetKnativeSubscription(func(subscription *messaging.Subscription) bool { + return assert.Equal(t, "channel-source-1-test", subscription.Name) + })) + + assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return assert.Equal(t, "default-test", trigger.Name) + })) } func TestKnativeEnvConfigurationFromSource(t *testing.T) { @@ -253,6 +261,203 @@ func TestKnativeEnvConfigurationFromSource(t *testing.T) { broker := ne.FindService("evt.type", knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEvent, "", "") assert.NotNil(t, broker) assert.Equal(t, "false", broker.Metadata[knativeapi.CamelMetaKnativeReply]) + + assert.NotNil(t, environment.Resources.GetKnativeSubscription(func(subscription *messaging.Subscription) bool { + return assert.Equal(t, "channel-source-1-test", subscription.Name) + })) + + assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return assert.Equal(t, "default-test-evttype", trigger.Name) + })) +} + +func TestKnativeTriggerConfiguration(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + matching := true + + matching = matching && assert.Equal(t, "default", trigger.Spec.Broker) + matching = matching && assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + matching = matching && assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + matching = matching && assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + matching = matching && assert.Equal(t, "default-test-evttype", trigger.Name) + + return matching + })) +} + +func TestKnativeTriggerConfigurationNoServingAvailable(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + fakeClient := c.(*test.FakeClient) //nolint + fakeClient.DisableKnativeServing() + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + matching := true + + matching = matching && assert.Equal(t, "default", trigger.Spec.Broker) + matching = matching && assert.Equal(t, "v1", trigger.Spec.Subscriber.Ref.APIVersion) + matching = matching && assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + matching = matching && assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + matching = matching && assert.Equal(t, "default-test-evttype", trigger.Name) + + return matching + })) } func TestKnativePlatformHttpConfig(t *testing.T) { diff --git a/pkg/util/knative/enabled.go b/pkg/util/knative/enabled.go index 0a6b6ced2c..3cb503d5f0 100644 --- a/pkg/util/knative/enabled.go +++ b/pkg/util/knative/enabled.go @@ -28,7 +28,7 @@ import ( // IsRefKindInstalled returns true if the cluster has the referenced Kind installed. func IsRefKindInstalled(c kubernetes.Interface, ref corev1.ObjectReference) (bool, error) { - if installed, err := isInstalled(c, ref.GroupVersionKind().GroupVersion()); err != nil { + if installed, err := isServerResourceAvailable(c, ref.GroupVersionKind().GroupVersion()); err != nil { return false, err } else if installed { return true, nil @@ -36,6 +36,19 @@ func IsRefKindInstalled(c kubernetes.Interface, ref corev1.ObjectReference) (boo return false, nil } +// IsInstalled returns true if we are connected to a cluster with either Knative Serving or Eventing installed. +func IsInstalled(c kubernetes.Interface) (bool, error) { + if ok, err := IsServingInstalled(c); ok { + return ok, err + } else if ok, err = IsEventingInstalled(c); ok { + return ok, err + } else if err != nil { + return false, err + } + + return false, nil +} + // IsServingInstalled returns true if we are connected to a cluster with Knative Serving installed. func IsServingInstalled(c kubernetes.Interface) (bool, error) { return IsRefKindInstalled(c, corev1.ObjectReference{ @@ -52,7 +65,7 @@ func IsEventingInstalled(c kubernetes.Interface) (bool, error) { }) } -func isInstalled(c kubernetes.Interface, api schema.GroupVersion) (bool, error) { +func isServerResourceAvailable(c kubernetes.Interface, api schema.GroupVersion) (bool, error) { _, err := c.Discovery().ServerResourcesForGroupVersion(api.String()) if err != nil && (k8serrors.IsNotFound(err) || util.IsUnknownAPIError(err)) { return false, nil diff --git a/pkg/util/knative/knative.go b/pkg/util/knative/knative.go index 1d5b7462a9..b72ae10245 100644 --- a/pkg/util/knative/knative.go +++ b/pkg/util/knative/knative.go @@ -75,7 +75,27 @@ func CreateSubscription(channelReference corev1.ObjectReference, serviceName str } } -func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string) *eventing.Trigger { +// CreateServiceTrigger create Knative trigger with arbitrary Kubernetes Service as a subscriber - usually used when no Knative Serving is available on the cluster. +func CreateServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string) (*eventing.Trigger, error) { + subscriberRef := duckv1.KReference{ + APIVersion: "v1", + Kind: "Service", + Name: serviceName, + } + return CreateTrigger(brokerReference, subscriberRef, eventType, path) +} + +// CreateKnativeServiceTrigger create Knative trigger with Knative Serving Service as a subscriber - default option when Knative Serving is available on the cluster. +func CreateKnativeServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string) (*eventing.Trigger, error) { + subscriberRef := duckv1.KReference{ + APIVersion: serving.SchemeGroupVersion.String(), + Kind: "Service", + Name: serviceName, + } + return CreateTrigger(brokerReference, subscriberRef, eventType, path) +} + +func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef duckv1.KReference, eventType string, path string) (*eventing.Trigger, error) { nameSuffix := "" var attributes map[string]string if eventType != "" { @@ -91,7 +111,7 @@ func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, e }, ObjectMeta: metav1.ObjectMeta{ Namespace: brokerReference.Namespace, - Name: brokerReference.Name + "-" + serviceName + nameSuffix, + Name: brokerReference.Name + "-" + subscriberRef.Name + nameSuffix, }, Spec: eventing.TriggerSpec{ Filter: &eventing.TriggerFilter{ @@ -99,17 +119,13 @@ func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, e }, Broker: brokerReference.Name, Subscriber: duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: serving.SchemeGroupVersion.String(), - Kind: "Service", - Name: serviceName, - }, + Ref: &subscriberRef, URI: &apis.URL{ Path: path, }, }, }, - } + }, nil } func CreateSinkBinding(source corev1.ObjectReference, target corev1.ObjectReference) *sources.SinkBinding { diff --git a/pkg/util/kubernetes/collection.go b/pkg/util/kubernetes/collection.go index dd3fcaf1a3..190d884d2a 100644 --- a/pkg/util/kubernetes/collection.go +++ b/pkg/util/kubernetes/collection.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + messaging "knative.dev/eventing/pkg/apis/messaging/v1" ctrl "sigs.k8s.io/controller-runtime/pkg/client" @@ -266,7 +267,7 @@ func (c *Collection) GetServiceForIntegration(integration *v1.Integration) *core }) } -// GetKnativeService returns a knative Service that matches the given function. +// GetKnativeService returns a Knative Service that matches the given function. func (c *Collection) GetKnativeService(filter func(*serving.Service) bool) *serving.Service { var retValue *serving.Service c.VisitKnativeService(func(re *serving.Service) { @@ -277,6 +278,28 @@ func (c *Collection) GetKnativeService(filter func(*serving.Service) bool) *serv return retValue } +// GetKnativeTrigger returns a Knative Trigger that matches the given function. +func (c *Collection) GetKnativeTrigger(filter func(*eventing.Trigger) bool) *eventing.Trigger { + var retValue *eventing.Trigger + c.VisitKnativeTrigger(func(re *eventing.Trigger) { + if filter(re) { + retValue = re + } + }) + return retValue +} + +// GetKnativeSubscription returns a Knative channel Subscription that matches the given function. +func (c *Collection) GetKnativeSubscription(filter func(subscription *messaging.Subscription) bool) *messaging.Subscription { + var retValue *messaging.Subscription + c.VisitKnativeSubscription(func(re *messaging.Subscription) { + if filter(re) { + retValue = re + } + }) + return retValue +} + // VisitRoute executes the visitor function on all Route resources. func (c *Collection) VisitRoute(visitor func(*routev1.Route)) { c.Visit(func(res runtime.Object) { @@ -357,6 +380,15 @@ func (c *Collection) VisitKnativeTrigger(visitor func(trigger *eventing.Trigger) }) } +// VisitKnativeSubscription executes the visitor function on all Knative channel Subscription resources. +func (c *Collection) VisitKnativeSubscription(visitor func(trigger *messaging.Subscription)) { + c.Visit(func(res runtime.Object) { + if conv, ok := res.(*messaging.Subscription); ok { + visitor(conv) + } + }) +} + // HasKnativeTrigger returns true if a Knative trigger respecting filter is found. func (c *Collection) HasKnativeTrigger(filter func(trigger *eventing.Trigger) bool) bool { var retValue *bool diff --git a/pkg/util/test/client.go b/pkg/util/test/client.go index a821bf81d4..b86976eb8b 100644 --- a/pkg/util/test/client.go +++ b/pkg/util/test/client.go @@ -113,10 +113,12 @@ func NewFakeClient(initObjs ...runtime.Object) (client.Client, error) { }) return &FakeClient{ - Client: c, - Interface: clientset, - camel: camelClientset, - scales: &fakescaleclient, + Client: c, + Interface: clientset, + camel: camelClientset, + scales: &fakescaleclient, + enabledKnativeServing: true, + enabledKnativeEventing: true, }, nil } @@ -138,10 +140,12 @@ func filterObjects(scheme *runtime.Scheme, input []runtime.Object, filter func(g type FakeClient struct { controller.Client kubernetes.Interface - camel *fakecamelclientset.Clientset - scales *fakescale.FakeScaleClient - disabledGroups []string - enabledOpenshift bool + camel *fakecamelclientset.Clientset + scales *fakescale.FakeScaleClient + disabledGroups []string + enabledOpenshift bool + enabledKnativeServing bool + enabledKnativeEventing bool } func (c *FakeClient) Intercept(intercept *interceptor.Funcs) { @@ -191,6 +195,14 @@ func (c *FakeClient) EnableOpenshiftDiscovery() { c.enabledOpenshift = true } +func (c *FakeClient) DisableKnativeServing() { + c.enabledKnativeServing = false +} + +func (c *FakeClient) DisableKnativeEventing() { + c.enabledKnativeEventing = false +} + func (c *FakeClient) AuthorizationV1() authorizationv1.AuthorizationV1Interface { return &FakeAuthorization{ AuthorizationV1Interface: c.Interface.AuthorizationV1(), @@ -201,9 +213,11 @@ func (c *FakeClient) AuthorizationV1() authorizationv1.AuthorizationV1Interface func (c *FakeClient) Discovery() discovery.DiscoveryInterface { return &FakeDiscovery{ - DiscoveryInterface: c.Interface.Discovery(), - disabledGroups: c.disabledGroups, - enabledOpenshift: c.enabledOpenshift, + DiscoveryInterface: c.Interface.Discovery(), + disabledGroups: c.disabledGroups, + enabledOpenshift: c.enabledOpenshift, + enabledKnativeServing: c.enabledKnativeServing, + enabledKnativeEventing: c.enabledKnativeEventing, } } @@ -229,8 +243,10 @@ func (f *FakeAuthorization) SelfSubjectRulesReviews() authorizationv1.SelfSubjec type FakeDiscovery struct { discovery.DiscoveryInterface - disabledGroups []string - enabledOpenshift bool + disabledGroups []string + enabledOpenshift bool + enabledKnativeServing bool + enabledKnativeEventing bool } func (f *FakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { @@ -247,26 +263,33 @@ func (f *FakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*me } } - // used to verify if knative is installed - if groupVersion == "serving.knative.dev/v1" && !util.StringSliceExists(f.disabledGroups, groupVersion) { - return &metav1.APIResourceList{ - GroupVersion: "serving.knative.dev/v1", - }, nil - } - if groupVersion == "eventing.knative.dev/v1" && !util.StringSliceExists(f.disabledGroups, groupVersion) { - return &metav1.APIResourceList{ - GroupVersion: "eventing.knative.dev/v1", - }, nil - } - if groupVersion == "messaging.knative.dev/v1" && !util.StringSliceExists(f.disabledGroups, groupVersion) { - return &metav1.APIResourceList{ - GroupVersion: "messaging.knative.dev/v1", - }, nil + // used to verify if Knative Serving is installed + if f.enabledKnativeServing { + if groupVersion == "serving.knative.dev/v1" && !util.StringSliceExists(f.disabledGroups, groupVersion) { + return &metav1.APIResourceList{ + GroupVersion: "serving.knative.dev/v1", + }, nil + } } - if groupVersion == "messaging.knative.dev/v1beta1" && !util.StringSliceExists(f.disabledGroups, groupVersion) { - return &metav1.APIResourceList{ - GroupVersion: "messaging.knative.dev/v1beta1", - }, nil + + // used to verify if Knative Eventing is installed + if f.enabledKnativeEventing { + if groupVersion == "eventing.knative.dev/v1" && !util.StringSliceExists(f.disabledGroups, groupVersion) { + return &metav1.APIResourceList{ + GroupVersion: "eventing.knative.dev/v1", + }, nil + } + if groupVersion == "messaging.knative.dev/v1" && !util.StringSliceExists(f.disabledGroups, groupVersion) { + return &metav1.APIResourceList{ + GroupVersion: "messaging.knative.dev/v1", + }, nil + } + if groupVersion == "messaging.knative.dev/v1beta1" && !util.StringSliceExists(f.disabledGroups, groupVersion) { + return &metav1.APIResourceList{ + GroupVersion: "messaging.knative.dev/v1beta1", + }, nil + } } + return f.DiscoveryInterface.ServerResourcesForGroupVersion(groupVersion) }