Skip to content

Commit

Permalink
refactor ActiveMQ scaler config
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Wozniak <[email protected]>
  • Loading branch information
wozniakjan committed May 14, 2024
1 parent 613919b commit a3d700e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 151 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037))
- *ActiveMQ Scaler*: Refactor ActiveMQ config parser ([#5797](https://github.com/kedacore/keda/issues/5797))

#### Experimental

Expand Down
216 changes: 73 additions & 143 deletions pkg/scalers/activemq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"html/template"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/go-logr/logr"
Expand All @@ -20,6 +18,8 @@ import (
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"

type activeMQScaler struct {
metricType v2.MetricTargetType
metadata *activeMQMetadata
Expand All @@ -28,17 +28,65 @@ type activeMQScaler struct {
}

type activeMQMetadata struct {
managementEndpoint string
destinationName string
brokerName string
username string
password string
restAPITemplate string
targetQueueSize int64
activationTargetQueueSize int64
corsHeader string
metricName string
triggerIndex int
metricName string
triggerIndex int

ManagementEndpoint string `keda:"name=managementEndpoint, order=triggerMetadata, optional"`
DestinationName string `keda:"name=destinationName, order=triggerMetadata, optional"`
BrokerName string `keda:"name=brokerName, order=triggerMetadata, optional"`

// auth
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`

CorsHeader string `keda:"name=corsHeader, order=triggerMetadata, optional"`

RestAPITemplate string `keda:"name=restAPITemplate, order=triggerMetadata, optional"`
TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, optional, default=10"`
ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, optional, default=0"`
}

func (a *activeMQMetadata) Validate() error {
if a.RestAPITemplate != "" {
// parse restAPITemplate to provide managementEndpoint, brokerName, destinationName
u, err := url.ParseRequestURI(a.RestAPITemplate)
if err != nil {
return fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err)
}
a.ManagementEndpoint = u.Host
// This returns : type=Broker,brokerName=<<brokerName>>,destinationType=Queue,destinationName=<<destinationName>>
splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0]
replacer := strings.NewReplacer(",", "&")
// This returns a map with key: string types and element type [] string. : map[brokerName:[<<brokerName>>] destinationName:[<<destinationName>>] destinationType:[Queue] type:[Broker]]
v, err := url.ParseQuery(replacer.Replace(splitURL))
if err != nil {
return fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err)
}
if len(v["destinationName"][0]) == 0 {
return fmt.Errorf("no destinationName is given")
}
a.DestinationName = v["destinationName"][0]
if len(v["brokerName"][0]) == 0 {
return fmt.Errorf("no brokerName given: %s", a.RestAPITemplate)
}
a.BrokerName = v["brokerName"][0]
} else {
a.RestAPITemplate = defaultActiveMQRestAPITemplate
if a.ManagementEndpoint == "" {
return fmt.Errorf("no management endpoint given")
}
if a.DestinationName == "" {
return fmt.Errorf("no destination name given")
}
if a.BrokerName == "" {
return fmt.Errorf("no broker name given")
}
}
if a.CorsHeader == "" {
a.CorsHeader = fmt.Sprintf(defaultCorsHeader, a.ManagementEndpoint)
}
a.metricName = GenerateMetricNameWithIndex(a.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("activemq-%s", a.DestinationName)))
return nil
}

type activeMQMonitoring struct {
Expand All @@ -47,12 +95,6 @@ type activeMQMonitoring struct {
Timestamp int64 `json:"timestamp"`
}

const (
defaultTargetQueueSize = 10
defaultActivationTargetQueueSize = 0
defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"
)

// NewActiveMQScaler creates a new activeMQ Scaler
func NewActiveMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
Expand All @@ -75,134 +117,22 @@ func NewActiveMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}

func parseActiveMQMetadata(config *scalersconfig.ScalerConfig) (*activeMQMetadata, error) {
meta := activeMQMetadata{}

if val, ok := config.TriggerMetadata["restAPITemplate"]; ok && val != "" {
meta.restAPITemplate = config.TriggerMetadata["restAPITemplate"]
var err error
if meta, err = getRestAPIParameters(meta); err != nil {
return nil, fmt.Errorf("can't parse restAPITemplate : %s ", err)
}
} else {
meta.restAPITemplate = defaultActiveMQRestAPITemplate
if config.TriggerMetadata["managementEndpoint"] == "" {
return nil, errors.New("no management endpoint given")
}
meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]

if config.TriggerMetadata["destinationName"] == "" {
return nil, errors.New("no destination name given")
}
meta.destinationName = config.TriggerMetadata["destinationName"]

if config.TriggerMetadata["brokerName"] == "" {
return nil, errors.New("no broker name given")
}
meta.brokerName = config.TriggerMetadata["brokerName"]
}

if val, ok := config.TriggerMetadata["targetQueueSize"]; ok {
queueSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid targetQueueSize - must be an integer")
}

meta.targetQueueSize = queueSize
} else {
meta.targetQueueSize = defaultTargetQueueSize
}

if val, ok := config.TriggerMetadata["activationTargetQueueSize"]; ok {
activationTargetQueueSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid activationTargetQueueSize - must be an integer")
}
meta.activationTargetQueueSize = activationTargetQueueSize
} else {
meta.activationTargetQueueSize = defaultActivationTargetQueueSize
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok && val != "" {
username := val

if val, ok := config.ResolvedEnv[username]; ok && val != "" {
meta.username = val
} else {
meta.username = username
}
}

if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" {
meta.corsHeader = config.TriggerMetadata["corsHeader"]
} else {
meta.corsHeader = fmt.Sprintf(defaultCorsHeader, meta.managementEndpoint)
}

