Skip to content

Commit

Permalink
Experimental KEDA support in Kafka Event Source
Browse files Browse the repository at this point in the history
Warning: this is *experimental* and may be changed in future. Should
not be used in production. This is mainly for discussion and evolving
scaling in Knative eventing.

The code is using Unstructured and also imported KEDA API - this is for
discussion which version should be used (right now only Unstructured is
fully implemented).
KEDA to provide client-go support discussion knative#494
kedacore/keda#494
  • Loading branch information
aslom committed Feb 3, 2020
1 parent d3f837c commit d1250f0
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 0 deletions.
65 changes: 65 additions & 0 deletions kafka/source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,68 @@ event sink.
A more detailed example of the `KafkaSource` can be found in the
[Knative documentation](https://knative.dev/docs/eventing/samples/).


# Experimental KEDA support in Kafka Event Source


Warning: this is *experimental* and may change in the future. It should not be used in production; it exists mainly for discussion and for evolving scaling in Knative eventing.

The code is using Unstructured and also imported KEDA API - this is for discussion which version should be used (right now only Unstructured is fully implemented).
See the KEDA client-go support discussion: https://github.com/kedacore/keda/issues/494

## Install Kafka

See above

## Install KEDA

To install the version I used for the experiment:

```
git clone https://github.com/kedacore/keda.git
cd keda
git pull
git checkout master
#$ git rev-parse HEAD
#0099c102d538995c81610fb48d12bde1259678a6
git checkout 0099c102d538995c81610fb48d12bde1259678a6
```

## Run Kafka Source Controller

Install Kafka source controller with Keda support:

```
export KO_DOCKER_REPO=...
ko apply -f kafka/source/config/
```


#### Local testing

```
go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG
```
## Create Kafka Source that uses YAML
Apply an example KafkaSource YAML with `minReplicaCount` set to a value other than 1. For example:
```
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
name: kafka-src10
spec:
bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092 #note the .kafka in URL for namespace
consumerGroup: kafka-source10-11
minReplicaCount: 0
maxReplicaCount: 2
topics: my-topic-10
sink:
apiVersion: v1
kind: Service
name: hello-display
```
6 changes: 6 additions & 0 deletions kafka/source/config/300-kafkasource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ spec:
consumerGroup:
type: string
minLength: 1
minReplicaCount:
format: int32
type: integer
maxReplicaCount:
format: int32
type: integer
net:
properties:
sasl:
Expand Down
8 changes: 8 additions & 0 deletions kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ type KafkaSourceSpec struct {
// +required
ConsumerGroup string `json:"consumerGroup"`

// MinReplicaCount is the minimum number of receive-adapter replicas KEDA may scale down to; 0 enables scale-to-zero (experimental KEDA support).
// +optional
MinReplicaCount *int32 `json:"minReplicaCount"`

// MaxReplicaCount is the maximum number of receive-adapter replicas KEDA may scale up to (experimental KEDA support).
// +optional
MaxReplicaCount *int32 `json:"maxReplicaCount"`

Net KafkaSourceNetSpec `json:"net,omitempty"`

// Sink is a reference to an object that will resolve to a domain name to use as the sink.
Expand Down
10 changes: 10 additions & 0 deletions kafka/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

137 changes: 137 additions & 0 deletions kafka/source/pkg/reconciler/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
Expand All @@ -57,6 +59,8 @@ import (
pkgLogging "knative.dev/pkg/logging"
)

//KEDA API keda_v1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"

const (
raImageEnvVar = "KAFKA_RA_IMAGE"
kafkaReadinessChanged = "KafkaSourceReadinessChanged"
Expand Down Expand Up @@ -229,6 +233,136 @@ func checkResourcesStatus(src *v1alpha1.KafkaSource) error {
return nil
}

// //Uses KEDA API
// func (r *Reconciler) generateKedaScaledObjectWithKedaAPI(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) {
// println("Got ra", ra.GetName())
// println("Got minReplicaCount", src.Spec.MinReplicaCount)
// if src.Spec.MinReplicaCount == nil && *src.Spec.MinReplicaCount == 1 { // default - TODO check if not set?
// return
// }
// scaledObject := &keda_v1alpha1.ScaledObject{
// Spec: keda_v1alpha1.ScaledObjectSpec{
// MinReplicaCount: src.Spec.MinReplicaCount,
// MaxReplicaCount: src.Spec.MaxReplicaCount,
// },
// }
// r.deployScaledObjectWithKedaJSON(scaledObject)
// }

// //Uses KEDA API
// func (r *Reconciler) deployScaledObjectWithKedaJSON(so *keda_v1alpha1.ScaledObject) {
// json, err := json.Marshal(so)
// if err != nil {
// log.Fatal(err)
// }
// println("TODO KEDA deploy", string(json))
// r.KubeClientSet.AppsV1()
// }

// deployKedaScaledObject creates (or updates, if it already exists) a KEDA
// ScaledObject targeting the receive-adapter deployment, so KEDA can scale it
// between spec.minReplicaCount and spec.maxReplicaCount.
//
// The object is built as unstructured.Unstructured and applied through the
// dynamic client because KEDA does not ship client-go support yet; see
// https://github.com/kedacore/keda/issues/494.
//
// Returns the created/updated object, or an error when the spec asks for the
// default (non-scaled) behavior or when the API calls fail.
func (r *Reconciler) deployKedaScaledObject(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) (*unstructured.Unstructured, error) {
	logger := logging.FromContext(ctx)
	deploymentName := ra.GetName()
	logger.Debugw("Deploying KEDA ScaledObject", zap.String("deployment", deploymentName))
	// minReplicaCount unset or set to 1 means the default (non-scaled)
	// behavior, so no ScaledObject is needed.
	// BUG FIX: the previous condition used `&&`, which dereferenced a nil
	// MinReplicaCount pointer whenever the field was unset.
	if src.Spec.MinReplicaCount == nil || *src.Spec.MinReplicaCount == 1 {
		//TODO: delete ScaledObject if exist
		return nil, fmt.Errorf("as minReplicaCount is 1 so skipping creation of ScaledObject")
	}
	namespace := src.Namespace
	name := src.Name
	gvk := schema.GroupVersionKind{
		Group:   "keda.k8s.io",
		Version: "v1alpha1",
		Kind:    "ScaledObject",
	}
	gvr, _ := meta.UnsafeGuessKindToResource(gvk)
	scaledObjectResourceInterface := r.DynamicClientSet.Resource(gvr).Namespace(namespace)
	if scaledObjectResourceInterface == nil {
		return nil, fmt.Errorf("unable to create dynamic client for ScaledObject")
	}
	scaledObjectUnstr := r.generateKedaScaledObjectUnstructured(ctx, ra, src)
	created, err := scaledObjectResourceInterface.Create(scaledObjectUnstr, metav1.CreateOptions{})
	if err == nil {
		return created, nil
	}
	if !apierrors.IsAlreadyExists(err) {
		// Best-effort: any Create failure falls through to a "replace"
		// attempt, matching kubectl replace semantics.
		logger.Debugw("Create of ScaledObject failed, attempting update", zap.Error(err))
	}
	// Doing a "replace": fetch the current resourceVersion and Update, per
	// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
	existing, err := scaledObjectResourceInterface.Get(name, metav1.GetOptions{})
	if err != nil {
		logger.Errorw("Unable to get existing ScaledObject", zap.Error(err))
		return nil, err
	}
	scaledObjectUnstr.SetResourceVersion(existing.GetResourceVersion())
	updated, err := scaledObjectResourceInterface.Update(scaledObjectUnstr, metav1.UpdateOptions{})
	if err != nil {
		logger.Errorw("Unable to update ScaledObject", zap.Error(err))
		return nil, err
	}
	logger.Debugw("Updated ScaledObject", zap.Any("scaledObject", updated))
	return updated, nil
}

// generateKedaScaledObjectUnstructured builds the unstructured representation
// of a KEDA ScaledObject (keda.k8s.io/v1alpha1) for the given receive-adapter
// deployment. Unstructured is used instead of the typed KEDA API until KEDA
// provides client-go support (https://github.com/kedacore/keda/issues/494).
//
// minReplicaCount defaults to 0 (scale-to-zero) and maxReplicaCount to 1 when
// the corresponding KafkaSource spec fields are unset.
func (r *Reconciler) generateKedaScaledObjectUnstructured(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) *unstructured.Unstructured {
	deploymentName := ra.GetName()
	// Default to scale-to-zero with a single-replica maximum when the spec
	// does not say otherwise.
	actualMinReplicaCount := int32(0)
	if src.Spec.MinReplicaCount != nil {
		actualMinReplicaCount = *src.Spec.MinReplicaCount
	}
	actualMaxReplicaCount := int32(1)
	if src.Spec.MaxReplicaCount != nil {
		actualMaxReplicaCount = *src.Spec.MaxReplicaCount
	}
	soUnstr := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "keda.k8s.io/v1alpha1",
			"kind":       "ScaledObject",
			"metadata": map[string]interface{}{
				"creationTimestamp": nil,
				"namespace":         src.Namespace,
				"name":              src.Name,
				"labels": map[string]interface{}{
					"deploymentName": deploymentName,
				},
			},
			"spec": map[string]interface{}{
				"scaleTargetRef": map[string]interface{}{
					"deploymentName": deploymentName,
				},
				"minReplicaCount": actualMinReplicaCount,
				"maxReplicaCount": actualMaxReplicaCount,
				"triggers": []map[string]interface{}{{
					"type": "kafka",
					"metadata": map[string]interface{}{
						// BUG FIX: derive the Kafka trigger from the
						// KafkaSource spec instead of the previously
						// hard-coded test-cluster values.
						"brokerList":    src.Spec.BootstrapServers,
						"consumerGroup": src.Spec.ConsumerGroup,
						"topic":         src.Spec.Topics,
						// TODO: make lagThreshold configurable.
						"lagThreshold": "50",
					},
				}},
			},
		},
	}
	logging.FromContext(ctx).Debugw("Generated ScaledObject", zap.Any("scaledObject", soUnstr))
	return soUnstr
}

/*
*/

func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.KafkaSource, sinkURI string) (*v1.Deployment, error) {

if err := checkResourcesStatus(src); err != nil {
Expand Down Expand Up @@ -262,6 +396,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Kaf
msg = fmt.Sprintf("Deployment created, error: %v", err)
}
r.Recorder.Eventf(src, corev1.EventTypeNormal, kafkaSourceDeploymentCreated, "%s", msg)
r.deployKedaScaledObject(ctx, ra, src)
return ra, err
} else if err != nil {
logging.FromContext(ctx).Error("Unable to get an existing receive adapter", zap.Error(err))
Expand All @@ -274,9 +409,11 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Kaf
return ra, err
}
r.Recorder.Eventf(src, corev1.EventTypeNormal, kafkaSourceDeploymentUpdated, "Deployment updated")
r.deployKedaScaledObject(ctx, ra, src)
return ra, nil
} else {
logging.FromContext(ctx).Debug("Reusing existing receive adapter", zap.Any("receiveAdapter", ra))
r.deployKedaScaledObject(ctx, ra, src)
}
return ra, nil
}
Expand Down

0 comments on commit d1250f0

Please sign in to comment.