-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #13 from cpilton/IBM-MQ-Scaler-v2
IBM MQ Scaler
- Loading branch information
Showing
3 changed files
with
352 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
/** | ||
* © Copyright IBM Corporation 2020 | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
**/ | ||
|
||
package scalers | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"crypto/tls" | ||
"encoding/json" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"net/url" | ||
"strconv" | ||
|
||
v2beta2 "k8s.io/api/autoscaling/v2beta2" | ||
"k8s.io/apimachinery/pkg/api/resource" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/labels" | ||
"k8s.io/metrics/pkg/apis/external_metrics" | ||
|
||
kedautil "github.com/kedacore/keda/pkg/util" | ||
) | ||
|
||
// Default variables and settings | ||
const ( | ||
ibmMqQueueDepthMetricName = "currentQueueDepth" | ||
defaultTargetQueueDepth = 20 | ||
defaultTLSDisabled = false | ||
) | ||
|
||
// IBMMQScaler assigns struct data pointer to metadata variable | ||
type IBMMQScaler struct { | ||
metadata *IBMMQMetadata | ||
} | ||
|
||
// IBMMQMetadata Metadata used by KEDA to query IBM MQ queue depth and scale | ||
type IBMMQMetadata struct { | ||
host string | ||
queueName string | ||
username string | ||
password string | ||
targetQueueDepth int | ||
tlsDisabled bool | ||
} | ||
|
||
// CommandResponse Full structured response from MQ admin REST query | ||
type CommandResponse struct { | ||
CommandResponse []Response `json:"commandResponse"` | ||
} | ||
|
||
// Response The body of the response returned from the MQ admin query | ||
type Response struct { | ||
Parameters Parameters `json:"parameters"` | ||
} | ||
|
||
// Parameters Contains the current depth of the IBM MQ Queue | ||
type Parameters struct { | ||
Curdepth int `json:"curdepth"` | ||
} | ||
|
||
// NewIBMMQScaler creates a new IBM MQ scaler | ||
func NewIBMMQScaler(config *ScalerConfig) (Scaler, error) { | ||
meta, err := parseIBMMQMetadata(config) | ||
if err != nil { | ||
return nil, fmt.Errorf("error parsing IBM MQ metadata: %s", err) | ||
} | ||
|
||
return &IBMMQScaler{metadata: meta}, nil | ||
} | ||
|
||
// Close closes and returns nil | ||
func (s *IBMMQScaler) Close() error { | ||
return nil | ||
} | ||
|
||
// parseIBMMQMetadata checks the existence of and validates the MQ connection data provided | ||
func parseIBMMQMetadata(config *ScalerConfig) (*IBMMQMetadata, error) { | ||
meta := IBMMQMetadata{} | ||
|
||
if val, ok := config.TriggerMetadata["host"]; ok { | ||
_, err := url.ParseRequestURI(val) | ||
if err != nil { | ||
return nil, fmt.Errorf("invalid URL: %s", err) | ||
} | ||
meta.host = val | ||
} else { | ||
return nil, fmt.Errorf("no host URI given") | ||
} | ||
|
||
if val, ok := config.TriggerMetadata["queueName"]; ok { | ||
meta.queueName = val | ||
} else { | ||
return nil, fmt.Errorf("no queue name given") | ||
} | ||
|
||
if val, ok := config.TriggerMetadata["queueDepth"]; ok && val != "" { | ||
queueDepth, err := strconv.Atoi(val) | ||
if err != nil { | ||
return nil, fmt.Errorf("invalid targetQueueDepth - must be an integer") | ||
} | ||
meta.targetQueueDepth = queueDepth | ||
} else { | ||
fmt.Println("No target depth defined - setting default") | ||
meta.targetQueueDepth = defaultTargetQueueDepth | ||
} | ||
|
||
if val, ok := config.TriggerMetadata["tls"]; ok { | ||
tlsDisabled, err := strconv.ParseBool(val) | ||
if err != nil { | ||
return nil, fmt.Errorf("invalid tls setting: %s", err) | ||
} | ||
meta.tlsDisabled = tlsDisabled | ||
} else { | ||
fmt.Println("No tls setting defined - setting default") | ||
meta.tlsDisabled = defaultTLSDisabled | ||
} | ||
|
||
if val, ok := config.AuthParams["username"]; ok { | ||
meta.username = val | ||
} else { | ||
return nil, fmt.Errorf("no username given") | ||
} | ||
|
||
if val, ok := config.AuthParams["password"]; ok { | ||
meta.password = val | ||
} else { | ||
return nil, fmt.Errorf("no password given") | ||
} | ||
|
||
return &meta, nil | ||
} | ||
|
||
// IsActive returns true if there are messages to be processed/if we need to scale from zero | ||
func (s *IBMMQScaler) IsActive(ctx context.Context) (bool, error) { | ||
queueDepth, err := s.getQueueDepthViaHTTP() | ||
if err != nil { | ||
return false, fmt.Errorf("error inspecting IBM MQ queue depth: %s", err) | ||
} | ||
return queueDepth > 0, nil | ||
} | ||
|
||
// getQueueDepthViaHTTP returns the depth of the MQ Queue from the Admin endpoint | ||
func (s *IBMMQScaler) getQueueDepthViaHTTP() (int, error) { | ||
queue := s.metadata.queueName | ||
url := s.metadata.host | ||
|
||
var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`) | ||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestJSON)) | ||
if err != nil { | ||
return 0, fmt.Errorf("failed to request queue depth: %s", err) | ||
} | ||
req.Header.Set("ibm-mq-rest-csrf-token", "value") | ||
req.Header.Set("Content-Type", "application/json") | ||
req.SetBasicAuth(s.metadata.username, s.metadata.password) | ||
|
||
tr := &http.Transport{ | ||
TLSClientConfig: &tls.Config{InsecureSkipVerify: s.metadata.tlsDisabled}, | ||
} | ||
client := &http.Client{Transport: tr} | ||
|
||
resp, err := client.Do(req) | ||
if err != nil { | ||
return 0, fmt.Errorf("failed to contact MQ via REST: %s", err) | ||
} | ||
defer resp.Body.Close() | ||
|
||
body, err := ioutil.ReadAll(resp.Body) | ||
if err != nil { | ||
return 0, fmt.Errorf("failed to ready body of request: %s", err) | ||
} | ||
|
||
var response CommandResponse | ||
err = json.Unmarshal(body, &response) | ||
if err != nil { | ||
return 0, fmt.Errorf("failed to parse JSON: %s", err) | ||
} | ||
|
||
if response.CommandResponse == nil || len(response.CommandResponse) == 0 { | ||
return 0, fmt.Errorf("failed to parseresponse from REST call: %s", err) | ||
} | ||
return response.CommandResponse[0].Parameters.Curdepth, nil | ||
} | ||
|
||
// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler | ||
func (s *IBMMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { | ||
targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueDepth), resource.DecimalSI) | ||
externalMetric := &v2beta2.ExternalMetricSource{ | ||
Metric: v2beta2.MetricIdentifier{ | ||
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "IBMMQ", s.metadata.queueName)), | ||
}, | ||
Target: v2beta2.MetricTarget{ | ||
Type: v2beta2.AverageValueMetricType, | ||
AverageValue: targetQueueLengthQty, | ||
}, | ||
} | ||
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} | ||
return []v2beta2.MetricSpec{metricSpec} | ||
} | ||
|
||
// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric | ||
func (s *IBMMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { | ||
queueDepth, err := s.getQueueDepthViaHTTP() | ||
if err != nil { | ||
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting IBM MQ queue depth: %s", err) | ||
} | ||
|
||
metric := external_metrics.ExternalMetricValue{ | ||
MetricName: ibmMqQueueDepthMetricName, | ||
Value: *resource.NewQuantity(int64(queueDepth), resource.DecimalSI), | ||
Timestamp: metav1.Now(), | ||
} | ||
|
||
return append([]external_metrics.ExternalMetricValue{}, metric), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
/** | ||
* © Copyright IBM Corporation 2020 | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
**/ | ||
|
||
package scalers | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
) | ||
|
||
// Test host URLs for validation | ||
const ( | ||
testValidMQQueueURL = "https://qmtest.qm2.eu-gb.mq.appdomain.cloud/ibmmq/rest/v2/admin/action/qmgr/QM1/mqsc" | ||
testInvalidMQQueueURL = "testInvalidURL.com" | ||
) | ||
|
||
// Test data struct used for TestIBMMQParseMetadata | ||
type parseIBMMQMetadataTestData struct { | ||
metadata map[string]string | ||
isError bool | ||
authParams map[string]string | ||
} | ||
|
||
// Test metric identifier with test MQ data and it's name | ||
type IBMMQMetricIdentifier struct { | ||
metadataTestData *parseIBMMQMetadataTestData | ||
name string | ||
} | ||
|
||
// Setting metric identifier mock name | ||
var IBMMQMetricIdentifiers = []IBMMQMetricIdentifier{ | ||
{&testIBMMQMetadata[1], "IBMMQ-testQueue"}, | ||
} | ||
|
||
// Test cases for TestIBMMQParseMetadata test | ||
var testIBMMQMetadata = []parseIBMMQMetadataTestData{ | ||
// Nothing passed | ||
{map[string]string{}, true, map[string]string{}}, | ||
// Properly formed metadata | ||
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, | ||
// Invalid queueDepth using a string | ||
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, | ||
// No host provided | ||
{map[string]string{"queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, | ||
// Missing queueName | ||
{map[string]string{"host": testValidMQQueueURL, "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, | ||
// Invalid URL | ||
{map[string]string{"host": testInvalidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, | ||
// Properly formed authParams | ||
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, | ||
// No username provided | ||
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"password": "Pass123"}}, | ||
// No password provided | ||
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername"}}, | ||
} | ||
|
||
// Test MQ Connection metadata is parsed correctly | ||
// should error on missing required field | ||
func TestIBMMQParseMetadata(t *testing.T) { | ||
for _, testData := range testIBMMQMetadata { | ||
_, err := parseIBMMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) | ||
if err != nil && !testData.isError { | ||
t.Error("Expected success but got error", err) | ||
fmt.Println(testData) | ||
} | ||
if testData.isError && err == nil { | ||
t.Error("Expected error but got success") | ||
fmt.Println(testData) | ||
} | ||
} | ||
} | ||
|
||
// Test case for TestParseDefaultQueueDepth test | ||
var testDefaultQueueDepth = []parseIBMMQMetadataTestData{ | ||
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, | ||
} | ||
|
||
// Test that DefaultQueueDepth is set when targetQueueDepth is not provided | ||
func TestParseDefaultQueueDepth(t *testing.T) { | ||
for _, testData := range testDefaultQueueDepth { | ||
metadata, err := parseIBMMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) | ||
if err != nil && !testData.isError { | ||
t.Error("Expected success but got error", err) | ||
} else if testData.isError && err == nil { | ||
t.Error("Expected error but got success") | ||
} else if metadata.targetQueueDepth != defaultTargetQueueDepth { | ||
t.Error("Expected default queueDepth =", defaultTargetQueueDepth, "but got", metadata.targetQueueDepth) | ||
} | ||
} | ||
} | ||
|
||
// Create a scaler and check if metrics method is available | ||
func TestIBMMQGetMetricSpecForScaling(t *testing.T) { | ||
for _, testData := range IBMMQMetricIdentifiers { | ||
metadata, err := parseIBMMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams}) | ||
|
||
if err != nil { | ||
t.Fatal("Could not parse metadata:", err) | ||
} | ||
mockIBMMQScaler := IBMMQScaler{metadata} | ||
metricSpec := mockIBMMQScaler.GetMetricSpecForScaling() | ||
metricName := metricSpec[0].External.Metric.Name | ||
|
||
if metricName != testData.name { | ||
t.Error("Wrong External metric source name:", metricName) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters