Skip to content

Commit

Permalink
Improved scaling speed of AzDO pipelines (#3729)
Browse files Browse the repository at this point in the history
* Improve AzDO profilng speed of queues (#3702)

Signed-off-by: mortx <[email protected]>
  • Loading branch information
Eldarrin authored Oct 10, 2022
1 parent 322ff42 commit 0e78994
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General:** Add `Min` column to ScaledJob visualization ([#3689](https://github.com/kedacore/keda/issues/3689))
- **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681))
- **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610))
- **Azure Pipelines Scaler:** Improved speed of profiling large set of Job Requests from Azure Pipelines ([#3702](https://github.com/kedacore/keda/issues/3702))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))

### Fixes
Expand Down
157 changes: 123 additions & 34 deletions pkg/scalers/azure_pipelines_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -21,6 +22,92 @@ const (
defaultTargetPipelinesQueueLength = 1
)

type JobRequests struct {
Count int `json:"count"`
Value []JobRequest `json:"value"`
}

type JobRequest struct {
RequestID int `json:"requestId"`
QueueTime time.Time `json:"queueTime"`
AssignTime time.Time `json:"assignTime,omitempty"`
ReceiveTime time.Time `json:"receiveTime,omitempty"`
LockedUntil time.Time `json:"lockedUntil,omitempty"`
ServiceOwner string `json:"serviceOwner"`
HostID string `json:"hostId"`
Result *string `json:"result"`
ScopeID string `json:"scopeId"`
PlanType string `json:"planType"`
PlanID string `json:"planId"`
JobID string `json:"jobId"`
Demands []string `json:"demands"`
ReservedAgent *struct {
Links struct {
Self struct {
Href string `json:"href"`
} `json:"self"`
Web struct {
Href string `json:"href"`
} `json:"web"`
} `json:"_links"`
ID int `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
OsDescription string `json:"osDescription"`
Enabled bool `json:"enabled"`
Status string `json:"status"`
ProvisioningState string `json:"provisioningState"`
AccessPoint string `json:"accessPoint"`
} `json:"reservedAgent,omitempty"`
Definition struct {
Links struct {
Web struct {
Href string `json:"href"`
} `json:"web"`
Self struct {
Href string `json:"href"`
} `json:"self"`
} `json:"_links"`
ID int `json:"id"`
Name string `json:"name"`
} `json:"definition"`
Owner struct {
Links struct {
Web struct {
Href string `json:"href"`
} `json:"web"`
Self struct {
Href string `json:"href"`
} `json:"self"`
} `json:"_links"`
ID int `json:"id"`
Name string `json:"name"`
} `json:"owner"`
Data struct {
ParallelismTag string `json:"ParallelismTag"`
IsScheduledKey string `json:"IsScheduledKey"`
} `json:"data"`
PoolID int `json:"poolId"`
OrchestrationID string `json:"orchestrationId"`
Priority int `json:"priority"`
MatchedAgents *[]struct {
Links struct {
Self struct {
Href string `json:"href"`
} `json:"self"`
Web struct {
Href string `json:"href"`
} `json:"web"`
} `json:"_links"`
ID int `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Enabled bool `json:"enabled"`
Status string `json:"status"`
ProvisioningState string `json:"provisioningState"`
} `json:"matchedAgents,omitempty"`
}

type azurePipelinesPoolNameResponse struct {
Value []struct {
ID int `json:"id"`
Expand Down Expand Up @@ -243,47 +330,50 @@ func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context)
return -1, err
}

var result map[string]interface{}
err = json.Unmarshal(body, &result)
var jrs JobRequests
err = json.Unmarshal(body, &jrs)
if err != nil {
s.logger.Error(err, "Cannot unmarshal ADO JobRequests API response")
return -1, err
}

var count int64
jobs, ok := result["value"].([]interface{})

if !ok {
return -1, fmt.Errorf("the Azure DevOps REST API result returned no value data despite successful code. url: %s", url)
}

// for each job check if it parent fulfilled, then demand fulfilled, then finally pool fulfilled
for _, value := range jobs {
v := value.(map[string]interface{})
if v["result"] == nil {
if s.metadata.parent == "" && s.metadata.demands == "" {
// no plan defined, just add a count
count++
var count int64
for _, job := range stripDeadJobs(jrs.Value) {
if s.metadata.parent == "" && s.metadata.demands == "" {
// no plan defined, just add a count
count++
} else {
if s.metadata.parent == "" {
// doesn't use parent, switch to demand
if getCanAgentDemandFulfilJob(job, s.metadata) {
count++
}
} else {
if s.metadata.parent == "" {
// doesn't use parent, switch to demand
if getCanAgentDemandFulfilJob(v, s.metadata) {
count++
}
} else {
// does use parent
if getCanAgentParentFulfilJob(v, s.metadata) {
count++
}
// does use parent
if getCanAgentParentFulfilJob(job, s.metadata) {
count++
}
}
}
}

return count, err
}

func stripDeadJobs(jobs []JobRequest) []JobRequest {
var filtered []JobRequest
for _, job := range jobs {
if job.Result == nil {
filtered = append(filtered, job)
}
}
return filtered
}

// Determine if the scaledjob has the right demands to spin up
func getCanAgentDemandFulfilJob(v map[string]interface{}, metadata *azurePipelinesMetadata) bool {
var demandsReq = v["demands"].([]interface{})
func getCanAgentDemandFulfilJob(jr JobRequest, metadata *azurePipelinesMetadata) bool {
var demandsReq = jr.Demands
var demandsAvail = strings.Split(metadata.demands, ",")
var countDemands = 0
for _, dr := range demandsReq {
Expand All @@ -301,16 +391,15 @@ func getCanAgentDemandFulfilJob(v map[string]interface{}, metadata *azurePipelin
}

// Determine if the Job and Parent Agent Template have matching capabilities
func getCanAgentParentFulfilJob(v map[string]interface{}, metadata *azurePipelinesMetadata) bool {
matchedAgents, ok := v["matchedAgents"].([]interface{})
if !ok {
// ADO is already processing
func getCanAgentParentFulfilJob(jr JobRequest, metadata *azurePipelinesMetadata) bool {
matchedAgents := jr.MatchedAgents

if matchedAgents == nil {
return false
}

for _, m := range matchedAgents {
n := m.(map[string]interface{})
if metadata.parent == n["name"].(string) {
for _, m := range *matchedAgents {
if metadata.parent == m.Name {
return true
}
}
Expand Down
32 changes: 21 additions & 11 deletions pkg/scalers/azure_pipelines_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
)

const loadCount = 1000 // the size of the pretend pool completed of job requests

type parseAzurePipelinesMetadataTestData struct {
testName string
metadata map[string]string
Expand Down Expand Up @@ -40,6 +42,9 @@ var testAzurePipelinesMetadata = []parseAzurePipelinesMetadataTestData{
{"all properly formed", map[string]string{"organizationURLFromEnv": "AZP_URL", "personalAccessTokenFromEnv": "AZP_TOKEN", "poolID": "1", "targetPipelinesQueueLength": "1", "activationTargetPipelinesQueueLength": "A"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
}

var testJobRequestResponse = `{"count":2,"value":[{"requestId":890659,"queueTime":"2022-09-28T11:19:49.89Z","assignTime":"2022-09-28T11:20:29.5033333Z","receiveTime":"2022-09-28T11:20:32.0530499Z","lockedUntil":"2022-09-28T11:30:32.07Z","serviceOwner":"xxx","hostId":"xxx","scopeId":"xxx","planType":"Build","planId":"xxx","jobId":"xxx","demands":["kubectl","Agent.Version -gtVersion 2.182.1"],"reservedAgent":{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11735"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11735"}},"id":11735,"name":"kube-scaledjob-5nlph-kzpgf","version":"2.210.1","osDescription":"Linux 5.4.0-1089-azure #94~18.04.1-Ubuntu SMP Fri Aug 5 12:34:50 UTC 2022","enabled":true,"status":"online","provisioningState":"Provisioned","accessPoint":"CodexAccessMapping"},"definition":{"_links":{"web":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_build/definition?definitionId=4869"},"self":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_apis/build/Definitions/4869"}},"id":4869,"name":"base - main"},"owner":{"_links":{"web":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_build/results?buildId=673584"},"self":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_apis/build/Builds/673584"}},"id":673584,"name":"20220928.2"},"data":{"ParallelismTag":"Private","IsScheduledKey":"False"},"poolId":44,"orchestrationId":"5c5c8ec9-786f-4e97-99d4-a29279befba3.build.__default","priority":0},{"requestId":890663,"queueTime":"2022-09-28T11:20:22.4633333Z","serviceOwner":"00025394-6065-48ca-87d9-7f5672854ef7","hostId":"41a18c7d-df5e-4032-a4df-d533b56bd2de","scopeId":"02696e26-a35b-424c-86b8-1f54e1b0b4b7","planType":"Build","planId":"b718cfed-493c-46be-a650-88fe762f75aa","jobId":"15b95994-59ec-5502-695d-0b93722883bd","demands":["dotnet60","java","Agent.Version -gtVersion 2.182.1"],"matchedAgents":[{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/1755"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=1755"}},"id":1755,"name":"dotnet60-keda-template","version":"2.210.1","enabled":true,"status":"offline","provisioningState":"Provisioned"},{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11732"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11732"}},"id":11732,"name":"dotnet60-scaledjob-5dsgc-pkqvm","version":"2.210.1","enabled":true,"status":"online","provisioningState":"Provisioned"},{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11733"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11733"}},"id":11733,"name":"dotnet60-scaledjob-zgqnp-8h4z4","version":"2.210.1","enabled":true,"status":"online","provisioningState":"Provisioned"},{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11734"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11734"}},"id":11734,"name":"dotnet60-scaledjob-wr65c-ff2cv","version":"2.210.1","enabled":true,"status":"online","provisioningState":"Provisioned"}],"definition":{"_links":{"web":{"href":"https://FOO.visualstudio.com/02696e26-a35b-424c-86b8-1f54e1b0b4b7/_build/definition?definitionId=3129"},"self":{"href":"https://FOO.visualstudio.com/02696e26-a35b-424c-86b8-1f54e1b0b4b7/_apis/build/Definitions/3129"}},"id":3129,"name":"Other Build CI"},"owner":{"_links":{"web":{"href":"https://FOO.visualstudio.com/02696e26-a35b-424c-86b8-1f54e1b0b4b7/_build/results?buildId=673585"},"self":{"href":"https://FOO.visualstudio.com/02696e26-a35b-424c-86b8-1f54e1b0b4b7/_apis/build/Builds/673585"}},"id":673585,"name":"20220928.11"},"data":{"ParallelismTag":"Private","IsScheduledKey":"False"},"poolId":44,"orchestrationId":"b718cfed-493c-46be-a650-88fe762f75aa.buildtest.build_and_test.__default","priority":0}]}`
var deadJob = `{"requestId":890659,"result":"succeeded","queueTime":"2022-09-28T11:19:49.89Z","assignTime":"2022-09-28T11:20:29.5033333Z","receiveTime":"2022-09-28T11:20:32.0530499Z","lockedUntil":"2022-09-28T11:30:32.07Z","serviceOwner":"xxx","hostId":"xxx","scopeId":"xxx","planType":"Build","planId":"xxx","jobId":"xxx","demands":["kubectl","Agent.Version -gtVersion 2.182.1"],"reservedAgent":{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11735"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11735"}},"id":11735,"name":"kube-scaledjob-5nlph-kzpgf","version":"2.210.1","osDescription":"Linux 5.4.0-1089-azure #94~18.04.1-Ubuntu SMP Fri Aug 5 12:34:50 UTC 2022","enabled":true,"status":"online","provisioningState":"Provisioned","accessPoint":"CodexAccessMapping"},"definition":{"_links":{"web":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_build/definition?definitionId=4869"},"self":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_apis/build/Definitions/4869"}},"id":4869,"name":"base - main"},"owner":{"_links":{"web":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_build/results?buildId=673584"},"self":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_apis/build/Builds/673584"}},"id":673584,"name":"20220928.2"},"data":{"ParallelismTag":"Private","IsScheduledKey":"False"},"poolId":44,"orchestrationId":"5c5c8ec9-786f-4e97-99d4-a29279befba3.build.__default","priority":0}`

func TestParseAzurePipelinesMetadata(t *testing.T) {
for _, testData := range testAzurePipelinesMetadata {
t.Run(testData.testName, func(t *testing.T) {
Expand Down Expand Up @@ -173,7 +178,7 @@ func getMatchedAgentMetaData(url string) *azurePipelinesMetadata {
meta := azurePipelinesMetadata{}
meta.organizationName = "testOrg"
meta.organizationURL = url
meta.parent = "test-keda-template"
meta.parent = "dotnet60-keda-template"
meta.personalAccessToken = "testPAT"
meta.poolID = 1
meta.targetPipelinesQueueLength = 1
Expand All @@ -182,11 +187,9 @@ func getMatchedAgentMetaData(url string) *azurePipelinesMetadata {
}

func TestAzurePipelinesMatchedAgent(t *testing.T) {
var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(response))
_, _ = w.Write(buildLoadJSON())
}))

meta := getMatchedAgentMetaData(apiStub.URL)
Expand All @@ -210,7 +213,7 @@ func TestAzurePipelinesMatchedAgent(t *testing.T) {
func getDemandJobMetaData(url string) *azurePipelinesMetadata {
meta := getMatchedAgentMetaData(url)
meta.parent = ""
meta.demands = "testDemand,kubernetes"
meta.demands = "dotnet60,java"

return meta
}
Expand All @@ -224,11 +227,9 @@ func getMismatchDemandJobMetaData(url string) *azurePipelinesMetadata {
}

func TestAzurePipelinesMatchedDemandAgent(t *testing.T) {
var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0", "testDemand", "kubernetes"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(response))
_, _ = w.Write(buildLoadJSON())
}))

meta := getDemandJobMetaData(apiStub.URL)
Expand All @@ -250,11 +251,9 @@ func TestAzurePipelinesMatchedDemandAgent(t *testing.T) {
}

func TestAzurePipelinesNonMatchedDemandAgent(t *testing.T) {
var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0", "testDemand", "kubernetes"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(response))
_, _ = w.Write(buildLoadJSON())
}))

meta := getMismatchDemandJobMetaData(apiStub.URL)
Expand All @@ -274,3 +273,14 @@ func TestAzurePipelinesNonMatchedDemandAgent(t *testing.T) {
t.Fail()
}
}

func buildLoadJSON() []byte {
output := testJobRequestResponse[0 : len(testJobRequestResponse)-2]
for i := 1; i < loadCount; i++ {
output = output + "," + deadJob
}

output += "]}"

return []byte(output)
}

0 comments on commit 0e78994

Please sign in to comment.