Skip to content

Commit

Permalink
add Azure Pipelines Scaler (#1706)
Browse files Browse the repository at this point in the history
  • Loading branch information
troydn authored Apr 14, 2021
1 parent d9f8591 commit 38b5a7f
Show file tree
Hide file tree
Showing 7 changed files with 438 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/nightly-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,9 @@ jobs:
AZURE_SP_TENANT: ${{ secrets.AZURE_SP_TENANT }}
TEST_STORAGE_CONNECTION_STRING: ${{ secrets.TEST_STORAGE_CONNECTION_STRING }}
TEST_LOG_ANALYTICS_WORKSPACE_ID: ${{ secrets.TEST_LOG_ANALYTICS_WORKSPACE_ID }}
AZURE_DEVOPS_ORGANIZATION_URL: ${{ secrets.AZURE_DEVOPS_ORGANIZATION_URL }}
AZURE_DEVOPS_PAT: ${{ secrets.AZURE_DEVOPS_PAT }}
AZURE_DEVOPS_PROJECT: ${{ secrets.AZURE_DEVOPS_PROJECT }}
AZURE_DEVOPS_BUILD_DEFINITON_ID: ${{ secrets.AZURE_DEVOPS_BUILD_DEFINITON_ID }}
AZURE_DEVOPS_POOL_NAME: ${{ secrets.AZURE_DEVOPS_POOL_NAME }}
run: make e2e-test
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

## Unreleased

- Add Azure Pipelines Scaler ([#1706](https://github.com/kedacore/keda/pull/1706))
- Add OpenStack Metrics Scaler ([#1382](https://github.com/kedacore/keda/issues/1382))
- Fixed goroutine leaks in usage of timers ([#1704](https://github.com/kedacore/keda/pull/1704))

Expand Down
193 changes: 193 additions & 0 deletions pkg/scalers/azure_pipelines_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package scalers

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"

kedautil "github.com/kedacore/keda/v2/pkg/util"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
defaultTargetPipelinesQueueLength = 1
)

type azurePipelinesScaler struct {
metadata *azurePipelinesMetadata
httpClient *http.Client
}

type azurePipelinesMetadata struct {
organizationURL string
organizationName string
personalAccessToken string
poolID string
targetPipelinesQueueLength int
}

var azurePipelinesLog = logf.Log.WithName("azure_pipelines_scaler")

// NewAzurePipelinesScaler creates a new AzurePipelinesScaler
func NewAzurePipelinesScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseAzurePipelinesMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing azure Pipelines metadata: %s", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout)

return &azurePipelinesScaler{
metadata: meta,
httpClient: httpClient,
}, nil
}

func parseAzurePipelinesMetadata(config *ScalerConfig) (*azurePipelinesMetadata, error) {
meta := azurePipelinesMetadata{}
meta.targetPipelinesQueueLength = defaultTargetPipelinesQueueLength

if val, ok := config.TriggerMetadata["targetPipelinesQueueLength"]; ok {
queueLength, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing azure pipelines metadata targetPipelinesQueueLength: %s", err.Error())
}

meta.targetPipelinesQueueLength = queueLength
}

if val, ok := config.AuthParams["organizationURL"]; ok && val != "" {
// Found the organizationURL in a parameter from TriggerAuthentication
meta.organizationURL = val
} else if val, ok := config.TriggerMetadata["organizationURLFromEnv"]; ok && val != "" {
meta.organizationURL = config.ResolvedEnv[val]
} else {
return nil, fmt.Errorf("no organizationURL given")
}

if val := meta.organizationURL[strings.LastIndex(meta.organizationURL, "/")+1:]; val != "" {
meta.organizationName = meta.organizationURL[strings.LastIndex(meta.organizationURL, "/")+1:]
} else {
return nil, fmt.Errorf("failed to extract organization name from organizationURL")
}

if val, ok := config.AuthParams["personalAccessToken"]; ok && val != "" {
// Found the personalAccessToken in a parameter from TriggerAuthentication
meta.personalAccessToken = config.AuthParams["personalAccessToken"]
} else if val, ok := config.TriggerMetadata["personalAccessTokenFromEnv"]; ok && val != "" {
meta.personalAccessToken = config.ResolvedEnv[config.TriggerMetadata["personalAccessTokenFromEnv"]]
} else {
return nil, fmt.Errorf("no personalAccessToken given")
}

if val, ok := config.TriggerMetadata["poolID"]; ok && val != "" {
meta.poolID = val
} else {
return nil, fmt.Errorf("no poolID given")
}

return &meta, nil
}

func (s *azurePipelinesScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
queuelen, err := s.GetAzurePipelinesQueueLength(ctx)

if err != nil {
azurePipelinesLog.Error(err, "error getting pipelines queue length")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(queuelen), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context) (int, error) {
url := fmt.Sprintf("%s/_apis/distributedtask/pools/%s/jobrequests", s.metadata.organizationURL, s.metadata.poolID)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return -1, err
}

req.SetBasicAuth("PAT", s.metadata.personalAccessToken)

r, err := s.httpClient.Do(req)
if err != nil {
return -1, err
}

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return -1, err
}
r.Body.Close()

if !(r.StatusCode >= 200 && r.StatusCode <= 299) {
return -1, fmt.Errorf("azure Devops REST api returned error. status: %d response: %s", r.StatusCode, string(b))
}

var result map[string]interface{}
err = json.Unmarshal(b, &result)
if err != nil {
return -1, err
}

var count int = 0
jobs, ok := result["value"].([]interface{})

if !ok {
return -1, fmt.Errorf("api result returned no value data")
}

for _, value := range jobs {
v := value.(map[string]interface{})
if v["result"] == nil {
count++
}
}

return count, err
}

func (s *azurePipelinesScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetPipelinesQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetPipelinesQueueLength), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "azure-pipelines-queue", s.metadata.organizationName, s.metadata.poolID)),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetPipelinesQueueLengthQty,
},
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

