From f610fd4dca046bc15e8cae54fe9d78994f09b736 Mon Sep 17 00:00:00 2001 From: Jan Wozniak Date: Tue, 14 May 2024 18:04:20 +0200 Subject: [PATCH] refactor ActiveMQ scaler config Signed-off-by: Jan Wozniak --- pkg/scalers/activemq_scaler.go | 216 ++++++++++------------------ pkg/scalers/activemq_scaler_test.go | 17 +-- 2 files changed, 82 insertions(+), 151 deletions(-) diff --git a/pkg/scalers/activemq_scaler.go b/pkg/scalers/activemq_scaler.go index 243ddff777d..1512052d110 100644 --- a/pkg/scalers/activemq_scaler.go +++ b/pkg/scalers/activemq_scaler.go @@ -4,12 +4,10 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "html/template" "net/http" "net/url" - "strconv" "strings" "github.com/go-logr/logr" @@ -20,6 +18,8 @@ import ( kedautil "github.com/kedacore/keda/v2/pkg/util" ) +const defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize" + type activeMQScaler struct { metricType v2.MetricTargetType metadata *activeMQMetadata @@ -28,17 +28,65 @@ type activeMQScaler struct { } type activeMQMetadata struct { - managementEndpoint string - destinationName string - brokerName string - username string - password string - restAPITemplate string - targetQueueSize int64 - activationTargetQueueSize int64 - corsHeader string - metricName string - triggerIndex int + metricName string + triggerIndex int + + ManagementEndpoint string `keda:"name=managementEndpoint, order=triggerMetadata, optional"` + DestinationName string `keda:"name=destinationName, order=triggerMetadata, optional"` + BrokerName string `keda:"name=brokerName, order=triggerMetadata, optional"` + + // auth + Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` + Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` + + CorsHeader string `keda:"name=corsHeader, order=triggerMetadata, optional"` + + RestAPITemplate string `keda:"name=restAPITemplate, order=triggerMetadata, optional"` + TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, optional, default=10"` + ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, optional, default=0"` +} + +func (a *activeMQMetadata) Validate() error { + if a.RestAPITemplate != "" { + // parse restAPITemplate to provide managementEndpoint, brokerName, destinationName + u, err := url.ParseRequestURI(a.RestAPITemplate) + if err != nil { + return fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err) + } + a.ManagementEndpoint = u.Host + // This returns : type=Broker,brokerName=<>,destinationType=Queue,destinationName=<> + splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] + replacer := strings.NewReplacer(",", "&") + // This returns a map with key: string types and element type [] string. : map[brokerName:[<>] destinationName:[<>] destinationType:[Queue] type:[Broker]] + v, err := url.ParseQuery(replacer.Replace(splitURL)) + if err != nil { + return fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err) + } + if len(v["destinationName"][0]) == 0 { + return fmt.Errorf("no destinationName is given") + } + a.DestinationName = v["destinationName"][0] + if len(v["brokerName"][0]) == 0 { + return fmt.Errorf("no brokerName given: %s", a.RestAPITemplate) + } + a.BrokerName = v["brokerName"][0] + } else { + a.RestAPITemplate = defaultActiveMQRestAPITemplate + if a.ManagementEndpoint == "" { + return fmt.Errorf("no management endpoint given") + } + if a.DestinationName == "" { + return fmt.Errorf("no destination name given") + } + if a.BrokerName == "" { + return fmt.Errorf("no broker name given") + } + } + if a.CorsHeader == "" { + a.CorsHeader = fmt.Sprintf(defaultCorsHeader, a.ManagementEndpoint) + } + a.metricName = GenerateMetricNameWithIndex(a.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("activemq-%s", a.DestinationName))) + return nil } type activeMQMonitoring struct { @@ -47,12 +95,6 @@ type activeMQMonitoring struct { Timestamp int64 `json:"timestamp"` } -const ( - defaultTargetQueueSize = 10 - defaultActivationTargetQueueSize = 0 - defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize" -) - // NewActiveMQScaler creates a new activeMQ Scaler func NewActiveMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) @@ -75,134 +117,22 @@ func NewActiveMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { } func parseActiveMQMetadata(config *scalersconfig.ScalerConfig) (*activeMQMetadata, error) { - meta := activeMQMetadata{} - - if val, ok := config.TriggerMetadata["restAPITemplate"]; ok && val != "" { - meta.restAPITemplate = config.TriggerMetadata["restAPITemplate"] - var err error - if meta, err = getRestAPIParameters(meta); err != nil { - return nil, fmt.Errorf("can't parse restAPITemplate : %s ", err) - } - } else { - meta.restAPITemplate = defaultActiveMQRestAPITemplate - if config.TriggerMetadata["managementEndpoint"] == "" { - return nil, errors.New("no management endpoint given") - } - meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"] - - if config.TriggerMetadata["destinationName"] == "" { - return nil, errors.New("no destination name given") - } - meta.destinationName = config.TriggerMetadata["destinationName"] - - if config.TriggerMetadata["brokerName"] == "" { - return nil, errors.New("no broker name given") - } - meta.brokerName = config.TriggerMetadata["brokerName"] - } - - if val, ok := config.TriggerMetadata["targetQueueSize"]; ok { - queueSize, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid targetQueueSize - must be an integer") - } - - meta.targetQueueSize = queueSize - } else { - meta.targetQueueSize = defaultTargetQueueSize - } - - if val, ok := config.TriggerMetadata["activationTargetQueueSize"]; ok { - activationTargetQueueSize, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid activationTargetQueueSize - must be an integer") - } - meta.activationTargetQueueSize = activationTargetQueueSize - } else { - meta.activationTargetQueueSize = defaultActivationTargetQueueSize - } - - if val, ok := config.AuthParams["username"]; ok && val != "" { - meta.username = val - } else if val, ok := config.TriggerMetadata["username"]; ok && val != "" { - username := val - - if val, ok := config.ResolvedEnv[username]; ok && val != "" { - meta.username = val - } else { - meta.username = username - } - } - - if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" { - meta.corsHeader = config.TriggerMetadata["corsHeader"] - } else { - meta.corsHeader = fmt.Sprintf(defaultCorsHeader, meta.managementEndpoint) - } - - if meta.username == "" { - return nil, fmt.Errorf("username cannot be empty") - } - - if val, ok := config.AuthParams["password"]; ok && val != "" { - meta.password = val - } else if val, ok := config.TriggerMetadata["password"]; ok && val != "" { - password := val - - if val, ok := config.ResolvedEnv[password]; ok && val != "" { - meta.password = val - } else { - meta.password = password - } - } - - if meta.password == "" { - return nil, fmt.Errorf("password cannot be empty") - } - - meta.metricName = GenerateMetricNameWithIndex(config.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("activemq-%s", meta.destinationName))) - + meta := &activeMQMetadata{} meta.triggerIndex = config.TriggerIndex - - return &meta, nil -} - -// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName -func getRestAPIParameters(meta activeMQMetadata) (activeMQMetadata, error) { - u, err := url.ParseRequestURI(meta.restAPITemplate) - if err != nil { - return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err) + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing prometheus metadata: %w", err) } - - meta.managementEndpoint = u.Host - splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<>,destinationType=Queue,destinationName=<> - replacer := strings.NewReplacer(",", "&") - v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[brokerName:[<>] destinationName:[<>] destinationType:[Queue] type:[Broker]] - if err != nil { - return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err) - } - - if len(v["destinationName"][0]) == 0 { - return meta, errors.New("no destinationName is given") - } - meta.destinationName = v["destinationName"][0] - - if len(v["brokerName"][0]) == 0 { - return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate) - } - meta.brokerName = v["brokerName"][0] - return meta, nil } func (s *activeMQScaler) getMonitoringEndpoint() (string, error) { var buf bytes.Buffer endpoint := map[string]string{ - "ManagementEndpoint": s.metadata.managementEndpoint, - "BrokerName": s.metadata.brokerName, - "DestinationName": s.metadata.destinationName, + "ManagementEndpoint": s.metadata.ManagementEndpoint, + "BrokerName": s.metadata.BrokerName, + "DestinationName": s.metadata.DestinationName, } - template, err := template.New("monitoring_endpoint").Parse(s.metadata.restAPITemplate) + template, err := template.New("monitoring_endpoint").Parse(s.metadata.RestAPITemplate) if err != nil { return "", fmt.Errorf("error parsing template: %w", err) } @@ -230,9 +160,9 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error } // Add HTTP Auth and Headers - req.SetBasicAuth(s.metadata.username, s.metadata.password) + req.SetBasicAuth(s.metadata.Username, s.metadata.Password) req.Header.Set("Content-Type", "application/json") - req.Header.Set("Origin", s.metadata.corsHeader) + req.Header.Set("Origin", s.metadata.CorsHeader) resp, err := client.Do(req) if err != nil { @@ -250,7 +180,7 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) } - s.logger.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize)) + s.logger.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.TargetQueueSize)) return queueMessageCount, nil } @@ -261,7 +191,7 @@ func (s *activeMQScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpe Metric: v2.MetricIdentifier{ Name: s.metadata.metricName, }, - Target: GetMetricTarget(s.metricType, s.metadata.targetQueueSize), + Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueSize), } metricSpec := v2.MetricSpec{ External: externalMetric, Type: externalMetricType, @@ -277,7 +207,7 @@ func (s *activeMQScaler) GetMetricsAndActivity(ctx context.Context, metricName s metric := GenerateMetricInMili(metricName, float64(queueSize)) - return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.activationTargetQueueSize, nil + return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil } func (s *activeMQScaler) Close(context.Context) error { diff --git a/pkg/scalers/activemq_scaler_test.go b/pkg/scalers/activemq_scaler_test.go index ac85316b6d7..2be4d523465 100644 --- a/pkg/scalers/activemq_scaler_test.go +++ b/pkg/scalers/activemq_scaler_test.go @@ -11,6 +11,7 @@ import ( const ( testInvalidRestAPITemplate = "testInvalidRestAPITemplate" + defaultTargetQueueSize = 10 ) type parseActiveMQMetadataTestData struct { @@ -228,8 +229,8 @@ func TestActiveMQDefaultCorsHeader(t *testing.T) { if err != nil { t.Error("Expected success but got error", err) } - if !(meta.corsHeader == "http://localhost:8161") { - t.Errorf("Expected http://localhost:8161 but got %s", meta.corsHeader) + if !(meta.CorsHeader == "http://localhost:8161") { + t.Errorf("Expected http://localhost:8161 but got %s", meta.CorsHeader) } } @@ -240,8 +241,8 @@ func TestActiveMQCorsHeader(t *testing.T) { if err != nil { t.Error("Expected success but got error", err) } - if !(meta.corsHeader == "test") { - t.Errorf("Expected test but got %s", meta.corsHeader) + if !(meta.CorsHeader == "test") { + t.Errorf("Expected test but got %s", meta.CorsHeader) } } @@ -255,8 +256,8 @@ func TestParseActiveMQMetadata(t *testing.T) { if testData.isError && err == nil { t.Error("Expected error but got success") } - if metadata != nil && metadata.password != "" && metadata.password != testData.authParams["password"] { - t.Error("Expected password from configuration but found something else: ", metadata.password) + if metadata != nil && metadata.Password != "" && metadata.Password != testData.authParams["password"] { + t.Error("Expected password from configuration but found something else: ", metadata.Password) fmt.Println(testData) } }) @@ -288,8 +289,8 @@ func TestParseDefaultTargetQueueSize(t *testing.T) { t.Error("Expected success but got error", err) case testData.isError && err == nil: t.Error("Expected error but got success") - case metadata.targetQueueSize != defaultTargetQueueSize: - t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.targetQueueSize) + case metadata.TargetQueueSize != defaultTargetQueueSize: + t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.TargetQueueSize) } }) }