Skip to content

Commit

Permalink
New IBM MQ Scaler (#1259)
Browse files Browse the repository at this point in the history
Signed-off-by: Jess McCreery <[email protected]>
Co-authored-by: Liam McNulty <[email protected]>
Co-authored-by: Richard Coppen <[email protected]>
Co-authored-by: cpilton <[email protected]>
Co-authored-by: Jess McCreery <[email protected]>
Co-authored-by: Bimsara Pilapitiya <[email protected]>
Co-authored-by: Rory Jackson <[email protected]>
Co-authored-by: Soheel Chughtai <[email protected]>
Co-authored-by: Nisheeka Nynan <[email protected]>
  • Loading branch information
8 people authored Oct 23, 2020
1 parent 0157900 commit dfc8604
Show file tree
Hide file tree
Showing 4 changed files with 339 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- Add Metrics API Scaler ([#1026](https://github.com/kedacore/keda/pull/1026))
- Add cpu/memory Scaler ([#1215](https://github.com/kedacore/keda/pull/1215))
- Add Scaling Strategy for ScaledJob ([#1227](https://github.com/kedacore/keda/pull/1227))
- Add IBM MQ Scaler ([#1253](https://github.com/kedacore/keda/issues/1253))

### Improvements

Expand Down
224 changes: 224 additions & 0 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package scalers

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"

kedautil "github.com/kedacore/keda/pkg/util"
)

// Default variables and settings
const (
ibmMqQueueDepthMetricName = "currentQueueDepth"
defaultTargetQueueDepth = 20
defaultTLSDisabled = false
)

// IBMMQScaler assigns struct data pointer to metadata variable
type IBMMQScaler struct {
metadata *IBMMQMetadata
}

// IBMMQMetadata Metadata used by KEDA to query IBM MQ queue depth and scale
type IBMMQMetadata struct {
host string
queueManager string
queueName string
username string
password string
targetQueueDepth int
tlsDisabled bool
}

// CommandResponse Full structured response from MQ admin REST query
type CommandResponse struct {
CommandResponse []Response `json:"commandResponse"`
}

// Response The body of the response returned from the MQ admin query
type Response struct {
Parameters Parameters `json:"parameters"`
}

// Parameters Contains the current depth of the IBM MQ Queue
type Parameters struct {
Curdepth int `json:"curdepth"`
}

// NewIBMMQScaler creates a new IBM MQ scaler
func NewIBMMQScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseIBMMQMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing IBM MQ metadata: %s", err)
}

return &IBMMQScaler{metadata: meta}, nil
}

// Close closes and returns nil
func (s *IBMMQScaler) Close() error {
return nil
}

// parseIBMMQMetadata checks the existence of and validates the MQ connection data provided
func parseIBMMQMetadata(config *ScalerConfig) (*IBMMQMetadata, error) {
meta := IBMMQMetadata{}

if val, ok := config.TriggerMetadata["host"]; ok {
_, err := url.ParseRequestURI(val)
if err != nil {
return nil, fmt.Errorf("invalid URL: %s", err)
}
meta.host = val
} else {
return nil, fmt.Errorf("no host URI given")
}

if val, ok := config.TriggerMetadata["queueManager"]; ok {
meta.queueManager = val
} else {
return nil, fmt.Errorf("no queue manager given")
}

if val, ok := config.TriggerMetadata["queueName"]; ok {
meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
}

if val, ok := config.TriggerMetadata["queueDepth"]; ok && val != "" {
queueDepth, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("invalid targetQueueDepth - must be an integer")
}
meta.targetQueueDepth = queueDepth
} else {
fmt.Println("No target depth defined - setting default")
meta.targetQueueDepth = defaultTargetQueueDepth
}

if val, ok := config.TriggerMetadata["tls"]; ok {
tlsDisabled, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("invalid tls setting: %s", err)
}
meta.tlsDisabled = tlsDisabled
} else {
fmt.Println("No tls setting defined - setting default")
meta.tlsDisabled = defaultTLSDisabled
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if config.TriggerMetadata["usernameFromEnv"] != "" {
meta.username = config.ResolvedEnv[config.TriggerMetadata["usernameFromEnv"]]
} else {
return nil, fmt.Errorf("no username given")
}

if val, ok := config.AuthParams["password"]; ok && val != "" {
meta.password = val
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
} else {
return nil, fmt.Errorf("no password given")
}

return &meta, nil
}

// IsActive returns true if there are messages to be processed/if we need to scale from zero
func (s *IBMMQScaler) IsActive(ctx context.Context) (bool, error) {
queueDepth, err := s.getQueueDepthViaHTTP()
if err != nil {
return false, fmt.Errorf("error inspecting IBM MQ queue depth: %s", err)
}
return queueDepth > 0, nil
}

// getQueueDepthViaHTTP returns the depth of the MQ Queue from the Admin endpoint
func (s *IBMMQScaler) getQueueDepthViaHTTP() (int, error) {
queue := s.metadata.queueName
url := s.metadata.host

var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestJSON))
if err != nil {
return 0, fmt.Errorf("failed to request queue depth: %s", err)
}
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(s.metadata.username, s.metadata.password)

tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: s.metadata.tlsDisabled},
}
client := &http.Client{Transport: tr}

resp, err := client.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST: %s", err)
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to ready body of request: %s", err)
}

var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON: %s", err)
}