func (s *azurePipelinesScaler) IsActive(ctx context.Context) (bool, error) {
queuelen, err := s.GetAzurePipelinesQueueLength(ctx)

if err != nil {
azurePipelinesLog.Error(err, "error)")
return false, err
}

return queuelen > 0, nil
}

func (s *azurePipelinesScaler) Close() error {
return nil
}
73 changes: 73 additions & 0 deletions pkg/scalers/azure_pipelines_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package scalers

import (
"net/http"
"testing"
)

type parseAzurePipelinesMetadataTestData struct {
metadata map[string]string
isError bool
resolvedEnv map[string]string
authParams map[string]string
}

type azurePipelinesMetricIdentifier struct {
metadataTestData *parseAzurePipelinesMetadataTestData
name string
}

var testAzurePipelinesResolvedEnv = map[string]string{
"AZP_URL": "https://dev.azure.com/sample",
"AZP_TOKEN": "sample",
}

var testAzurePipelinesMetadata = []parseAzurePipelinesMetadataTestData{
// empty
{map[string]string{}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
// all properly formed
{map[string]string{"organizationURLFromEnv": "AZP_URL", "personalAccessTokenFromEnv": "AZP_TOKEN", "poolID": "1", "targetPipelinesQueueLength": "1"}, false, testAzurePipelinesResolvedEnv, map[string]string{}},
// using triggerAuthentication
{map[string]string{"poolID": "1", "targetPipelinesQueueLength": "1"}, false, testAzurePipelinesResolvedEnv, map[string]string{"organizationURL": "https://dev.azure.com/sample", "personalAccessToken": "sample"}},
// missing organizationURL
{map[string]string{"organizationURLFromEnv": "", "personalAccessTokenFromEnv": "sample", "poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
// missing personalAccessToken
{map[string]string{"organizationURLFromEnv": "AZP_URL", "poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
// missing poolID
{map[string]string{"organizationURLFromEnv": "AZP_URL", "personalAccessTokenFromEnv": "AZP_TOKEN", "poolID": "", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
}

var azurePipelinesMetricIdentifiers = []azurePipelinesMetricIdentifier{
{&testAzurePipelinesMetadata[1], "azure-pipelines-queue-sample-1"},
}

func TestParseAzurePipelinesMetadata(t *testing.T) {
for _, testData := range testAzurePipelinesMetadata {
_, err := parseAzurePipelinesMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestAzurePipelinesGetMetricSpecForScaling(t *testing.T) {
for _, testData := range azurePipelinesMetricIdentifiers {
meta, err := parseAzurePipelinesMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockAzurePipelinesScaler := azurePipelinesScaler{
metadata: meta,
httpClient: http.DefaultClient,
}

metricSpec := mockAzurePipelinesScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal
return scalers.NewAzureLogAnalyticsScaler(config)
case "azure-monitor":
return scalers.NewAzureMonitorScaler(config)
case "azure-pipelines":
return scalers.NewAzurePipelinesScaler(config)
case "azure-queue":
return scalers.NewAzureQueueScaler(config)
case "azure-servicebus":
Expand Down
5 changes: 5 additions & 0 deletions tests/.env
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ OPENSTACK_PASSWORD=
OPENSTACK_PROJECT_ID=
# OPENSTACK_SWIFT_URL=
# OPENSTACK_REGION_NAME=
AZURE_DEVOPS_ORGANIZATION_URL=
AZURE_DEVOPS_PAT=
AZURE_DEVOPS_PROJECT=
AZURE_DEVOPS_BUILD_DEFINITON_ID=
AZURE_DEVOPS_POOL_NAME=
Loading

0 comments on commit 38b5a7f

Please sign in to comment.