From 3e9886bcf0d03632c12126f885de9bed1c5853e3 Mon Sep 17 00:00:00 2001 From: nicolaferraro Date: Fri, 23 Oct 2020 14:19:59 +0200 Subject: [PATCH] Fix #1778: allow pushing to broker via KameletBinding --- .../kamelet-binding-broker/kamelet.feature | 5 ++ .../logger-sink-binding.yaml | 34 ++++++++++++ .../logger-sink.kamelet.yaml | 43 +++++++++++++++ .../timer-source-binding.yaml | 37 +++++++++++++ .../timer-source.kamelet.yaml | 54 +++++++++++++++++++ .../kamelet-binding-broker/yaks-config.yaml | 36 +++++++++++++ pkg/util/bindings/knative_ref.go | 12 ++++- pkg/util/knative/apis_test.go | 4 +- pkg/util/knative/knative.go | 2 +- pkg/util/knative/uri.go | 11 ++-- pkg/util/knative/uri_test.go | 10 +++- 11 files changed, 237 insertions(+), 11 deletions(-) create mode 100644 e2e/yaks/common/kamelet-binding-broker/kamelet.feature create mode 100644 e2e/yaks/common/kamelet-binding-broker/logger-sink-binding.yaml create mode 100644 e2e/yaks/common/kamelet-binding-broker/logger-sink.kamelet.yaml create mode 100644 e2e/yaks/common/kamelet-binding-broker/timer-source-binding.yaml create mode 100644 e2e/yaks/common/kamelet-binding-broker/timer-source.kamelet.yaml create mode 100644 e2e/yaks/common/kamelet-binding-broker/yaks-config.yaml diff --git a/e2e/yaks/common/kamelet-binding-broker/kamelet.feature b/e2e/yaks/common/kamelet-binding-broker/kamelet.feature new file mode 100644 index 0000000000..4fb1de32f4 --- /dev/null +++ b/e2e/yaks/common/kamelet-binding-broker/kamelet.feature @@ -0,0 +1,5 @@ +Feature: Camel K can bind Kamelets to the broker + + Scenario: Sending event to the broker with KameletBinding + Given integration logger-sink-binding is running + Then integration logger-sink-binding should print message: Hello Custom Event diff --git a/e2e/yaks/common/kamelet-binding-broker/logger-sink-binding.yaml b/e2e/yaks/common/kamelet-binding-broker/logger-sink-binding.yaml new file mode 100644 index 0000000000..6b0c0a04c0 --- /dev/null +++ b/e2e/yaks/common/kamelet-binding-broker/logger-sink-binding.yaml @@ -0,0 +1,34 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: logger-sink-binding +spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1beta1 + name: default + properties: + type: custom-type + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: logger-sink diff --git a/e2e/yaks/common/kamelet-binding-broker/logger-sink.kamelet.yaml b/e2e/yaks/common/kamelet-binding-broker/logger-sink.kamelet.yaml new file mode 100644 index 0000000000..edc710f470 --- /dev/null +++ b/e2e/yaks/common/kamelet-binding-broker/logger-sink.kamelet.yaml @@ -0,0 +1,43 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: Kamelet +metadata: + name: logger-sink + label: + camel.apache.org/kamelet.type: "sink" +spec: + definition: + title: "Logger" + description: "Logs the received payload of each incoming event" + properties: + prefix: + title: Prefix + description: The prefix to prepend to the logged message + type: string + default: "message: " + types: + in: + mediaType: text/plain + out: + mediaType: text/plain + flow: + from: + uri: "kamelet:source" + steps: + - log: "{{prefix}}${body}" diff --git a/e2e/yaks/common/kamelet-binding-broker/timer-source-binding.yaml b/e2e/yaks/common/kamelet-binding-broker/timer-source-binding.yaml new file mode 100644 index 0000000000..317ab02e17 --- /dev/null +++ b/e2e/yaks/common/kamelet-binding-broker/timer-source-binding.yaml @@ -0,0 +1,37 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: timer-source-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: timer-source + properties: + message: Hello Custom Event + period: 1000 + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1beta1 + name: default + properties: + type: custom-type diff --git a/e2e/yaks/common/kamelet-binding-broker/timer-source.kamelet.yaml b/e2e/yaks/common/kamelet-binding-broker/timer-source.kamelet.yaml new file mode 100644 index 0000000000..dc64b57236 --- /dev/null +++ b/e2e/yaks/common/kamelet-binding-broker/timer-source.kamelet.yaml @@ -0,0 +1,54 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: Kamelet +metadata: + name: timer-source + label: + camel.apache.org/kamelet.type: "source" +spec: + definition: + title: "Timer" + description: "Produces periodic events with a custom payload" + required: + - message + properties: + period: + title: Period + description: The time interval between two events + type: integer + default: 1000 + message: + title: Message + description: The message to generate + type: string + types: + out: + mediaType: application/json + schema: + id: text.camel.apache.org + type: string + flow: + from: + uri: timer:tick + parameters: + period: "{{period}}" + steps: + - set-body: + constant: "{{message}}" + - to: "kamelet:sink" diff --git a/e2e/yaks/common/kamelet-binding-broker/yaks-config.yaml b/e2e/yaks/common/kamelet-binding-broker/yaks-config.yaml new file mode 100644 index 0000000000..e8414f85c5 --- /dev/null +++ b/e2e/yaks/common/kamelet-binding-broker/yaks-config.yaml @@ -0,0 +1,36 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +config: + namespace: + temporary: true +pre: +- name: installation + run: | + # One of the two labels should work + oc label namespace $YAKS_NAMESPACE eventing.knative.dev/injection=enabled + oc label namespace $YAKS_NAMESPACE knative-eventing-injection=enabled + + kamel install -n $YAKS_NAMESPACE + + kubectl apply -f timer-source.kamelet.yaml -n $YAKS_NAMESPACE + kubectl apply -f logger-sink.kamelet.yaml -n $YAKS_NAMESPACE + + kubectl apply -f timer-source-binding.yaml -n $YAKS_NAMESPACE + kubectl apply -f logger-sink-binding.yaml -n $YAKS_NAMESPACE + kubectl wait kameletbinding timer-source-binding --for=condition=Ready --timeout=10m -n $YAKS_NAMESPACE + kubectl wait kameletbinding logger-sink-binding --for=condition=Ready --timeout=10m -n $YAKS_NAMESPACE diff --git a/pkg/util/bindings/knative_ref.go b/pkg/util/bindings/knative_ref.go index e3ad420c25..07e9454780 100644 --- a/pkg/util/bindings/knative_ref.go +++ b/pkg/util/bindings/knative_ref.go @@ -19,6 +19,7 @@ package bindings import ( "encoding/json" + "errors" "fmt" "net/url" @@ -69,12 +70,21 @@ func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointType v1 var serviceURI string if *serviceType == knativeapis.CamelServiceTypeEvent { + // TODO enable this when the runtime will support changing the broker name (https://github.com/apache/camel-k-runtime/issues/535) + //if props["name"] == "" { + // props["name"] = e.Ref.Name + //} if eventType, ok := props["type"]; ok { // consume prop delete(props, "type") serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, eventType) } else { - serviceURI = fmt.Sprintf("knative:%s", *serviceType) + if endpointType == v1alpha1.EndpointTypeSink { + // Allowing no event type, but it can fail. See https://github.com/apache/camel-k-runtime/issues/536 + serviceURI = fmt.Sprintf("knative:%s", *serviceType) + } else { + return nil, errors.New(`property "type" must be provided when reading from the Broker`) + } } } else { serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, url.PathEscape(e.Ref.Name)) diff --git a/pkg/util/knative/apis_test.go b/pkg/util/knative/apis_test.go index 3f1730f195..fd1331dee6 100644 --- a/pkg/util/knative/apis_test.go +++ b/pkg/util/knative/apis_test.go @@ -134,7 +134,7 @@ func TestAPIs(t *testing.T) { Name: "default", }, refs[0]) - ref, err = ExtractObjectReference("knative:event/ciao?brokerApiVersion=xxx") + ref, err = ExtractObjectReference("knative:event/ciao?apiVersion=xxx") assert.Nil(t, err) refs = FillMissingReferenceData(knative.CamelServiceTypeEvent, ref) checkValidRefs(t, refs) @@ -144,7 +144,7 @@ func TestAPIs(t *testing.T) { Name: "default", }, refs[0]) - ref, err = ExtractObjectReference("knative:event/ciao?brokerName=aaa") + ref, err = ExtractObjectReference("knative:event/ciao?name=aaa") assert.Nil(t, err) refs = FillMissingReferenceData(knative.CamelServiceTypeEvent, ref) checkValidRefs(t, refs) diff --git a/pkg/util/knative/knative.go b/pkg/util/knative/knative.go index 2a45fb1fe5..4f506ea3ae 100644 --- a/pkg/util/knative/knative.go +++ b/pkg/util/knative/knative.go @@ -80,7 +80,7 @@ func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, e }, ObjectMeta: metav1.ObjectMeta{ Namespace: brokerReference.Namespace, - Name: brokerReference.Name + "-" + serviceName + "-" + eventType, + Name: brokerReference.Name + "-" + serviceName + "-" + kubernetesutils.SanitizeLabel(eventType), }, Spec: eventing.TriggerSpec{ Filter: &eventing.TriggerFilter{ diff --git a/pkg/util/knative/uri.go b/pkg/util/knative/uri.go index 3df11d6e05..5dd6e69583 100644 --- a/pkg/util/knative/uri.go +++ b/pkg/util/knative/uri.go @@ -26,14 +26,13 @@ import ( v1 "k8s.io/api/core/v1" ) -var uriRegexp = regexp.MustCompile(`^knative:[/]*(channel|endpoint|event)(?:$|/([A-Za-z0-9.-]+)(?:[/?].*|$))`) +var uriRegexp = regexp.MustCompile(`^knative:[/]*(channel|endpoint|event)(?:[?].*|$|/([A-Za-z0-9.-]+)(?:[/?].*|$))`) var plainNameRegexp = regexp.MustCompile(`^[A-Za-z0-9.-]+$`) const ( - paramAPIVersion = "apiVersion" - paramKind = "kind" - paramBrokerName = "brokerName" - paramBrokerAPIVersion = "brokerApiVersion" + paramAPIVersion = "apiVersion" + paramKind = "kind" + paramBrokerName = "name" ) // FilterURIs returns all Knative URIs of the given type from a slice @@ -62,7 +61,7 @@ func ExtractObjectReference(uri string) (v1.ObjectReference, error) { if name == "" { name = "default" } - apiVersion := uriutils.GetQueryParameter(uri, paramBrokerAPIVersion) + apiVersion := uriutils.GetQueryParameter(uri, paramAPIVersion) return v1.ObjectReference{ Name: name, APIVersion: apiVersion, diff --git a/pkg/util/knative/uri_test.go b/pkg/util/knative/uri_test.go index ece603f880..995127889b 100644 --- a/pkg/util/knative/uri_test.go +++ b/pkg/util/knative/uri_test.go @@ -74,7 +74,7 @@ func TestChannelUri(t *testing.T) { Name: "ciao", }, ref) - ref, err = ExtractObjectReference("knative://event/chuck?&brokerApiVersion=eventing.knative.dev/v1beta1&brokerName=broker2") + ref, err = ExtractObjectReference("knative://event/chuck?&apiVersion=eventing.knative.dev/v1beta1&name=broker2") assert.Nil(t, err) assert.Equal(t, v1.ObjectReference{ APIVersion: "eventing.knative.dev/v1beta1", @@ -88,6 +88,14 @@ func TestChannelUri(t *testing.T) { Name: "default", Kind: "Broker", }, ref) + + ref, err = ExtractObjectReference("knative://event?&apiVersion=eventing.knative.dev/v1beta13&brokxerName=broker2") + assert.Nil(t, err) + assert.Equal(t, v1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1beta13", + Name: "default", + Kind: "Broker", + }, ref) } func TestNormalizeToUri(t *testing.T) {