From 53867ec3ad8bf745edbe7728ada4dfb998787b49 Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 10:07:19 -0400 Subject: [PATCH 01/10] updated README --- kafka/source/README.md | 83 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/kafka/source/README.md b/kafka/source/README.md index d338cfac36..75ac5ce81d 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -51,3 +51,86 @@ event sink. A more detailed example of the `KafkaSource` can be found in the [Knative documentation](https://knative.dev/docs/eventing/samples/). + +## Experimental KEDA support in Kafka Event Source + +Warning: this is *experimental* and may be changed in future. Should not be used in production. This is mainly for discussion and evolving scaling in Knative eventing. + +The code is using Unstructured and also imported KEDA API - this is for discussion which version should be used (right now only Unstructured is fully implemented). +KEDA to provide client-go support discussion #494 + +### Install KEDA + +To install the version I used for the experiment (the latest version from master should work too): + +```bash +git clone https://github.com/kedacore/keda.git +cd keda +git checkout v1.3.0 +``` + +Then follow [KEDA setup instructions](https://keda.sh/deploy/) to deploy using YAML: + +```bash +kubectl apply -f deploy/crds/keda.k8s.io_scaledobjects_crd.yaml +kubectl apply -f deploy/crds/keda.k8s.io_triggerauthentications_crd.yaml +kubectl apply -f deploy/ +``` + +If worked expected to have in keda namespace: + +```bash +kubectl get pods -n keda +NAME READY STATUS RESTARTS AGE +keda-metrics-apiserver-5bb8b6664c-vrhrj 1/1 Running 0 38s +keda-operator-865ccfb9d9-xd85w 1/1 Running 0 39s +``` + +### Run Kafka Source Controller + +Install expermiental Kafka source controller with KEDA support from source code: + +```bash +export KO_DOCKER_REPO=... +ko apply -f kafka/source/config/ +``` + +#### Local testing + +```bash +go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG +``` + +### Create Kafka Source that uses KEDA + +To use KEDA the YAML file must have one of autoscaling annotations (for example minScale): + +```yaml +apiVersion: sources.knative.dev/v1alpha1 +kind: KafkaSource +metadata: + name: knative-demo-kafka-keda-src1 + annotations: + autoscaling.knative.dev/minScale: "0" + autoscaling.knative.dev/maxScale: "10" + autoscaling.knative.dev/class: keda.autoscaling.knative.dev + keda.autoscaling.knative.dev/pollingInterval: "2" + keda.autoscaling.knative.dev/cooldownPeriod: "15" +spec: + bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092 #note the .kafka in URL for namespace + consumerGroup: knative-demo-kafka-keda-src1 + topics: knative-demo-topic + sink: + ref: + apiVersion: serving.knative.dev/v1alpha1 + kind: Service + name: event-display +``` + +To verify that Kafka source is using KEDA retrieve the scaled object created by Kafka source: + +```bash +kubectl get scaledobjects.keda.k8s.io +NAME DEPLOYMENT TRIGGERS AGE +knative-demo-kafka-keda-src1 kafkasource-knative-demo-k-999036a8-af6e-4431-b671-d052842dddf1 kafka 31s +``` From f11a6eb254b6c79552b61dc72d5e7006c780697e Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 10:10:54 -0400 Subject: [PATCH 02/10] separare scaling code in its own package --- kafka/source/cmd/controller/main.go | 1 + kafka/source/config/201-clusterrole.yaml | 6 ++++++ kafka/source/pkg/reconciler/controller.go | 2 ++ kafka/source/pkg/reconciler/kafkasource.go | 6 +++++- 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/kafka/source/cmd/controller/main.go b/kafka/source/cmd/controller/main.go index a115f67f49..158a2dc835 100644 --- a/kafka/source/cmd/controller/main.go +++ b/kafka/source/cmd/controller/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + _ "k8s.io/client-go/plugin/pkg/client/auth/oidc" kafka "knative.dev/eventing-contrib/kafka/source/pkg/reconciler" "knative.dev/pkg/injection/sharedmain" ) diff --git a/kafka/source/config/201-clusterrole.yaml b/kafka/source/config/201-clusterrole.yaml index 5c610a0851..79a4c23f92 100644 --- a/kafka/source/config/201-clusterrole.yaml +++ b/kafka/source/config/201-clusterrole.yaml @@ -67,6 +67,12 @@ rules: - leases verbs: *everything +- apiGroups: + - keda.k8s.io + resources: + - scaledobjects + verbs: *everything + --- # The role is needed for the aggregated role source-observer in knative-eventing to provide readonly access to "Sources". # See https://github.com/knative/eventing/blob/master/config/200-source-observer-clusterrole.yaml. diff --git a/kafka/source/pkg/reconciler/controller.go b/kafka/source/pkg/reconciler/controller.go index 43d6096133..8d83306ee8 100644 --- a/kafka/source/pkg/reconciler/controller.go +++ b/kafka/source/pkg/reconciler/controller.go @@ -27,6 +27,7 @@ import ( "knative.dev/eventing/pkg/apis/sources/v1alpha1" kubeclient "knative.dev/pkg/client/injection/kube/client" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" + "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -53,6 +54,7 @@ func NewController( c := &Reconciler{ KubeClientSet: kubeclient.Get(ctx), + DynamicClientSet: dynamicclient.Get(ctx), kafkaClientSet: kafkaclient.Get(ctx), kafkaLister: kafkaInformer.Lister(), deploymentLister: deploymentInformer.Lister(), diff --git a/kafka/source/pkg/reconciler/kafkasource.go b/kafka/source/pkg/reconciler/kafkasource.go index 5b94499d5c..33883a8723 100644 --- a/kafka/source/pkg/reconciler/kafkasource.go +++ b/kafka/source/pkg/reconciler/kafkasource.go @@ -37,8 +37,10 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1" "knative.dev/eventing-contrib/kafka/source/pkg/reconciler/resources" + "knative.dev/eventing-contrib/kafka/source/pkg/reconciler/scaling" // NewController stuff + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" appsv1listers "k8s.io/client-go/listers/apps/v1" "knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned" @@ -84,7 +86,8 @@ func newDeploymentFailed(namespace, name string, err error) pkgreconciler.Event type Reconciler struct { // KubeClientSet allows us to talk to the k8s for core APIs - KubeClientSet kubernetes.Interface + KubeClientSet kubernetes.Interface + DynamicClientSet dynamic.Interface receiveAdapterImage string @@ -145,6 +148,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc logging.FromContext(ctx).Error("Unable to create the receive adapter", zap.Error(err)) return err } + scaling.ScaleKafkaSourceWithKeda(ctx, ra, src, r.DynamicClientSet) src.Status.MarkDeployed(ra) src.Status.CloudEventAttributes = r.createCloudEventAttributes(src) From 7ba9cba0fe392763dcee7f31128843e1ec0c0a4d Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 12:32:25 -0400 Subject: [PATCH 03/10] warning about name limits --- kafka/source/README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kafka/source/README.md b/kafka/source/README.md index 75ac5ce81d..834ca67c31 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -103,13 +103,17 @@ go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG ### Create Kafka Source that uses KEDA -To use KEDA the YAML file must have one of autoscaling annotations (for example minScale): +To use KEDA the YAML file must have one of autoscaling annotations (such as minScale). + +Warning: do not give KafkaSource name longer than 4 characters as KEDA will add prefix to deployment name that already has UUID appended HAP can not be created if its name exceed 64 character name limit (keda-hpa-kafkasource-NAME-999036a8-af6e-4431-b671-d052842dddf1) + +Example: ```yaml apiVersion: sources.knative.dev/v1alpha1 kind: KafkaSource metadata: - name: knative-demo-kafka-keda-src1 + name: kn1 annotations: autoscaling.knative.dev/minScale: "0" autoscaling.knative.dev/maxScale: "10" From dc74d1d8b7e0d1e79bd69606faea1e3a884310fb Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 12:36:04 -0400 Subject: [PATCH 04/10] Apply formatting Co-Authored-By: Matt Moore --- kafka/source/README.md | 27 +++++++++++++++++------- kafka/source/config/201-clusterrole.yaml | 1 - 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/kafka/source/README.md b/kafka/source/README.md index 834ca67c31..f8dc42c701 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -54,14 +54,19 @@ A more detailed example of the `KafkaSource` can be found in the ## Experimental KEDA support in Kafka Event Source -Warning: this is *experimental* and may be changed in future. Should not be used in production. This is mainly for discussion and evolving scaling in Knative eventing. +Warning: this is _experimental_ and may be changed in future. Should not be used +in production. This is mainly for discussion and evolving scaling in Knative +eventing. -The code is using Unstructured and also imported KEDA API - this is for discussion which version should be used (right now only Unstructured is fully implemented). -KEDA to provide client-go support discussion #494 +The code is using Unstructured and also imported KEDA API - this is for +discussion which version should be used (right now only Unstructured is fully +implemented). KEDA to provide client-go support discussion #494 + ### Install KEDA -To install the version I used for the experiment (the latest version from master should work too): +To install the version I used for the experiment (the latest version from master +should work too): ```bash git clone https://github.com/kedacore/keda.git @@ -69,7 +74,8 @@ cd keda git checkout v1.3.0 ``` -Then follow [KEDA setup instructions](https://keda.sh/deploy/) to deploy using YAML: +Then follow [KEDA setup instructions](https://keda.sh/deploy/) to deploy using +YAML: ```bash kubectl apply -f deploy/crds/keda.k8s.io_scaledobjects_crd.yaml @@ -103,9 +109,13 @@ go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG ### Create Kafka Source that uses KEDA -To use KEDA the YAML file must have one of autoscaling annotations (such as minScale). +To use KEDA the YAML file must have one of autoscaling annotations (such as +minScale). -Warning: do not give KafkaSource name longer than 4 characters as KEDA will add prefix to deployment name that already has UUID appended HAP can not be created if its name exceed 64 character name limit (keda-hpa-kafkasource-NAME-999036a8-af6e-4431-b671-d052842dddf1) +Warning: do not give KafkaSource name longer than 4 characters as KEDA will add +prefix to deployment name that already has UUID appended HAP can not be created +if its name exceed 64 character name limit +(keda-hpa-kafkasource-NAME-999036a8-af6e-4431-b671-d052842dddf1) Example: @@ -131,7 +141,8 @@ spec: name: event-display ``` -To verify that Kafka source is using KEDA retrieve the scaled object created by Kafka source: +To verify that Kafka source is using KEDA retrieve the scaled object created by +Kafka source: ```bash kubectl get scaledobjects.keda.k8s.io diff --git a/kafka/source/config/201-clusterrole.yaml b/kafka/source/config/201-clusterrole.yaml index 79a4c23f92..c3a6fed6de 100644 --- a/kafka/source/config/201-clusterrole.yaml +++ b/kafka/source/config/201-clusterrole.yaml @@ -72,7 +72,6 @@ rules: resources: - scaledobjects verbs: *everything - --- # The role is needed for the aggregated role source-observer in knative-eventing to provide readonly access to "Sources". # See https://github.com/knative/eventing/blob/master/config/200-source-observer-clusterrole.yaml. From d5835043529c6f76a9a8cd3e12725dad703459e5 Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 13:41:09 -0400 Subject: [PATCH 05/10] add scaling implementation --- .../reconciler/scaling/keda_autoscaling.go | 237 ++++++++++++++++++ 1 file changed, 237 insertions(+) create mode 100644 kafka/source/pkg/reconciler/scaling/keda_autoscaling.go diff --git a/kafka/source/pkg/reconciler/scaling/keda_autoscaling.go b/kafka/source/pkg/reconciler/scaling/keda_autoscaling.go new file mode 100644 index 0000000000..9d9ef33b7f --- /dev/null +++ b/kafka/source/pkg/reconciler/scaling/keda_autoscaling.go @@ -0,0 +1,237 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaling + +import ( + "context" + "fmt" + "strconv" + "strings" + + "go.uber.org/zap" + v1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1" + "knative.dev/pkg/logging" + "knative.dev/serving/pkg/apis/autoscaling" +) + +const ( + kedaAutoscalingAnnotationClass = "keda.autoscaling.knative.dev" + kedaCooldownPeriodAnnodationKey = kedaAutoscalingAnnotationClass + "/cooldownPeriod" + kedaPollingIntervalAnnodationKey = kedaAutoscalingAnnotationClass + "/pollingInterval" + kedaTriggerLagThresholdAnnodationKey = kedaAutoscalingAnnotationClass + "/trigger.lagThreshold" +) + +// ScaleKafkaSourceWithKeda will scale Kafak Source if autoscaling annotations are present +func ScaleKafkaSourceWithKeda(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource, dcs dynamic.Interface) { + // TPPD check for annotations before creating struct + s := &KedaAutoscaling{ + DynamicClientSet: dcs, + } + s.scaleKafkaSource(ctx, ra, src) +} + +// KedaAutoscaling uses KEDA ScaledObject to provide scaling for Kafka source deployment +type KedaAutoscaling struct { + DynamicClientSet dynamic.Interface + + minReplicaCount *int32 + maxReplicaCount *int32 + cooldownPeriod *int32 + pollingInterval *int32 + triggerLagThreshold *int32 +} + +func (r *KedaAutoscaling) scaleKafkaSource(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) { + r.readScalingAnnotations(ctx, src) + // no scaling annotatins so no scaling work needed + if r.minReplicaCount == nil && r.maxReplicaCount == nil { + return + } + _, error := r.deployKedaScaledObject(ctx, ra, src) + if error != nil { + // additional logging? + } +} + +func (r *KedaAutoscaling) deployKedaScaledObject(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) (*unstructured.Unstructured, error) { + logger := logging.FromContext(ctx).Desugar() + deploymentName := ra.GetName() + logger.Info("Got ra", zap.Any("receiveAdapter", ra)) + logger.Info("Got ra name "+deploymentName, zap.Any("deploymentName", deploymentName)) + namespace := src.Namespace + name := src.Name + gvk := schema.GroupVersionKind{ + Group: "keda.k8s.io", + Version: "v1alpha1", + Kind: "ScaledObject", + } + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + scaledObjectResourceInterface := r.DynamicClientSet.Resource(gvr).Namespace(namespace) + if scaledObjectResourceInterface == nil { + return nil, fmt.Errorf("unable to create dynamic client for ScaledObject") + } + scaledObjectUnstr, err := r.generateKedaScaledObjectUnstructured(ctx, ra, src) + if err != nil { + return nil, err + } + created, err := scaledObjectResourceInterface.Create(scaledObjectUnstr, metav1.CreateOptions{}) + if err != nil { + logger.Error("Failed to create ScaledObject so going to do update", zap.Error(err)) + //fmt.Printf("Doing update as failed to create ScaledObject: %s \n", err) + // will replace - https://github.com/kubernetes/client-go/blob/master/examples/dynamic-create-update-delete-deployment/main.go + // doing kubectl "replace" https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency + // first get resourceVersion + existing, err := scaledObjectResourceInterface.Get(name, metav1.GetOptions{}) + if err != nil { + logger.Error("Failed to create ScaledObject:", zap.Error(err)) + return nil, err + } + resourceVersion := existing.GetResourceVersion() + scaledObjectUnstr.SetResourceVersion(resourceVersion) + updated, err := scaledObjectResourceInterface.Update(scaledObjectUnstr, metav1.UpdateOptions{}) + if err != nil { + logger.Error("Update failed to create ScaledObject", zap.Error(err)) + return nil, err + } else { + logger.Info("Update success", zap.Any("updated", updated)) + return updated, nil + } + } + return created, nil +} + +func convertMapKeyToInt32(dict map[string]string, key string, logger *zap.Logger) *int32 { + val, ok := dict[key] + if !ok { + return nil + } + i, err := strconv.ParseInt(val, 10, 32) + if err != nil { + logger.Error("Expected annotation value to be integer but got "+val, zap.Any("annotations key", key)) + return nil + } + i32 := int32(i) + return &i32 +} + +func (r *KedaAutoscaling) readScalingAnnotations(ctx context.Context, src *v1alpha1.KafkaSource) { + logger := logging.FromContext(ctx).Desugar() + meta := src.GetObjectMeta() + annotations := meta.GetAnnotations() + if annotations != nil { + scalingClass := annotations[autoscaling.ClassAnnotationKey] + r.minReplicaCount = convertMapKeyToInt32(annotations, autoscaling.MinScaleAnnotationKey, logger) + r.maxReplicaCount = convertMapKeyToInt32(annotations, autoscaling.MaxScaleAnnotationKey, logger) + if scalingClass == kedaAutoscalingAnnotationClass { + r.cooldownPeriod = convertMapKeyToInt32(annotations, kedaCooldownPeriodAnnodationKey, logger) + r.pollingInterval = convertMapKeyToInt32(annotations, kedaPollingIntervalAnnodationKey, logger) + r.pollingInterval = convertMapKeyToInt32(annotations, kedaPollingIntervalAnnodationKey, logger) + r.triggerLagThreshold = convertMapKeyToInt32(annotations, kedaTriggerLagThresholdAnnodationKey, logger) + + } + } +} + +func (r *KedaAutoscaling) generateKedaScaledObjectUnstructured(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) (*unstructured.Unstructured, error) { + logger := logging.FromContext(ctx).Desugar() + deploymentName := ra.GetName() + namespace := src.Namespace + name := src.Name + logger.Info("Unstructured ScaledObject name "+name, zap.Any("name", name)) + srcName := src.GetName() + srcUID := src.GetUID() + srcResVersion := src.GetResourceVersion() + logger.Info("Got srcResVersion="+srcResVersion, zap.Any("srcResVersion", srcResVersion)) + srcKind := src.GetGroupVersionKind().Kind + logger.Info("Got srcKind srcName srcUID", zap.Any("srcKind", srcKind), zap.Any("srcName", srcName), zap.Any("srcUID", srcUID)) + srcBrokerList := src.Spec.BootstrapServers + srcConcumerGroup := src.Spec.ConsumerGroup + triggers := make([]map[string]interface{}, 0, 1) + topics := strings.Split(src.Spec.Topics, ",") + if len(topics) == 0 { + return nil, fmt.Errorf("Comma-separated list of topics can not be empty") + } + for _, topic := range topics { + triggerMetadata := map[string]interface{}{ + "brokerList": srcBrokerList, + "consumerGroup": srcConcumerGroup, + "topic": topic, + } + if r.triggerLagThreshold != nil { + logger.Info("Got triggerLagThreshold", zap.Any("triggerLagThreshold", r.triggerLagThreshold)) + triggerMetadata["lagThreshold"] = strconv.Itoa(int(*r.triggerLagThreshold)) + } + trigger := map[string]interface{}{ + "type": "kafka", + "metadata": triggerMetadata, + } + triggers = append(triggers, trigger) + } + spec := map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{ + "deploymentName": deploymentName, + }, + "triggers": triggers, + } + if r.minReplicaCount != nil { + logger.Info("Got minReplicaCount", zap.Any("minReplicaCount", r.minReplicaCount)) + spec["minReplicaCount"] = *r.minReplicaCount + } + if r.maxReplicaCount != nil { + logger.Info("Got maxReplicaCount", zap.Any("maxReplicaCount", r.maxReplicaCount)) + spec["maxReplicaCount"] = *r.maxReplicaCount + } + if r.cooldownPeriod != nil { + logger.Info("Got cooldownPeriod", zap.Any("cooldownPeriod", r.cooldownPeriod)) + spec["cooldownPeriod"] = *r.cooldownPeriod + } + if r.pollingInterval != nil { + logger.Info("Got pollingInterval", zap.Any("pollingInterval", r.minReplicaCount)) + spec["pollingInterval"] = *r.pollingInterval + } + soUnstr := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "keda.k8s.io/v1alpha1", + "kind": "ScaledObject", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": namespace, + "name": name, + "labels": map[string]interface{}{ + "deploymentName": deploymentName, + }, + "ownerReferences": []map[string]interface{}{{ + "apiVersion": "sources.eventing.knative.dev/v1alpha1", + "kind": srcKind, + "name": srcName, + "uid": srcUID, + "blockOwnerDeletion": true, + "controller": true, + }}, + }, + "spec": spec, + }, + } + logger.Info("Unstructured SO name "+name, zap.Any("name", name), zap.Any("soUnstr", soUnstr)) + return soUnstr, nil +} From 5644e91e1f7d522cf287002813a7768872afc2f7 Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 13:56:42 -0400 Subject: [PATCH 06/10] updated README --- kafka/source/README.md | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/kafka/source/README.md b/kafka/source/README.md index 834ca67c31..8917c286d4 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -105,7 +105,7 @@ go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG To use KEDA the YAML file must have one of autoscaling annotations (such as minScale). -Warning: do not give KafkaSource name longer than 4 characters as KEDA will add prefix to deployment name that already has UUID appended HAP can not be created if its name exceed 64 character name limit (keda-hpa-kafkasource-NAME-999036a8-af6e-4431-b671-d052842dddf1) +Warning: temporary limitation - do not give KafkaSource name longer than 4 characters as KEDA will add prefix to deployment name that already has UUID appended HAP can not be created if its name exceed 64 character name limit (keda-hpa-kafkasource-SO_NAME-999036a8-af6e-4431-b671-d052842dddf1). For more details see https://github.com/kedacore/keda/issues/704 Example: @@ -134,7 +134,10 @@ spec: To verify that Kafka source is using KEDA retrieve the scaled object created by Kafka source: ```bash -kubectl get scaledobjects.keda.k8s.io -NAME DEPLOYMENT TRIGGERS AGE -knative-demo-kafka-keda-src1 kafkasource-knative-demo-k-999036a8-af6e-4431-b671-d052842dddf1 kafka 31s +⇒ kubectl get scaledobjects.keda.k8s.io +NAME DEPLOYMENT TRIGGERS AGE +kn1 kafkasource-kn1-0e12266a-93c2-48ee-8d8d-3b6ffbe9d18f kafka 26m +⇒ kubectl get horizontalpodautoscalers.autoscaling +NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE +keda-hpa-kafkasource-kn1-0e12266a-93c2-48ee-8d8d-3b6ffbe9d18f Deployment/kafkasource-kn1-0e12266a-93c2-48ee-8d8d-3b6ffbe9d18f /10 (avg) 1 10 0 26m ``` From e1f95317b68030f5e1d224ff1478efc6dc472316 Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 14:01:41 -0400 Subject: [PATCH 07/10] Apply suggestions from code review Co-Authored-By: Matt Moore --- kafka/source/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/source/README.md b/kafka/source/README.md index 6b0c2f851c..5cd618cdb4 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -112,7 +112,7 @@ go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG To use KEDA the YAML file must have one of autoscaling annotations (such as minScale). -Warning: temporary limitation - do not give KafkaSource name longer +Warning: temporary limitation - do not give KafkaSource name longer than 4 characters as KEDA will add prefix to deployment name that already has UUID appended HAP can not be created if its name exceeds 64 character name limit (keda-hpa-kafkasource-SO_NAME-999036a8-af6e-4431-b671-d052842dddf1). From 311cea50ec7c255be45a82adc121c9c9b756b5bd Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 14:11:07 -0400 Subject: [PATCH 08/10] Update kafka/source/README.md Co-Authored-By: Matt Moore --- kafka/source/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka/source/README.md b/kafka/source/README.md index 5cd618cdb4..9463aa3294 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -112,11 +112,11 @@ go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG To use KEDA the YAML file must have one of autoscaling annotations (such as minScale). -Warning: temporary limitation - do not give KafkaSource name longer -than 4 characters as KEDA will add prefix to deployment name that -already has UUID appended HAP can not be created if its name exceeds -64 character name limit (keda-hpa-kafkasource-SO_NAME-999036a8-af6e-4431-b671-d052842dddf1). -For more details see https://github.com/kedacore/keda/issues/704 +Warning: temporary limitation - do not give KafkaSource name longer than 4 +characters as KEDA will add prefix to deployment name that already has UUID +appended HAP can not be created if its name exceeds 64 character name limit +(keda-hpa-kafkasource-SO_NAME-999036a8-af6e-4431-b671-d052842dddf1). For more +details see https://github.com/kedacore/keda/issues/704 Example: From fdfe0a3c3e33e2d5b756fac6fd1d8d9fc5a36f3f Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 14:18:21 -0400 Subject: [PATCH 09/10] format README --- kafka/source/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka/source/README.md b/kafka/source/README.md index 5cd618cdb4..9463aa3294 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -112,11 +112,11 @@ go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG To use KEDA the YAML file must have one of autoscaling annotations (such as minScale). -Warning: temporary limitation - do not give KafkaSource name longer -than 4 characters as KEDA will add prefix to deployment name that -already has UUID appended HAP can not be created if its name exceeds -64 character name limit (keda-hpa-kafkasource-SO_NAME-999036a8-af6e-4431-b671-d052842dddf1). -For more details see https://github.com/kedacore/keda/issues/704 +Warning: temporary limitation - do not give KafkaSource name longer than 4 +characters as KEDA will add prefix to deployment name that already has UUID +appended HAP can not be created if its name exceeds 64 character name limit +(keda-hpa-kafkasource-SO_NAME-999036a8-af6e-4431-b671-d052842dddf1). For more +details see https://github.com/kedacore/keda/issues/704 Example: From 93390515e2d0c4a7c3904f4e16871bc2e63f5512 Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 27 Mar 2020 14:20:08 -0400 Subject: [PATCH 10/10] Update kafka/source/pkg/reconciler/scaling/keda_autoscaling.go Co-Authored-By: Matt Moore --- kafka/source/pkg/reconciler/scaling/keda_autoscaling.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/source/pkg/reconciler/scaling/keda_autoscaling.go b/kafka/source/pkg/reconciler/scaling/keda_autoscaling.go index 9d9ef33b7f..525f6c1fb5 100644 --- a/kafka/source/pkg/reconciler/scaling/keda_autoscaling.go +++ b/kafka/source/pkg/reconciler/scaling/keda_autoscaling.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Knative Authors +Copyright 2020 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.