Skip to content

Commit

Permalink
fix(kamelets): register v1alpha1 binding providers
Browse files Browse the repository at this point in the history
  • Loading branch information
squakez committed Apr 11, 2023
1 parent 982f4c9 commit b2e2c72
Show file tree
Hide file tree
Showing 14 changed files with 9,005 additions and 176 deletions.
1 change: 1 addition & 0 deletions addons/register_strimzi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ import (

func init() {
bindings.RegisterBindingProvider(strimzi.BindingProvider{})
bindings.V1alpha1RegisterBindingProvider(strimzi.V1alpha1BindingProvider{})
}
103 changes: 103 additions & 0 deletions addons/strimzi/strimzi.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/apache/camel-k/v2/addons/strimzi/duck/v1beta2"

camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
camelv1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/v2/pkg/util/bindings"
"github.com/apache/camel-k/v2/pkg/util/uri"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -119,6 +120,108 @@ func (s BindingProvider) getBootstrapServers(ctx bindings.BindingContext, cluste
return "", fmt.Errorf("cluster %q has no listeners of type %q", clusterName, v1beta2.StrimziListenerTypePlain)
}

// Order --.
func (s BindingProvider) Order() int {
return bindings.OrderStandard
}

// V1alpha1BindingProvider allows to connect to a Kafka topic via Binding.
// Deprecated.
type V1alpha1BindingProvider struct {
Client internalclientset.Interface
}

// ID --.
// Deprecated.
func (s V1alpha1BindingProvider) ID() string {
return "strimzi"
}

// Translate --.
// Deprecated.
func (s V1alpha1BindingProvider) Translate(ctx bindings.V1alpha1BindingContext, _ bindings.V1alpha1EndpointContext, endpoint camelv1alpha1.Endpoint) (*bindings.Binding, error) {
if endpoint.Ref == nil {
// React only on refs
return nil, nil
}
gv, err := schema.ParseGroupVersion(endpoint.Ref.APIVersion)
if err != nil {
return nil, err
}

if gv.Group != v1beta2.StrimziGroup || endpoint.Ref.Kind != v1beta2.StrimziKindTopic {
// Only operates on Strimzi Topics
return nil, nil
}

props, err := endpoint.Properties.GetPropertyMap()
if err != nil {
return nil, err
}
if props == nil {
props = make(map[string]string)
}

if props["brokers"] == "" {
// build the client if needed
if s.Client == nil {
kafkaClient, err := internalclientset.NewForConfig(ctx.Client.GetConfig())
if err != nil {
return nil, err
}
s.Client = kafkaClient
}

// look them up
topic, err := s.Client.KafkaV1beta2().KafkaTopics(ctx.Namespace).Get(ctx.Ctx, endpoint.Ref.Name, v1.GetOptions{})
if err != nil {
return nil, err
}

clusterName := topic.Labels[v1beta2.StrimziKafkaClusterLabel]
if clusterName == "" {
return nil, fmt.Errorf("no %q label defined on topic %s", v1beta2.StrimziKafkaClusterLabel, endpoint.Ref.Name)
}

bootstrapServers, err := s.getBootstrapServers(ctx, clusterName)
if err != nil {
return nil, err
}

props["brokers"] = bootstrapServers
}

kafkaURI := fmt.Sprintf("kafka:%s", endpoint.Ref.Name)
kafkaURI = uri.AppendParameters(kafkaURI, props)

return &bindings.Binding{
URI: kafkaURI,
}, nil
}

// getBootstrapServers --.
// Deprecated.
func (s V1alpha1BindingProvider) getBootstrapServers(ctx bindings.V1alpha1BindingContext, clusterName string) (string, error) {
cluster, err := s.Client.KafkaV1beta2().Kafkas(ctx.Namespace).Get(ctx.Ctx, clusterName, v1.GetOptions{})
if err != nil {
return "", err
}

for _, l := range cluster.Status.Listeners {
if l.Type == v1beta2.StrimziListenerTypePlain {
if l.BootstrapServers == "" {
return "", fmt.Errorf("cluster %q has no bootstrap servers in %q listener", clusterName, v1beta2.StrimziListenerTypePlain)
}

return l.BootstrapServers, nil
}
}

return "", fmt.Errorf("cluster %q has no listeners of type %q", clusterName, v1beta2.StrimziListenerTypePlain)
}

// Order --.
// Deprecated.
func (s V1alpha1BindingProvider) Order() int {
return bindings.OrderStandard
}
25 changes: 18 additions & 7 deletions config/crd/bases/camel.apache.org_pipes.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.6.1
creationTimestamp: null
labels:
app: camel-k
name: pipes.camel.apache.org
spec:
group: camel.apache.org
Expand Down Expand Up @@ -8512,9 +8529,3 @@ spec:
specReplicasPath: .spec.replicas
statusReplicasPath: .status.replicas
status: {}
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# ---------------------------------------------------------------------------

kind: KameletBinding
apiVersion: camel.apache.org/v1
apiVersion: camel.apache.org/v1alpha1
metadata:
name: properties-binding
spec:
Expand Down
2 changes: 1 addition & 1 deletion e2e/yaks/common/knative-sinkbinding/source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
period: "1000"
steps:
- set-body:
constant: "Hello SinkKameletBinding !!!"
constant: "Hello SinkBinding !!!"
- transform:
simple: "${body.toUpperCase()}"
- to: "log:info"
Expand Down
Loading

0 comments on commit b2e2c72

Please sign in to comment.