Skip to content

Commit

Permalink
Fix #1778: allow pushing to broker via KameletBinding
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Oct 26, 2020
1 parent 7801299 commit 3e9886b
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 11 deletions.
5 changes: 5 additions & 0 deletions e2e/yaks/common/kamelet-binding-broker/kamelet.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Feature: Camel K can bind Kamelets to the broker

Scenario: Sending event to the broker with KameletBinding
Given integration logger-sink-binding is running
Then integration logger-sink-binding should print message: Hello Custom Event
34 changes: 34 additions & 0 deletions e2e/yaks/common/kamelet-binding-broker/logger-sink-binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: logger-sink-binding
spec:
source:
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1beta1
name: default
properties:
type: custom-type
sink:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
name: logger-sink
43 changes: 43 additions & 0 deletions e2e/yaks/common/kamelet-binding-broker/logger-sink.kamelet.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
name: logger-sink
label:
camel.apache.org/kamelet.type: "sink"
spec:
definition:
title: "Logger"
description: "Logs the received payload of each incoming event"
properties:
prefix:
title: Prefix
description: The prefix to prepend to the logged message
type: string
default: "message: "
types:
in:
mediaType: text/plain
out:
mediaType: text/plain
flow:
from:
uri: "kamelet:source"
steps:
- log: "{{prefix}}${body}"
37 changes: 37 additions & 0 deletions e2e/yaks/common/kamelet-binding-broker/timer-source-binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: timer-source-binding
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
name: timer-source
properties:
message: Hello Custom Event
period: 1000
sink:
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1beta1
name: default
properties:
type: custom-type
54 changes: 54 additions & 0 deletions e2e/yaks/common/kamelet-binding-broker/timer-source.kamelet.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
name: timer-source
label:
camel.apache.org/kamelet.type: "source"
spec:
definition:
title: "Timer"
description: "Produces periodic events with a custom payload"
required:
- message
properties:
period:
title: Period
description: The time interval between two events
type: integer
default: 1000
message:
title: Message
description: The message to generate
type: string
types:
out:
mediaType: application/json
schema:
id: text.camel.apache.org
type: string
flow:
from:
uri: timer:tick
parameters:
period: "{{period}}"
steps:
- set-body:
constant: "{{message}}"
- to: "kamelet:sink"
36 changes: 36 additions & 0 deletions e2e/yaks/common/kamelet-binding-broker/yaks-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

config:
namespace:
temporary: true
pre:
- name: installation
run: |
# One of the two labels should work
oc label namespace $YAKS_NAMESPACE eventing.knative.dev/injection=enabled
oc label namespace $YAKS_NAMESPACE knative-eventing-injection=enabled
kamel install -n $YAKS_NAMESPACE
kubectl apply -f timer-source.kamelet.yaml -n $YAKS_NAMESPACE
kubectl apply -f logger-sink.kamelet.yaml -n $YAKS_NAMESPACE
kubectl apply -f timer-source-binding.yaml -n $YAKS_NAMESPACE
kubectl apply -f logger-sink-binding.yaml -n $YAKS_NAMESPACE
kubectl wait kameletbinding timer-source-binding --for=condition=Ready --timeout=10m -n $YAKS_NAMESPACE
kubectl wait kameletbinding logger-sink-binding --for=condition=Ready --timeout=10m -n $YAKS_NAMESPACE
12 changes: 11 additions & 1 deletion pkg/util/bindings/knative_ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package bindings

import (
"encoding/json"
"errors"
"fmt"
"net/url"

Expand Down Expand Up @@ -69,12 +70,21 @@ func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointType v1

var serviceURI string
if *serviceType == knativeapis.CamelServiceTypeEvent {
// TODO enable this when the runtime will support changing the broker name (https://github.com/apache/camel-k-runtime/issues/535)
//if props["name"] == "" {
// props["name"] = e.Ref.Name
//}
if eventType, ok := props["type"]; ok {
// consume prop
delete(props, "type")
serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, eventType)
} else {
serviceURI = fmt.Sprintf("knative:%s", *serviceType)
if endpointType == v1alpha1.EndpointTypeSink {
// Allowing no event type, but it can fail. See https://github.com/apache/camel-k-runtime/issues/536
serviceURI = fmt.Sprintf("knative:%s", *serviceType)
} else {
return nil, errors.New(`property "type" must be provided when reading from the Broker`)
}
}
} else {
serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, url.PathEscape(e.Ref.Name))
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/knative/apis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestAPIs(t *testing.T) {
Name: "default",
}, refs[0])

