Skip to content

Commit

Permalink
refactor from review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Dao Thanh Tung <[email protected]>
  • Loading branch information
dttung2905 committed Apr 5, 2024
1 parent 7751c08 commit 70f0cd3
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 214 deletions.
127 changes: 43 additions & 84 deletions pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,9 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"tls",
reflect.TypeOf(""),
UseMetadata(true),
UseAuthentication(true),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
UseAuthentication(),
IsOptional(),
WithDefaultVal("disable"),
)
if err != nil {
Expand All @@ -149,10 +148,8 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"cert",
reflect.TypeOf(""),
UseMetadata(false),
UseAuthentication(true),
UseResolvedEnv(false),
IsOptional(true),
UseAuthentication(),
IsOptional(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -162,10 +159,8 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"key",
reflect.TypeOf(""),
UseMetadata(false),
UseAuthentication(true),
UseResolvedEnv(false),
IsOptional(true),
UseAuthentication(),
IsOptional(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -181,10 +176,8 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"ca",
reflect.TypeOf(""),
UseMetadata(false),
UseAuthentication(true),
UseResolvedEnv(false),
IsOptional(true),
UseAuthentication(),
IsOptional(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -197,10 +190,8 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"keyPassword",
reflect.TypeOf(""),
UseMetadata(false),
UseAuthentication(true),
UseResolvedEnv(false),
IsOptional(true),
UseAuthentication(),
IsOptional(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -215,10 +206,9 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"sasl",
reflect.TypeOf(""),
UseMetadata(true),
UseAuthentication(true),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
UseAuthentication(),
IsOptional(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -234,10 +224,9 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"awsEndpoint",
reflect.TypeOf(""),
UseMetadata(true),
UseAuthentication(true),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
UseAuthentication(),
IsOptional(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -253,10 +242,7 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"awsRegion",
reflect.TypeOf(""),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(false),
IsOptional(false),
UseMetadata(),
)
if err != nil {
return fmt.Errorf("%w. No awsRegion given", err)
Expand All @@ -276,10 +262,7 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"username",
reflect.TypeOf(""),
UseMetadata(false),
UseAuthentication(true),
UseResolvedEnv(false),
IsOptional(false),
UseAuthentication(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -290,10 +273,7 @@ func parseApacheKafkaAuthParams(config *scalersconfig.ScalerConfig, meta *apache
config,
"password",
reflect.TypeOf(""),
UseMetadata(false),
UseAuthentication(true),
UseResolvedEnv(false),
IsOptional(false),
UseAuthentication(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -316,10 +296,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
"bootstrapServers",
reflect.TypeOf(""),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(true),
IsOptional(false),
UseMetadata(),
UseResolvedEnv(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -331,10 +309,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
"consumerGroup",
reflect.TypeOf(""),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(true),
IsOptional(false),
UseMetadata(),
UseResolvedEnv(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -345,10 +321,9 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
"topic",
reflect.TypeOf(""),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(true),
IsOptional(true),
UseMetadata(),
UseResolvedEnv(),
IsOptional(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -367,10 +342,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
"partitionLimitation",
reflect.TypeOf(""),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
IsOptional(),
WithDefaultVal(""),
)

Expand All @@ -396,10 +369,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
"offsetResetPolicy",
reflect.TypeOf(""),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
IsOptional(),
WithDefaultVal(""),
)
if err != nil {
Expand All @@ -418,10 +389,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
lagThresholdMetricName,
reflect.TypeOf(64),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
IsOptional(),
WithDefaultVal(defaultKafkaLagThreshold),
)

Expand All @@ -438,10 +407,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
activationLagThresholdMetricName,
reflect.TypeOf(int64(64)),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
IsOptional(),
WithDefaultVal(int64(defaultKafkaActivationLagThreshold)),
)
if err != nil {
Expand All @@ -459,10 +426,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
"allowIdleConsumers",
reflect.TypeOf(true),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
IsOptional(),
WithDefaultVal(false),
)
if err != nil {
Expand All @@ -474,10 +439,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
"excludePersistentLag",
reflect.TypeOf(true),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
IsOptional(),
WithDefaultVal(false),
)
if err != nil {
Expand All @@ -489,10 +452,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
"scaleToZeroOnInvalidOffset",
reflect.TypeOf(true),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
IsOptional(),
WithDefaultVal(false),
)
if err != nil {
Expand All @@ -504,10 +465,8 @@ func parseApacheKafkaMetadata(config *scalersconfig.ScalerConfig, logger logr.Lo
config,
"limitToPartitionsWithLag",
reflect.TypeOf(true),
UseMetadata(true),
UseAuthentication(false),
UseResolvedEnv(false),
IsOptional(true),
UseMetadata(),
IsOptional(),
WithDefaultVal(false),
)
if err != nil {
Expand Down
Loading

0 comments on commit 70f0cd3

Please sign in to comment.