if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call: %s", err)
}
return response.CommandResponse[0].Parameters.Curdepth, nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *IBMMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueDepth), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "IBMMQ", s.metadata.queueManager, s.metadata.queueName)),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueueLengthQty,
},
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *IBMMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
queueDepth, err := s.getQueueDepthViaHTTP()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting IBM MQ queue depth: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: ibmMqQueueDepthMetricName,
Value: *resource.NewQuantity(int64(queueDepth), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
112 changes: 112 additions & 0 deletions pkg/scalers/ibmmq_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package scalers

import (
"fmt"
"testing"
)

// Test host URLs for validation
const (
testValidMQQueueURL = "https://qmtest.qm2.eu-gb.mq.appdomain.cloud/ibmmq/rest/v2/admin/action/qmgr/QM1/mqsc"
testInvalidMQQueueURL = "testInvalidURL.com"
)

// Test data struct used for TestIBMMQParseMetadata
type parseIBMMQMetadataTestData struct {
metadata map[string]string
isError bool
authParams map[string]string
}

var sampleIBMMQResolvedEnv = map[string]string{
username: "ibmmquser",
password: "ibmmqpass",
}

// Test metric identifier with test MQ data and it's name
type IBMMQMetricIdentifier struct {
metadataTestData *parseIBMMQMetadataTestData
name string
}

// Setting metric identifier mock name
var IBMMQMetricIdentifiers = []IBMMQMetricIdentifier{
{&testIBMMQMetadata[1], "IBMMQ-testQueueManager-testQueue"},
}

// Test cases for TestIBMMQParseMetadata test
var testIBMMQMetadata = []parseIBMMQMetadataTestData{
// Nothing passed
{map[string]string{}, true, map[string]string{}},
// Properly formed metadata
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid queueDepth using a string
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// No host provided
{map[string]string{"queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
//Missing queueManager
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Missing queueName
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid URL
{map[string]string{"host": testInvalidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed authParams
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// No username provided
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"password": "Pass123"}},
// No password provided
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername"}},
}

// Test MQ Connection metadata is parsed correctly
// should error on missing required field
func TestIBMMQParseMetadata(t *testing.T) {
for _, testData := range testIBMMQMetadata {
_, err := parseIBMMQMetadata(&ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
fmt.Println(testData)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
fmt.Println(testData)
}
}
}

// Test case for TestParseDefaultQueueDepth test
var testDefaultQueueDepth = []parseIBMMQMetadataTestData{
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
}

// Test that DefaultQueueDepth is set when targetQueueDepth is not provided
func TestParseDefaultQueueDepth(t *testing.T) {
for _, testData := range testDefaultQueueDepth {
metadata, err := parseIBMMQMetadata(&ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
} else if testData.isError && err == nil {
t.Error("Expected error but got success")
} else if metadata.targetQueueDepth != defaultTargetQueueDepth {
t.Error("Expected default queueDepth =", defaultTargetQueueDepth, "but got", metadata.targetQueueDepth)
}
}
}

// Create a scaler and check if metrics method is available
func TestIBMMQGetMetricSpecForScaling(t *testing.T) {
for _, testData := range IBMMQMetricIdentifiers {
metadata, err := parseIBMMQMetadata(&ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams})

if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockIBMMQScaler := IBMMQScaler{metadata}
metricSpec := mockIBMMQScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name

if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal
return scalers.NewPubSubScaler(config)
case "huawei-cloudeye":
return scalers.NewHuaweiCloudeyeScaler(config)
case "ibmmq":
return scalers.NewIBMMQScaler(config)
case "kafka":
return scalers.NewKafkaScaler(config)
case "liiklus":
Expand Down

0 comments on commit dfc8604

Please sign in to comment.