diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c6e02ce87f..656fa08f3d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ - Add Metrics API Scaler ([#1026](https://github.com/kedacore/keda/pull/1026)) - Add cpu/memory Scaler ([#1215](https://github.com/kedacore/keda/pull/1215)) - Add Scaling Strategy for ScaledJob ([#1227](https://github.com/kedacore/keda/pull/1227)) +- Add IBM MQ Scaler ([#1253](https://github.com/kedacore/keda/issues/1253)) ### Improvements diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go new file mode 100644 index 00000000000..97dfeb9282e --- /dev/null +++ b/pkg/scalers/ibmmq_scaler.go @@ -0,0 +1,224 @@ +package scalers + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + + v2beta2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + + kedautil "github.com/kedacore/keda/pkg/util" +) + +// Default variables and settings +const ( + ibmMqQueueDepthMetricName = "currentQueueDepth" + defaultTargetQueueDepth = 20 + defaultTLSDisabled = false +) + +// IBMMQScaler assigns struct data pointer to metadata variable +type IBMMQScaler struct { + metadata *IBMMQMetadata +} + +// IBMMQMetadata Metadata used by KEDA to query IBM MQ queue depth and scale +type IBMMQMetadata struct { + host string + queueManager string + queueName string + username string + password string + targetQueueDepth int + tlsDisabled bool +} + +// CommandResponse Full structured response from MQ admin REST query +type CommandResponse struct { + CommandResponse []Response `json:"commandResponse"` +} + +// Response The body of the response returned from the MQ admin query +type Response struct { + Parameters Parameters `json:"parameters"` +} + +// Parameters Contains the current depth of the IBM MQ Queue +type Parameters struct { + Curdepth int `json:"curdepth"` +} + +// NewIBMMQScaler creates a new IBM MQ scaler +func NewIBMMQScaler(config *ScalerConfig) (Scaler, error) { + meta, err := parseIBMMQMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing IBM MQ metadata: %s", err) + } + + return &IBMMQScaler{metadata: meta}, nil +} + +// Close closes and returns nil +func (s *IBMMQScaler) Close() error { + return nil +} + +// parseIBMMQMetadata checks the existence of and validates the MQ connection data provided +func parseIBMMQMetadata(config *ScalerConfig) (*IBMMQMetadata, error) { + meta := IBMMQMetadata{} + + if val, ok := config.TriggerMetadata["host"]; ok { + _, err := url.ParseRequestURI(val) + if err != nil { + return nil, fmt.Errorf("invalid URL: %s", err) + } + meta.host = val + } else { + return nil, fmt.Errorf("no host URI given") + } + + if val, ok := config.TriggerMetadata["queueManager"]; ok { + meta.queueManager = val + } else { + return nil, fmt.Errorf("no queue manager given") + } + + if val, ok := config.TriggerMetadata["queueName"]; ok { + meta.queueName = val + } else { + return nil, fmt.Errorf("no queue name given") + } + + if val, ok := config.TriggerMetadata["queueDepth"]; ok && val != "" { + queueDepth, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("invalid targetQueueDepth - must be an integer") + } + meta.targetQueueDepth = queueDepth + } else { + fmt.Println("No target depth defined - setting default") + meta.targetQueueDepth = defaultTargetQueueDepth + } + + if val, ok := config.TriggerMetadata["tls"]; ok { + tlsDisabled, err := strconv.ParseBool(val) + if err != nil { + return nil, fmt.Errorf("invalid tls setting: %s", err) + } + meta.tlsDisabled = tlsDisabled + } else { + fmt.Println("No tls setting defined - setting default") + meta.tlsDisabled = defaultTLSDisabled + } + + if val, ok := config.AuthParams["username"]; ok && val != "" { + meta.username = val + } else if config.TriggerMetadata["usernameFromEnv"] != "" { + meta.username = config.ResolvedEnv[config.TriggerMetadata["usernameFromEnv"]] + } else { + return nil, fmt.Errorf("no username given") + } + + if val, ok := config.AuthParams["password"]; ok && val != "" { + meta.password = val + } else if config.TriggerMetadata["passwordFromEnv"] != "" { + meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]] + } else { + return nil, fmt.Errorf("no password given") + } + + return &meta, nil +} + +// IsActive returns true if there are messages to be processed/if we need to scale from zero +func (s *IBMMQScaler) IsActive(ctx context.Context) (bool, error) { + queueDepth, err := s.getQueueDepthViaHTTP() + if err != nil { + return false, fmt.Errorf("error inspecting IBM MQ queue depth: %s", err) + } + return queueDepth > 0, nil +} + +// getQueueDepthViaHTTP returns the depth of the MQ Queue from the Admin endpoint +func (s *IBMMQScaler) getQueueDepthViaHTTP() (int, error) { + queue := s.metadata.queueName + url := s.metadata.host + + var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestJSON)) + if err != nil { + return 0, fmt.Errorf("failed to request queue depth: %s", err) + } + req.Header.Set("ibm-mq-rest-csrf-token", "value") + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(s.metadata.username, s.metadata.password) + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: s.metadata.tlsDisabled}, + } + client := &http.Client{Transport: tr} + + resp, err := client.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to contact MQ via REST: %s", err) + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to ready body of request: %s", err) + } + + var response CommandResponse + err = json.Unmarshal(body, &response) + if err != nil { + return 0, fmt.Errorf("failed to parse JSON: %s", err) + } + + if response.CommandResponse == nil || len(response.CommandResponse) == 0 { + return 0, fmt.Errorf("failed to parse response from REST call: %s", err) + } + return response.CommandResponse[0].Parameters.Curdepth, nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *IBMMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueDepth), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "IBMMQ", s.metadata.queueManager, s.metadata.queueName)), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetQueueLengthQty, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} +} + +// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric +func (s *IBMMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + queueDepth, err := s.getQueueDepthViaHTTP() + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting IBM MQ queue depth: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: ibmMqQueueDepthMetricName, + Value: *resource.NewQuantity(int64(queueDepth), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} diff --git a/pkg/scalers/ibmmq_scaler_test.go b/pkg/scalers/ibmmq_scaler_test.go new file mode 100644 index 00000000000..81d7ba92d62 --- /dev/null +++ b/pkg/scalers/ibmmq_scaler_test.go @@ -0,0 +1,112 @@ +package scalers + +import ( + "fmt" + "testing" +) + +// Test host URLs for validation +const ( + testValidMQQueueURL = "https://qmtest.qm2.eu-gb.mq.appdomain.cloud/ibmmq/rest/v2/admin/action/qmgr/QM1/mqsc" + testInvalidMQQueueURL = "testInvalidURL.com" +) + +// Test data struct used for TestIBMMQParseMetadata +type parseIBMMQMetadataTestData struct { + metadata map[string]string + isError bool + authParams map[string]string +} + +var sampleIBMMQResolvedEnv = map[string]string{ + username: "ibmmquser", + password: "ibmmqpass", +} + +// Test metric identifier with test MQ data and it's name +type IBMMQMetricIdentifier struct { + metadataTestData *parseIBMMQMetadataTestData + name string +} + +// Setting metric identifier mock name +var IBMMQMetricIdentifiers = []IBMMQMetricIdentifier{ + {&testIBMMQMetadata[1], "IBMMQ-testQueueManager-testQueue"}, +} + +// Test cases for TestIBMMQParseMetadata test +var testIBMMQMetadata = []parseIBMMQMetadataTestData{ + // Nothing passed + {map[string]string{}, true, map[string]string{}}, + // Properly formed metadata + {map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Invalid queueDepth using a string + {map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // No host provided + {map[string]string{"queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, + //Missing queueManager + {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Missing queueName + {map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Invalid URL + {map[string]string{"host": testInvalidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Properly formed authParams + {map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // No username provided + {map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"password": "Pass123"}}, + // No password provided + {map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername"}}, +} + +// Test MQ Connection metadata is parsed correctly +// should error on missing required field +func TestIBMMQParseMetadata(t *testing.T) { + for _, testData := range testIBMMQMetadata { + _, err := parseIBMMQMetadata(&ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + fmt.Println(testData) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + fmt.Println(testData) + } + } +} + +// Test case for TestParseDefaultQueueDepth test +var testDefaultQueueDepth = []parseIBMMQMetadataTestData{ + {map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, +} + +// Test that DefaultQueueDepth is set when targetQueueDepth is not provided +func TestParseDefaultQueueDepth(t *testing.T) { + for _, testData := range testDefaultQueueDepth { + metadata, err := parseIBMMQMetadata(&ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } else if testData.isError && err == nil { + t.Error("Expected error but got success") + } else if metadata.targetQueueDepth != defaultTargetQueueDepth { + t.Error("Expected default queueDepth =", defaultTargetQueueDepth, "but got", metadata.targetQueueDepth) + } + } +} + +// Create a scaler and check if metrics method is available +func TestIBMMQGetMetricSpecForScaling(t *testing.T) { + for _, testData := range IBMMQMetricIdentifiers { + metadata, err := parseIBMMQMetadata(&ScalerConfig{ResolvedEnv: sampleIBMMQResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams}) + + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockIBMMQScaler := IBMMQScaler{metadata} + metricSpec := mockIBMMQScaler.GetMetricSpecForScaling() + metricName := metricSpec[0].External.Metric.Name + + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index cc1308c4130..6920f7760e1 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -429,6 +429,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal return scalers.NewPubSubScaler(config) case "huawei-cloudeye": return scalers.NewHuaweiCloudeyeScaler(config) + case "ibmmq": + return scalers.NewIBMMQScaler(config) case "kafka": return scalers.NewKafkaScaler(config) case "liiklus":