Skip to content

Commit

Permalink
fix(#5402): Evaluate Knative profile based on Serving/Eventing installed
Browse files Browse the repository at this point in the history
- Use Knative profile when Serving or Eventing is installed on cluster
- Make sure to enable Knative trait when Serving or Eventing is installed
- Enable knative-service trait only when Knative Serving is installed
- Garbage collect Serving and Eventing resources in gc trait only when Serving/Eventing is installed on the cluster
- Do not use Serving service in Knative trigger when not installed on cluster
- Use arbitrary Service as a subscriber in Knative trigger when Serving is not available
  • Loading branch information
christophd committed Apr 25, 2024
1 parent e9568a6 commit 527f356
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/integration/platform_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func determineBestTraitProfile(c client.Client, integration *v1.Integration, p *
// Use platform spec profile if set
return p.Spec.Profile, nil
}
if ok, err := knative.IsServingInstalled(c); err != nil {
if ok, err := knative.IsInstalled(c); err != nil {
return "", err
} else if ok {
return v1.TraitProfileKnative, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/kameletbinding/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func determineTraitProfile(ctx context.Context, c client.Client, binding *v1alph
return pl.Spec.Profile, nil
}
}
if ok, err := knative.IsServingInstalled(c); err != nil {
if ok, err := knative.IsInstalled(c); err != nil {
return "", err
} else if ok {
return v1.TraitProfileKnative, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/pipe/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func determineTraitProfile(ctx context.Context, c client.Client, binding *v1.Pip
return pl.Spec.Profile, nil
}
}
if ok, err := knative.IsServingInstalled(c); err != nil {
if ok, err := knative.IsInstalled(c); err != nil {
return "", err
} else if ok {
return v1.TraitProfileKnative, nil
Expand Down
45 changes: 25 additions & 20 deletions pkg/trait/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
"github.com/apache/camel-k/v2/pkg/util"
"github.com/apache/camel-k/v2/pkg/util/knative"
"github.com/apache/camel-k/v2/pkg/util/log"
)

Expand Down Expand Up @@ -86,20 +87,6 @@ var (
Version: batchv1.SchemeGroupVersion.Version,
}: {},
}
deletableTypesByProfile = map[v1.TraitProfile]map[schema.GroupVersionKind]struct{}{
v1.TraitProfileKnative: {
schema.GroupVersionKind{
Kind: "Service",
Group: "serving.knative.dev",
Version: "v1",
}: {},
schema.GroupVersionKind{
Kind: "Trigger",
Group: "eventing.knative.dev",
Version: "v1",
}: {},
},
}
)

type gcTrait struct {
Expand Down Expand Up @@ -160,12 +147,30 @@ func (t *gcTrait) garbageCollectResources(e *Environment) error {
}

profile := e.DetermineProfile()
if profileDeletableTypes, ok := deletableTypesByProfile[profile]; ok {
// copy profile related deletable types if not already present
for key, value := range profileDeletableTypes {
if _, found := deletableGVKs[key]; !found {
deletableGVKs[key] = value
}
deletableTypesByProfile := map[schema.GroupVersionKind]struct{}{}

if profile == v1.TraitProfileKnative {
if ok, _ := knative.IsServingInstalled(e.Client); ok {
deletableTypesByProfile[schema.GroupVersionKind{
Kind: "Service",
Group: "serving.knative.dev",
Version: "v1",
}] = struct{}{}
}

if ok, _ := knative.IsEventingInstalled(e.Client); ok {
deletableTypesByProfile[schema.GroupVersionKind{
Kind: "Trigger",
Group: "eventing.knative.dev",
Version: "v1",
}] = struct{}{}
}
}

// copy profile related deletable types if not already present
for key, value := range deletableTypesByProfile {
if _, found := deletableGVKs[key]; !found {
deletableGVKs[key] = value
}
}

Expand Down
35 changes: 32 additions & 3 deletions pkg/trait/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package trait

import (
"errors"
"fmt"
"net/url"
"reflect"
Expand Down Expand Up @@ -331,7 +332,9 @@ func (t *knativeTrait) configureEvents(e *Environment, env *knativeapi.CamelEnvi
serviceName = "default"
}
servicePath := fmt.Sprintf("/events/%s", eventType)
t.createTrigger(e, ref, eventType, servicePath)
if triggerErr := t.createTrigger(e, ref, eventType, servicePath); triggerErr != nil {
return triggerErr
}

if !env.ContainsService(serviceName, knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEvent, ref.APIVersion, ref.Kind) {
svc := knativeapi.CamelServiceDefinition{
Expand Down Expand Up @@ -475,7 +478,7 @@ func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.Came
return err
}

func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string, path string) {
func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string, path string) error {
// TODO extend to additional filters too, to filter them at source and not at destination
found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) bool {
return trigger.Spec.Broker == ref.Name &&
Expand All @@ -486,9 +489,18 @@ func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference
if ref.Namespace == "" {
ref.Namespace = e.Integration.Namespace
}
trigger := knativeutil.CreateTrigger(*ref, e.Integration.Name, eventType, path)
serviceAPIVersion, err := t.determineServiceType(e)
if err != nil {
return err
}
trigger, err := knativeutil.CreateTrigger(*ref, e.Integration.Name, serviceAPIVersion, eventType, path)
if err != nil {
return err
}
e.Resources.Add(trigger)
}

return nil
}

func (t *knativeTrait) ifServiceMissingDo(
Expand Down Expand Up @@ -560,3 +572,20 @@ func (t *knativeTrait) extractServices(names []string, serviceType knativeapi.Ca
sort.Strings(answer)
return answer
}

func (t *knativeTrait) determineServiceType(e *Environment) (string, error) {
controllerStrategy, err := e.DetermineControllerStrategy()
if err != nil {
return "", err
}

switch controllerStrategy {
case ControllerStrategyKnativeService:
return serving.SchemeGroupVersion.String(), nil
case ControllerStrategyDeployment:
return "v1", nil
default:
return "", errors.New(fmt.Sprintf("failed to create Knative trigger: "+
"unsupported controller strategy %s", controllerStrategy))
}
}
6 changes: 6 additions & 0 deletions pkg/trait/knative_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
"github.com/apache/camel-k/v2/pkg/metadata"
"github.com/apache/camel-k/v2/pkg/util/knative"
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
)

Expand Down Expand Up @@ -149,6 +150,11 @@ func (t *knativeServiceTrait) SelectControllerStrategy(e *Environment) (*Control
return nil, nil
}

// Knative serving is required
if ok, _ := knative.IsServingInstalled(e.Client); !ok {
return nil, nil
}

var sources []v1.SourceSpec
var err error
if sources, err = kubernetes.ResolveIntegrationSources(e.Ctx, t.Client, e.Integration, e.Resources); err != nil {
Expand Down
17 changes: 15 additions & 2 deletions pkg/util/knative/enabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,27 @@ import (

// IsRefKindInstalled returns true if the cluster has the referenced Kind installed.
func IsRefKindInstalled(c kubernetes.Interface, ref corev1.ObjectReference) (bool, error) {
if installed, err := isInstalled(c, ref.GroupVersionKind().GroupVersion()); err != nil {
if installed, err := isServerResourceAvailable(c, ref.GroupVersionKind().GroupVersion()); err != nil {
return false, err
} else if installed {
return true, nil
}
return false, nil
}

// IsInstalled returns true if we are connected to a cluster with either Knative Serving or Eventing installed.
func IsInstalled(c kubernetes.Interface) (bool, error) {
if ok, err := IsServingInstalled(c); ok {
return ok, err
} else if ok, err = IsEventingInstalled(c); ok {
return ok, err
} else if err != nil {
return false, err
}

return false, nil
}

// IsServingInstalled returns true if we are connected to a cluster with Knative Serving installed.
func IsServingInstalled(c kubernetes.Interface) (bool, error) {
return IsRefKindInstalled(c, corev1.ObjectReference{
Expand All @@ -52,7 +65,7 @@ func IsEventingInstalled(c kubernetes.Interface) (bool, error) {
})
}

func isInstalled(c kubernetes.Interface, api schema.GroupVersion) (bool, error) {
func isServerResourceAvailable(c kubernetes.Interface, api schema.GroupVersion) (bool, error) {
_, err := c.Discovery().ServerResourcesForGroupVersion(api.String())
if err != nil && (k8serrors.IsNotFound(err) || util.IsUnknownAPIError(err)) {
return false, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func CreateSubscription(channelReference corev1.ObjectReference, serviceName str
}
}

func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string) *eventing.Trigger {
func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, serviceAPIVersion string, eventType string, path string) (*eventing.Trigger, error) {
nameSuffix := ""
var attributes map[string]string
if eventType != "" {
Expand All @@ -100,7 +100,7 @@ func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, e
Broker: brokerReference.Name,
Subscriber: duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: serving.SchemeGroupVersion.String(),
APIVersion: serviceAPIVersion,
Kind: "Service",
Name: serviceName,
},
Expand All @@ -109,7 +109,7 @@ func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, e
},
},
},
}
}, nil
}

func CreateSinkBinding(source corev1.ObjectReference, target corev1.ObjectReference) *sources.SinkBinding {
Expand Down

0 comments on commit 527f356

Please sign in to comment.