if meta.username == "" {
return nil, fmt.Errorf("username cannot be empty")
}

if val, ok := config.AuthParams["password"]; ok && val != "" {
meta.password = val
} else if val, ok := config.TriggerMetadata["password"]; ok && val != "" {
password := val

if val, ok := config.ResolvedEnv[password]; ok && val != "" {
meta.password = val
} else {
meta.password = password
}
}

if meta.password == "" {
return nil, fmt.Errorf("password cannot be empty")
}

meta.metricName = GenerateMetricNameWithIndex(config.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("activemq-%s", meta.destinationName)))

meta := &activeMQMetadata{}
meta.triggerIndex = config.TriggerIndex

return &meta, nil
}

// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName
func getRestAPIParameters(meta activeMQMetadata) (activeMQMetadata, error) {
u, err := url.ParseRequestURI(meta.restAPITemplate)
if err != nil {
return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err)
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing prometheus metadata: %w", err)
}

meta.managementEndpoint = u.Host
splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<<brokerName>>,destinationType=Queue,destinationName=<<destinationName>>
replacer := strings.NewReplacer(",", "&")
v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[brokerName:[<<brokerName>>] destinationName:[<<destinationName>>] destinationType:[Queue] type:[Broker]]
if err != nil {
return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %w", err)
}

if len(v["destinationName"][0]) == 0 {
return meta, errors.New("no destinationName is given")
}
meta.destinationName = v["destinationName"][0]

if len(v["brokerName"][0]) == 0 {
return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate)
}
meta.brokerName = v["brokerName"][0]

return meta, nil
}

func (s *activeMQScaler) getMonitoringEndpoint() (string, error) {
var buf bytes.Buffer
endpoint := map[string]string{
"ManagementEndpoint": s.metadata.managementEndpoint,
"BrokerName": s.metadata.brokerName,
"DestinationName": s.metadata.destinationName,
"ManagementEndpoint": s.metadata.ManagementEndpoint,
"BrokerName": s.metadata.BrokerName,
"DestinationName": s.metadata.DestinationName,
}
template, err := template.New("monitoring_endpoint").Parse(s.metadata.restAPITemplate)
template, err := template.New("monitoring_endpoint").Parse(s.metadata.RestAPITemplate)
if err != nil {
return "", fmt.Errorf("error parsing template: %w", err)
}
Expand Down Expand Up @@ -230,9 +160,9 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error
}

// Add HTTP Auth and Headers
req.SetBasicAuth(s.metadata.username, s.metadata.password)
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Origin", s.metadata.corsHeader)
req.Header.Set("Origin", s.metadata.CorsHeader)

resp, err := client.Do(req)
if err != nil {
Expand All @@ -250,7 +180,7 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error
return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}

s.logger.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize))
s.logger.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.TargetQueueSize))

return queueMessageCount, nil
}
Expand All @@ -261,7 +191,7 @@ func (s *activeMQScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpe
Metric: v2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: GetMetricTarget(s.metricType, s.metadata.targetQueueSize),
Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueSize),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Expand All @@ -277,7 +207,7 @@ func (s *activeMQScaler) GetMetricsAndActivity(ctx context.Context, metricName s

metric := GenerateMetricInMili(metricName, float64(queueSize))

return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.activationTargetQueueSize, nil
return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil
}

func (s *activeMQScaler) Close(context.Context) error {
Expand Down
17 changes: 9 additions & 8 deletions pkg/scalers/activemq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

const (
testInvalidRestAPITemplate = "testInvalidRestAPITemplate"
defaultTargetQueueSize = 10
)

type parseActiveMQMetadataTestData struct {
Expand Down Expand Up @@ -228,8 +229,8 @@ func TestActiveMQDefaultCorsHeader(t *testing.T) {
if err != nil {
t.Error("Expected success but got error", err)
}
if !(meta.corsHeader == "http://localhost:8161") {
t.Errorf("Expected http://localhost:8161 but got %s", meta.corsHeader)
if !(meta.CorsHeader == "http://localhost:8161") {
t.Errorf("Expected http://localhost:8161 but got %s", meta.CorsHeader)
}
}

Expand All @@ -240,8 +241,8 @@ func TestActiveMQCorsHeader(t *testing.T) {
if err != nil {
t.Error("Expected success but got error", err)
}
if !(meta.corsHeader == "test") {
t.Errorf("Expected test but got %s", meta.corsHeader)
if !(meta.CorsHeader == "test") {
t.Errorf("Expected test but got %s", meta.CorsHeader)
}
}

Expand All @@ -255,8 +256,8 @@ func TestParseActiveMQMetadata(t *testing.T) {
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if metadata != nil && metadata.password != "" && metadata.password != testData.authParams["password"] {
t.Error("Expected password from configuration but found something else: ", metadata.password)
if metadata != nil && metadata.Password != "" && metadata.Password != testData.authParams["password"] {
t.Error("Expected password from configuration but found something else: ", metadata.Password)
fmt.Println(testData)
}
})
Expand Down Expand Up @@ -288,8 +289,8 @@ func TestParseDefaultTargetQueueSize(t *testing.T) {
t.Error("Expected success but got error", err)
case testData.isError && err == nil:
t.Error("Expected error but got success")
case metadata.targetQueueSize != defaultTargetQueueSize:
t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.targetQueueSize)
case metadata.TargetQueueSize != defaultTargetQueueSize:
t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.TargetQueueSize)
}
})
}
Expand Down

0 comments on commit a3d700e

Please sign in to comment.