diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 22eaaf3cd24..aa1bb997023 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -26,13 +26,14 @@ type kafkaScaler struct { } type kafkaMetadata struct { - bootstrapServers []string - group string - topic string - lagThreshold int64 - offsetResetPolicy offsetResetPolicy - allowIdleConsumers bool - version sarama.KafkaVersion + bootstrapServers []string + group string + topic string + lagThreshold int64 + activationLagThreshold int64 + offsetResetPolicy offsetResetPolicy + allowIdleConsumers bool + version sarama.KafkaVersion // If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can // occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612 @@ -71,11 +72,13 @@ const ( ) const ( - lagThresholdMetricName = "lagThreshold" - kafkaMetricType = "External" - defaultKafkaLagThreshold = 10 - defaultOffsetResetPolicy = latest - invalidOffset = -1 + lagThresholdMetricName = "lagThreshold" + activationLagThresholdMetricName = "activationLagThreshold" + kafkaMetricType = "External" + defaultKafkaLagThreshold = 10 + defaultKafkaActivationLagThreshold = 0 + defaultOffsetResetPolicy = latest + invalidOffset = -1 ) // NewKafkaScaler creates a new kafkaScaler @@ -212,6 +215,19 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata meta.lagThreshold = t } + meta.activationLagThreshold = defaultKafkaActivationLagThreshold + + if val, ok := config.TriggerMetadata[activationLagThresholdMetricName]; ok { + t, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return meta, fmt.Errorf("error parsing %q: %s", activationLagThresholdMetricName, err) + } + if t <= 0 { + return meta, fmt.Errorf("%q must be positive number", activationLagThresholdMetricName) + } + meta.activationLagThreshold = t + } + if err := parseKafkaAuthParams(config, &meta); err != nil { return meta, err } @@ -249,31 +265,12 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata // IsActive determines if we need to scale from zero func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) { - topicPartitions, err := s.getTopicPartitions() + totalLag, err := s.getTotalLag() if err != nil { return false, err } - consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions) - if err != nil { - return false, err - } - - for topic, partitionsOffsets := range producerOffsets { - for partitionID := range partitionsOffsets { - lag, err := s.getLagForPartition(topic, partitionID, consumerOffsets, producerOffsets) - if err != nil && lag == invalidOffset { - return true, nil - } - s.logger.V(1).Info(fmt.Sprintf("Group %s has a lag of %d for topic %s and partition %d\n", s.metadata.group, lag, topic, partitionID)) - - // Return as soon as a lag was detected for any partitionID - if lag > 0 { - return true, nil - } - } - } - return false, nil + return totalLag > s.metadata.activationLagThreshold, nil } func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin, error) { @@ -486,14 +483,24 @@ func (s *kafkaScaler) getConsumerAndProducerOffsets(topicPartitions map[string][ // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - topicPartitions, err := 
s.getTopicPartitions() + totalLag, err := s.getTotalLag() if err != nil { return []external_metrics.ExternalMetricValue{}, err } + metric := GenerateMetricInMili(metricName, float64(totalLag)) + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +func (s *kafkaScaler) getTotalLag() (int64, error) { + topicPartitions, err := s.getTopicPartitions() + if err != nil { + return 0, err + } consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions) if err != nil { - return []external_metrics.ExternalMetricValue{}, err + return 0, err } totalLag := int64(0) @@ -514,10 +521,7 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS totalLag = totalTopicPartitions * s.metadata.lagThreshold } } - - metric := GenerateMetricInMili(metricName, float64(totalLag)) - - return append([]external_metrics.ExternalMetricValue{}, metric), nil + return totalLag, nil } type brokerOffsetResult struct { diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index b9ebfd6d931..b36ccf03b73 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -62,6 +62,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // failure, lagThreshold is 0 {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, + // failure, activationLagThreshold is not int + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success, more brokers @@ -126,8 +128,8 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ } var kafkaMetricIdentifiers = []kafkaMetricIdentifier{ - {&parseKafkaMetadataTestDataset[6], 0, "s0-kafka-my-topic"}, - {&parseKafkaMetadataTestDataset[6], 1, "s1-kafka-my-topic"}, + {&parseKafkaMetadataTestDataset[7], 0, "s0-kafka-my-topic"}, + {&parseKafkaMetadataTestDataset[7], 1, "s1-kafka-my-topic"}, {&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"}, } diff --git a/tests/scalers/kafka.test.ts b/tests/scalers/kafka.test.ts deleted file mode 100644 index a1fb8a077a5..00000000000 --- a/tests/scalers/kafka.test.ts +++ /dev/null @@ -1,395 +0,0 @@ -import * as fs from 'fs' -import * as sh from 'shelljs' -import * as tmp from 'tmp' -import test, { Assertions } from 'ava'; -import { createNamespace, waitForDeploymentReplicaCount } from './helpers'; - -const defaultNamespace = 'kafka-test' -const defaultCluster = 'kafka-cluster' -const timeToWait = 300 -const defaultTopic = 'kafka-topic' -const defaultTopic2 = 'kafka-topic-2' -const zeroInvalidOffsetTopic = 'kafka-topic-zero-invalid-offset' -const oneInvalidOffsetTopic = 'kafka-topic-one-invalid-offset' -const invalidOffsetGroup = 'invalidOffset' -const defaultKafkaClient = 'kafka-client' -const 
strimziOperatorVersion = '0.23.0' -const bootstrapServer = `${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092` - -const strimziOperatorYamlFile = tmp.fileSync() -const kafkaClusterYamlFile = tmp.fileSync() -const kafkaTopicYamlFile = tmp.fileSync() -const kafkaClientYamlFile = tmp.fileSync() -const kafkaApplicationYamlFile = tmp.fileSync() -const scaledObjectYamlFile = tmp.fileSync() - -function deployFromYaml(t: Assertions, filename: string, yaml: string, name: string) { - sh.exec(`echo Deploying ${name}`) - fs.writeFileSync(filename, yaml) - t.is(0, sh.exec(`kubectl apply -f ${filename} --namespace ${defaultNamespace}`).code, `Deploying ${name} should work.`) -} - -function waitForReady(t: Assertions, app: string, name: string, condition: string = 'Ready') { - sh.exec(`echo Waiting for ${app} for ${timeToWait} seconds to be ${condition}`) - t.is( - 0, - sh.exec(`kubectl wait ${app} --for=condition=${condition} --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - `${name} should be ready within given time limit.` - ) -} - -function commitPartition(topic: string, group: string) { - sh.exec(`echo Committing partition for ${topic}:${group}`) - return sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${bootstrapServer}" --topic ${topic} --group ${group} --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`).code == 0 -} - -function publishMessage(topic: string) { - sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${bootstrapServer} --topic ${topic}'`) - sh.exec(`sleep 5s`) -} - -function cleanup(t: Assertions) { - t.is( - 0, - sh.exec(`kubectl delete -f ${scaledObjectYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl delete -f ${kafkaApplicationYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deleting kafka application should work.' 
- ) - sh.exec(`sleep 10s`) -} - -test.before('Set up, create necessary resources.', async t => { - createNamespace(defaultNamespace) - - sh.config.silent = true - const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout - sh.config.silent = false - - deployFromYaml(t, strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`), 'Strimzi operator') - deployFromYaml(t, kafkaClusterYamlFile.name, kafkaClusterYaml, 'Kafka cluster') - waitForReady(t, `kafka/${defaultCluster}`,'Kafka instance') - - var kafkaTopicsYaml = - kafkaTopicsTemplateYaml.replace('{{TOPIC}}', defaultTopic).replace('{{PARTITIONS}}', '3') + - kafkaTopicsTemplateYaml.replace('{{TOPIC}}', defaultTopic2).replace('{{PARTITIONS}}', '3') + - kafkaTopicsTemplateYaml.replace('{{TOPIC}}', zeroInvalidOffsetTopic).replace('{{PARTITIONS}}', '1') + - kafkaTopicsTemplateYaml.replace('{{TOPIC}}', oneInvalidOffsetTopic).replace('{{PARTITIONS}}', '1') - deployFromYaml(t, kafkaTopicYamlFile.name, kafkaTopicsYaml, 'Kafka topic') - waitForReady(t, `kafkatopic/${defaultTopic}`,defaultTopic) - waitForReady(t, `kafkatopic/${defaultTopic2}`,defaultTopic2) - waitForReady(t, `kafkatopic/${zeroInvalidOffsetTopic}`,zeroInvalidOffsetTopic) - waitForReady(t, `kafkatopic/${oneInvalidOffsetTopic}`,oneInvalidOffsetTopic) - - deployFromYaml(t, kafkaClientYamlFile.name, kafkaClientYaml, 'Kafka client') - waitForReady(t, `pod/${defaultKafkaClient}`,'Kafka client') -}); - -test.serial('Applying ScaledObject earliest policy should not scale up pods', async t => { - - deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationEarliestYaml, 'Kafka application') - deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectEarliestYaml, 'Scaled Object') - waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') - - t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), 'replica count should start out as 0') -}); - -test.serial('Scale application with kafka messages.', async t => { - for (let r = 1; r <= 3; r++) { - publishMessage(defaultTopic) - t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) - } -}) - -test.serial('Scale application beyond partition max.', async t => { - publishMessage(defaultTopic) - t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) -}) - -test.serial('cleanup after earliest policy test', t => cleanup(t)) - -test.serial('Applying ScaledObject latest policy should not scale up pods', async t => { - - //Make the consumer commit the first offset for each partition. 
- t.true(commitPartition(defaultTopic, 'latest'), 'Commit partition should work') - - deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationLatestYaml, 'Kafka application') - sh.exec(`sleep 10s`) - deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectLatestYaml, 'Scaled Object') - sh.exec(`sleep 5s`) - t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) -}) - -test.serial('Latest Scale object should scale with new messages', async t => { - - for (let r = 1; r <= 3; r++) { - publishMessage(defaultTopic) - t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) - } -}) - -test.serial('Cleanup after latest policy test', t => cleanup(t)) - -test.serial('Applying ScaledObject with multiple topics should scale up pods', async t => { - // Make the consumer commit the all offsets for all topics in the group - t.true(commitPartition(defaultTopic, 'multiTopic'), 'Commit partition should work') - t.true(commitPartition(defaultTopic2, 'multiTopic'), 'Commit partition should work') - - deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationMultipleTopicsYaml, 'Kafka application') - sh.exec(`sleep 5s`) - deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectMultipleTopicsYaml, 'Scaled Object') - sh.exec(`sleep 5s`) - - // when lag is 0, scaled object is not active, replica = 0 - t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) - - // produce a single msg to the default topic - // should turn scale object active, replica = 1 - publishMessage(defaultTopic) - t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) - - // produce one more msg to the different topic within the same group - // will turn total consumer group lag to 2. - // with lagThreshold as 1 -> making hpa AverageValue to 1 - // this should turn nb of replicas to 2 - // as desiredReplicaCount = totalLag / avgThreshold - publishMessage(defaultTopic2) - t.true(await waitForDeploymentReplicaCount(2, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 2.`) - - // make it 3 cause why not? 
- publishMessage(defaultTopic) - t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) -}) - -test.serial('Cleanup after multiple topics test', t => cleanup(t)) - -test.serial('Applying ScaledObject zeroOnInvalidOffset policy should not scale up pods', async t => { - - deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationZeroOnInvalidYaml, 'Kafka application') - deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectZeroOnInvalidOffsetYaml, 'Scaled Object') - waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') - sh.exec(`sleep 30s`) - t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 3000), `Replica count should be 0.`) -}) - -test.serial('cleanup after zeroOnInvalidOffset policy test', t => cleanup(t)) - -test.serial('Applying ScaledObject oneOnInvalidOffset policy should scale to one pod', async t => { - deployFromYaml(t, kafkaApplicationYamlFile.name, kafkaApplicationOneOnInvalidYaml, 'Kafka application') - deployFromYaml(t, scaledObjectYamlFile.name, scaledObjectOneOnInvalidOffsetYaml, 'Scaled Object') - waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') - sh.exec(`sleep 30s`) - t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) -}) - -test.serial('oneOnInvalidOffset Scale object should scale to zero when offset is set', async t => { - t.true(commitPartition(oneInvalidOffsetTopic, invalidOffsetGroup), 'Commit partition should work') - publishMessage(oneInvalidOffsetTopic) - t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 60, 10000), `Replica count should scale down to 0.`) -}) - -test.serial('cleanup after oneOnInvalidOffset policy test', t => cleanup(t)) - - -test.after.always('Clean up, delete created resources.', t => { - const resources = [ - `${kafkaClientYamlFile.name}`, - `${kafkaTopicYamlFile.name}`, - `${kafkaClusterYamlFile.name}`, - `${strimziOperatorYamlFile.name}` - ] - - for (const resource of resources) { - sh.exec(`echo Deleting resource from file ${resource}`) - sh.exec(`kubectl delete -f ${resource} --namespace ${defaultNamespace}`) - } - sh.exec(`echo Deleting namespace ${defaultNamespace}`) - sh.exec(`kubectl delete namespace ${defaultNamespace}`) -}) - -const kafkaClusterYaml = `apiVersion: kafka.strimzi.io/v1beta2 -kind: Kafka -metadata: - name: ${defaultCluster} - namespace: ${defaultNamespace} -spec: - kafka: - version: "2.6.0" - replicas: 1 - listeners: - - name: plain - port: 9092 - type: internal - tls: false - - name: tls - port: 9093 - type: internal - tls: true - config: - offsets.topic.replication.factor: 1 - transaction.state.log.replication.factor: 1 - transaction.state.log.min.isr: 1 - log.message.format.version: "2.5" - storage: - type: ephemeral - zookeeper: - replicas: 1 - storage: - type: ephemeral - entityOperator: - topicOperator: {} - userOperator: {}` - -const kafkaTopicsTemplateYaml = `apiVersion: kafka.strimzi.io/v1beta2 -kind: KafkaTopic -metadata: - name: {{TOPIC}} - labels: - strimzi.io/cluster: ${defaultCluster} - namespace: ${defaultNamespace} -spec: - partitions: {{PARTITIONS}} - replicas: 1 - config: - retention.ms: 604800000 - segment.bytes: 1073741824 ---- -` - -const kafkaClientYaml = `apiVersion: v1 -kind: Pod -metadata: - name: ${defaultKafkaClient} - namespace: ${defaultNamespace} -spec: - containers: - - name: ${defaultKafkaClient} - image: 
confluentinc/cp-kafka:5.2.1 - command: - - sh - - -c - - "exec tail -f /dev/null"` - -const kafkaApplicationTemplateYaml = ` -apiVersion: apps/v1 -kind: Deployment -metadata: - name: kafka-consumer - namespace: ${defaultNamespace} - labels: - app: kafka-consumer -spec: - selector: - matchLabels: - app: kafka-consumer - template: - metadata: - labels: - app: kafka-consumer - spec: - containers: - - name: kafka-consumer - image: confluentinc/cp-kafka:5.2.1 - command: - - sh - - -c - - "kafka-console-consumer --bootstrap-server ${bootstrapServer} PARAMS --consumer-property enable.auto.commit=COMMIT"` - -const kafkaApplicationEarliestYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${defaultTopic} --group earliest --from-beginning`).replace(/COMMIT/g, 'false') -const kafkaApplicationLatestYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${defaultTopic} --group latest`).replace(/COMMIT/g, 'false') -const kafkaApplicationZeroOnInvalidYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${zeroInvalidOffsetTopic} --group ${invalidOffsetGroup}`).replace(/COMMIT/g, 'true') -const kafkaApplicationOneOnInvalidYaml = kafkaApplicationTemplateYaml.replace(/PARAMS/g, `--topic ${oneInvalidOffsetTopic} --group ${invalidOffsetGroup} --from-beginning`).replace(/COMMIT/g, 'true') - -const kafkaApplicationMultipleTopicsYaml = ` -apiVersion: apps/v1 -kind: Deployment -metadata: - name: kafka-consumer - namespace: ${defaultNamespace} - labels: - app: kafka-consumer -spec: - selector: - matchLabels: - app: kafka-consumer - template: - metadata: - labels: - app: kafka-consumer - spec: - containers: - # only recent version of kafka-console-consumer support flag "include" - # old version's equiv flag will violate language-matters commit hook - # work around -> create two consumer container joining the same group - - name: kafka-consumer - image: confluentinc/cp-kafka:5.2.1 - command: - - sh - - -c - - "kafka-console-consumer --bootstrap-server ${bootstrapServer} --topic '${defaultTopic}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" - - name: kafka-consumer-2 - image: confluentinc/cp-kafka:5.2.1 - command: - - sh - - -c - - "kafka-console-consumer --bootstrap-server ${bootstrapServer} --topic '${defaultTopic2}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false"` - -const scaledObjectTemplateYaml = `apiVersion: keda.sh/v1alpha1 -kind: ScaledObject -metadata: - name: kafka-consumer-GROUP - namespace: ${defaultNamespace} -spec: - scaleTargetRef: - name: kafka-consumer - triggers: - - type: kafka - metadata: - topic: ${defaultTopic} - bootstrapServers: ${bootstrapServer} - consumerGroup: GROUP - lagThreshold: '1' - offsetResetPolicy: 'GROUP'` - -const scaledObjectEarliestYaml = scaledObjectTemplateYaml.replace(/GROUP/g, 'earliest') -const scaledObjectLatestYaml = scaledObjectTemplateYaml.replace(/GROUP/g, 'latest') - -const scaledObjectMultipleTopicsYaml = `apiVersion: keda.sh/v1alpha1 -kind: ScaledObject -metadata: - name: kafka-consumer-multi-topic - namespace: ${defaultNamespace} -spec: - scaleTargetRef: - name: kafka-consumer - triggers: - - type: kafka - metadata: - bootstrapServers: ${bootstrapServer} - consumerGroup: multiTopic - lagThreshold: '1' - offsetResetPolicy: 'latest'` - -const scaledObjectInvalidOffsetTemplateYaml = `apiVersion: keda.sh/v1alpha1 -kind: ScaledObject -metadata: - name: kafka-consumer-on-invalid - namespace: ${defaultNamespace} -spec: - scaleTargetRef: - name: kafka-consumer - 
triggers: - - type: kafka - metadata: - topic: TOPIC - bootstrapServers: ${bootstrapServer} - consumerGroup: ${invalidOffsetGroup} - lagThreshold: '1' - scaleToZeroOnInvalidOffset: 'VALUE' - offsetResetPolicy: 'latest'` - -const scaledObjectZeroOnInvalidOffsetYaml = scaledObjectInvalidOffsetTemplateYaml.replace(/TOPIC/g, zeroInvalidOffsetTopic).replace(/VALUE/g, 'true') -const scaledObjectOneOnInvalidOffsetYaml = scaledObjectInvalidOffsetTemplateYaml.replace(/TOPIC/g, oneInvalidOffsetTopic).replace(/VALUE/g, 'false') diff --git a/tests/scalers_go/kafka/kafka_test.go b/tests/scalers_go/kafka/kafka_test.go new file mode 100644 index 00000000000..64a054bf457 --- /dev/null +++ b/tests/scalers_go/kafka/kafka_test.go @@ -0,0 +1,481 @@ +//go:build e2e +// +build e2e + +package kafka_test + +import ( + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "kafka-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + kafkaName = fmt.Sprintf("%s-kafka", testName) + kafkaClientName = fmt.Sprintf("%s-client", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + bootstrapServer = fmt.Sprintf("%s-kafka-bootstrap.%s:9092", kafkaName, testNamespace) + strimziOperatorVersion = "0.30.0" + topic1 = "kafka-topic" + topic2 = "kafka-topic2" + zeroInvalidOffsetTopic = "kafka-topic-zero-invalid-offset" + oneInvalidOffsetTopic = "kafka-topic-one-invalid-offset" + invalidOffsetGroup = "invalidOffset" + topicPartitions = 3 + falseString = "false" + trueString = "true" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + KafkaName string + KafkaTopicName string + KafkaTopicPartitions int + KafkaClientName string + TopicName string + Topic1Name string + Topic2Name string + BootstrapServer string + ResetPolicy string + Params string + Commit string + ScaleToZeroOnInvalid string +} + +type templateValues map[string]string + +const ( + singleDeploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: kafka-consumer + template: + metadata: + labels: + app: kafka-consumer + spec: + containers: + # only recent version of kafka-console-consumer support flag "include" + # old version's equiv flag will violate language-matters commit hook + # work around -> create two consumer container joining the same group + - name: kafka-consumer + image: confluentinc/cp-kafka:5.2.1 + command: + - sh + - -c + - "kafka-console-consumer --bootstrap-server {{.BootstrapServer}} {{.Params}} --consumer-property enable.auto.commit={{.Commit}}" +` + + multiDeploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: kafka-consumer + template: + metadata: + labels: + app: kafka-consumer + spec: + containers: + # only recent version of kafka-console-consumer support flag "include" + # old version's equiv flag will violate language-matters commit hook + # work around -> create two consumer container joining the same group + - name: kafka-consumer + image: 
confluentinc/cp-kafka:5.2.1 + command: + - sh + - -c + - "kafka-console-consumer --bootstrap-server {{.BootstrapServer}} --topic '{{.Topic1Name}}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" + - name: kafka-consumer-2 + image: confluentinc/cp-kafka:5.2.1 + command: + - sh + - -c + - "kafka-console-consumer --bootstrap-server {{.BootstrapServer}} --topic '{{.Topic2Name}}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" +` + + singleScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + triggers: + - type: kafka + metadata: + topic: {{.TopicName}} + bootstrapServers: {{.BootstrapServer}} + consumerGroup: {{.ResetPolicy}} + lagThreshold: '1' + activationLagThreshold: '1' + offsetResetPolicy: {{.ResetPolicy}}` + + multiScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + triggers: + - type: kafka + metadata: + bootstrapServers: {{.BootstrapServer}} + consumerGroup: multiTopic + lagThreshold: '1' + offsetResetPolicy: 'latest'` + + invalidOffsetScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + triggers: + - type: kafka + metadata: + topic: {{.TopicName}} + bootstrapServers: {{.BootstrapServer}} + consumerGroup: {{.ResetPolicy}} + lagThreshold: '1' + scaleToZeroOnInvalidOffset: '{{.ScaleToZeroOnInvalid}}' + offsetResetPolicy: 'latest'` + + kafkaClusterTemplate = `apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: {{.KafkaName}} + namespace: {{.TestNamespace}} +spec: + kafka: + version: "3.1.0" + replicas: 1 + listeners: + - name: plain + port: 9092 + type: internal + tls: false + - name: tls + port: 9093 + type: internal + tls: true + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + log.message.format.version: "2.5" + storage: + type: ephemeral + zookeeper: + replicas: 1 + storage: + type: ephemeral + entityOperator: + topicOperator: {} + userOperator: {} +` + + kafkaTopicTemplate = `apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + name: {{.KafkaTopicName}} + namespace: {{.TestNamespace}} + labels: + strimzi.io/cluster: {{.KafkaName}} + namespace: {{.TestNamespace}} +spec: + partitions: {{.KafkaTopicPartitions}} + replicas: 1 + config: + retention.ms: 604800000 + segment.bytes: 1073741824 +` + kafkaClientTemplate = ` +apiVersion: v1 +kind: Pod +metadata: + name: {{.KafkaClientName}} + namespace: {{.TestNamespace}} +spec: + containers: + - name: {{.KafkaClientName}} + image: confluentinc/cp-kafka:5.2.1 + command: + - sh + - -c + - "exec tail -f /dev/null"` +) + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + installKafkaOperator(t) + addCluster(t, data) + addTopic(t, data, topic1, topicPartitions) + addTopic(t, data, topic2, topicPartitions) + addTopic(t, data, 
zeroInvalidOffsetTopic, 1) + addTopic(t, data, oneInvalidOffsetTopic, 1) + + // test scaling + testEarliestPolicy(t, kc, data) + testLatestPolicy(t, kc, data) + testMultiTopic(t, kc, data) + testZeroOnInvalidOffset(t, kc, data) + testOneOnInvalidOffset(t, kc, data) + + // cleanup + uninstallKafkaOperator(t) + DeleteKubernetesResources(t, kc, testNamespace, data, templates) +} + +func testEarliestPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing earliest policy: scale up ---") + data.Params = fmt.Sprintf("--topic %s --group earliest --from-beginning", topic1) + data.Commit = falseString + data.TopicName = topic1 + data.ResetPolicy = "earliest" + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "singleScaledObjectTemplate", singleScaledObjectTemplate) + + // Shouldn't scale pods applying earliest policy + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) + + // Shouldn't scale pods with only 1 message due to activation value + publishMessage(t, topic1) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) + + // Scale application with kafka messages + publishMessage(t, topic1) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 2), + "replica count should be %d after 2 minute", 2) + + // Scale application beyond partition max. + messages := 5 + for i := 0; i < messages; i++ { + publishMessage(t, topic1) + } + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, topicPartitions, 60, 2), + "replica count should be %d after 2 minute", messages) + + KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlDeleteWithTemplate(t, data, "singleScaledObjectTemplate", singleScaledObjectTemplate) +} + +func testLatestPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing latest policy: scale up ---") + commitPartition(t, topic1, "latest") + data.Params = fmt.Sprintf("--topic %s --group latest", topic1) + data.Commit = falseString + data.TopicName = topic1 + data.ResetPolicy = "latest" + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "singleScaledObjectTemplate", singleScaledObjectTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) + + // Shouldn't scale pods with only 1 message due to activation value + publishMessage(t, topic1) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) + + // Scale application with kafka messages + publishMessage(t, topic1) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 2), + "replica count should be %d after 2 minute", 2) + + // Scale application beyond partition max. 
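+	// Note: the ScaledObject applied above uses lagThreshold '1' and topic1 has 3 partitions;
+	// getTotalLag caps the reported lag at partitions * lagThreshold when allowIdleConsumers
+	// is not enabled, so the replica count is expected to level off at 3 here.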
+ messages := 5 + for i := 0; i < messages; i++ { + publishMessage(t, topic1) + } + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, topicPartitions, 60, 2), + "replica count should be %d after 2 minute", messages) + + KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlDeleteWithTemplate(t, data, "singleScaledObjectTemplate", singleScaledObjectTemplate) +} + +func testMultiTopic(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing multi topic: scale up ---") + commitPartition(t, topic1, "multiTopic") + commitPartition(t, topic2, "multiTopic") + data.Topic1Name = topic1 + data.Topic2Name = topic2 + KubectlApplyWithTemplate(t, data, "multiDeploymentTemplate", multiDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "multiScaledObjectTemplate", multiScaledObjectTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) + + // Scale application with kafka messages in topic 1 + publishMessage(t, topic1) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), + "replica count should be %d after 2 minute", 1) + + // Scale application with kafka messages in topic 2 + // // produce one more msg to the different topic within the same group + // // will turn total consumer group lag to 2. + // // with lagThreshold as 1 -> making hpa AverageValue to 1 + // // this should turn nb of replicas to 2 + // // as desiredReplicaCount = totalLag / avgThreshold + publishMessage(t, topic2) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 2), + "replica count should be %d after 2 minute", 2) + + KubectlDeleteWithTemplate(t, data, "multiDeploymentTemplate", multiDeploymentTemplate) + KubectlDeleteWithTemplate(t, data, "multiScaledObjectTemplate", multiScaledObjectTemplate) +} + +func testZeroOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing zeroInvalidOffsetTopic: scale up ---") + data.Params = fmt.Sprintf("--topic %s --group %s", zeroInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = trueString + data.TopicName = zeroInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = trueString + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + + // Shouldn't scale pods + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) + + KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) +} + +func testOneOnInvalidOffset(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing oneInvalidOffsetTopic: scale up ---") + data.Params = fmt.Sprintf("--topic %s --group %s --from-beginning", oneInvalidOffsetTopic, invalidOffsetGroup) + data.Commit = trueString + data.TopicName = oneInvalidOffsetTopic + data.ResetPolicy = invalidOffsetGroup + data.ScaleToZeroOnInvalid = falseString + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) + + // Should scale to 1 + assert.True(t, 
WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), + "replica count should be %d after 2 minute", 1) + + commitPartition(t, oneInvalidOffsetTopic, invalidOffsetGroup) + publishMessage(t, oneInvalidOffsetTopic) + + // Should scale to 0 + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 10), + "replica count should be %d after 10 minute", 0) + + KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlDeleteWithTemplate(t, data, "invalidOffsetScaledObjectTemplate", invalidOffsetScaledObjectTemplate) +} + +func publishMessage(t *testing.T, topic string) { + _, _, err := ExecCommandOnSpecificPod(t, kafkaClientName, testNamespace, fmt.Sprintf(`echo "{"text": "foo"}" | kafka-console-producer --broker-list %s --topic %s`, bootstrapServer, topic)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func commitPartition(t *testing.T, topic string, group string) { + _, _, err := ExecCommandOnSpecificPod(t, kafkaClientName, testNamespace, fmt.Sprintf(`kafka-console-consumer --bootstrap-server %s --topic %s --group %s --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000`, bootstrapServer, topic, group)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func installKafkaOperator(t *testing.T) { + _, err := ExecuteCommand("helm repo add strimzi https://strimzi.io/charts/") + assert.NoErrorf(t, err, "cannot execute command - %s", err) + _, err = ExecuteCommand("helm repo update") + assert.NoErrorf(t, err, "cannot execute command - %s", err) + _, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --namespace %s --wait %s strimzi/strimzi-kafka-operator --version %s`, + testNamespace, + testName, + strimziOperatorVersion)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func uninstallKafkaOperator(t *testing.T) { + _, err := ExecuteCommand(fmt.Sprintf(`helm uninstall --namespace %s %s`, + testNamespace, + testName)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func addTopic(t *testing.T, data templateData, name string, partitions int) { + data.KafkaTopicName = name + data.KafkaTopicPartitions = partitions + KubectlApplyWithTemplate(t, data, "kafkaTopicTemplate", kafkaTopicTemplate) + _, err := ExecuteCommand(fmt.Sprintf("kubectl wait kafkatopic/%s --for=condition=Ready --timeout=300s --namespace %s", name, testNamespace)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func addCluster(t *testing.T, data templateData) { + KubectlApplyWithTemplate(t, data, "kafkaClusterTemplate", kafkaClusterTemplate) + _, err := ExecuteCommand(fmt.Sprintf("kubectl wait kafka/%s --for=condition=Ready --timeout=300s --namespace %s", kafkaName, testNamespace)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + +func getTemplateData() (templateData, map[string]string) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + KafkaName: kafkaName, + KafkaClientName: kafkaClientName, + BootstrapServer: bootstrapServer, + TopicName: topic1, + Topic1Name: topic1, + Topic2Name: topic2, + ResetPolicy: "", + ScaledObjectName: scaledObjectName, + }, templateValues{ + "kafkaClientTemplate": kafkaClientTemplate, + } +}
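The core behavioural change in this diff: IsActive now compares the consumer group's total lag against activationLagThreshold (default 0), so the workload is activated from zero only once lag exceeds that value; the new e2e ScaledObject template exercises this with activationLagThreshold: '1'. Below is a minimal sketch, not part of this PR, of how the new field is expected to parse. It assumes the same package and map-based metadata conventions as pkg/scalers/kafka_scaler_test.go; the test name and the threshold value 5 are illustrative only.

package scalers

import (
	"testing"

	"github.com/go-logr/logr"
)

// Hypothetical test, for illustration only: checks the default and an explicit
// activationLagThreshold value using the same kind of trigger metadata map as
// the existing parseKafkaMetadataTestDataset entries.
func TestKafkaActivationLagThresholdParsing(t *testing.T) {
	meta := map[string]string{
		"bootstrapServers": "foobar:9092",
		"consumerGroup":    "my-group",
		"topic":            "my-topic",
	}

	// No key present: should fall back to defaultKafkaActivationLagThreshold (0).
	parsed, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: meta}, logr.Discard())
	if err != nil || parsed.activationLagThreshold != 0 {
		t.Fatalf("expected default activationLagThreshold 0, got %d (err: %v)", parsed.activationLagThreshold, err)
	}

	// Explicit value: IsActive should then require totalLag > 5 before scaling from zero.
	meta["activationLagThreshold"] = "5"
	parsed, err = parseKafkaMetadata(&ScalerConfig{TriggerMetadata: meta}, logr.Discard())
	if err != nil || parsed.activationLagThreshold != 5 {
		t.Fatalf("expected activationLagThreshold 5, got %d (err: %v)", parsed.activationLagThreshold, err)
	}
}

In a ScaledObject the field is optional: omitting it keeps the previous behaviour of activating on any positive lag, since the default threshold is 0 and activation requires totalLag > activationLagThreshold.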