ref, err = ExtractObjectReference("knative:event/ciao?brokerApiVersion=xxx")
ref, err = ExtractObjectReference("knative:event/ciao?apiVersion=xxx")
assert.Nil(t, err)
refs = FillMissingReferenceData(knative.CamelServiceTypeEvent, ref)
checkValidRefs(t, refs)
Expand All @@ -144,7 +144,7 @@ func TestAPIs(t *testing.T) {
Name: "default",
}, refs[0])

ref, err = ExtractObjectReference("knative:event/ciao?brokerName=aaa")
ref, err = ExtractObjectReference("knative:event/ciao?name=aaa")
assert.Nil(t, err)
refs = FillMissingReferenceData(knative.CamelServiceTypeEvent, ref)
checkValidRefs(t, refs)
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/knative/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func CreateTrigger(brokerReference corev1.ObjectReference, serviceName string, e
},
ObjectMeta: metav1.ObjectMeta{
Namespace: brokerReference.Namespace,
Name: brokerReference.Name + "-" + serviceName + "-" + eventType,
Name: brokerReference.Name + "-" + serviceName + "-" + kubernetesutils.SanitizeLabel(eventType),
},
Spec: eventing.TriggerSpec{
Filter: &eventing.TriggerFilter{
Expand Down
11 changes: 5 additions & 6 deletions pkg/util/knative/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ import (
v1 "k8s.io/api/core/v1"
)

var uriRegexp = regexp.MustCompile(`^knative:[/]*(channel|endpoint|event)(?:$|/([A-Za-z0-9.-]+)(?:[/?].*|$))`)
var uriRegexp = regexp.MustCompile(`^knative:[/]*(channel|endpoint|event)(?:[?].*|$|/([A-Za-z0-9.-]+)(?:[/?].*|$))`)
var plainNameRegexp = regexp.MustCompile(`^[A-Za-z0-9.-]+$`)

const (
paramAPIVersion = "apiVersion"
paramKind = "kind"
paramBrokerName = "brokerName"
paramBrokerAPIVersion = "brokerApiVersion"
paramAPIVersion = "apiVersion"
paramKind = "kind"
paramBrokerName = "name"
)

// FilterURIs returns all Knative URIs of the given type from a slice
Expand Down Expand Up @@ -62,7 +61,7 @@ func ExtractObjectReference(uri string) (v1.ObjectReference, error) {
if name == "" {
name = "default"
}
apiVersion := uriutils.GetQueryParameter(uri, paramBrokerAPIVersion)
apiVersion := uriutils.GetQueryParameter(uri, paramAPIVersion)
return v1.ObjectReference{
Name: name,
APIVersion: apiVersion,
Expand Down
10 changes: 9 additions & 1 deletion pkg/util/knative/uri_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestChannelUri(t *testing.T) {
Name: "ciao",
}, ref)

ref, err = ExtractObjectReference("knative://event/chuck?&brokerApiVersion=eventing.knative.dev/v1beta1&brokerName=broker2")
ref, err = ExtractObjectReference("knative://event/chuck?&apiVersion=eventing.knative.dev/v1beta1&name=broker2")
assert.Nil(t, err)
assert.Equal(t, v1.ObjectReference{
APIVersion: "eventing.knative.dev/v1beta1",
Expand All @@ -88,6 +88,14 @@ func TestChannelUri(t *testing.T) {
Name: "default",
Kind: "Broker",
}, ref)

ref, err = ExtractObjectReference("knative://event?&apiVersion=eventing.knative.dev/v1beta13&brokxerName=broker2")
assert.Nil(t, err)
assert.Equal(t, v1.ObjectReference{
APIVersion: "eventing.knative.dev/v1beta13",
Name: "default",
Kind: "Broker",
}, ref)
}

func TestNormalizeToUri(t *testing.T) {
Expand Down

0 comments on commit 3e9886b

Please sign in to comment.