Skip to content

Commit

Permalink
[v2] Add MetricsAPIScaler (#1026)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomek Urbaszek <[email protected]>
  • Loading branch information
turbaszek authored Sep 3, 2020
1 parent 8f79ed5 commit ea116fc
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/robfig/cron/v3 v3.0.1
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.6.1
github.com/tidwall/gjson v1.6.1
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
google.golang.org/api v0.29.0
google.golang.org/genproto v0.0.0-20200731012542-8145dea6a485
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,13 @@ github.com/tektoncd/plumbing v0.0.0-20200217163359-cd0db6e567d2/go.mod h1:QZHgU0
github.com/tektoncd/plumbing/pipelinerun-logs v0.0.0-20191206114338-712d544c2c21/go.mod h1:S62EUWtqmejjJgUMOGB1CCCHRp6C706laH06BoALkzU=
github.com/tetafro/godot v0.3.7/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0=
github.com/tetafro/godot v0.4.2/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0=
github.com/tidwall/gjson v1.6.1 h1:LRbvNuNuvAiISWg6gxLEFuCe72UKy5hDqhxW/8183ws=
github.com/tidwall/gjson v1.6.1/go.mod h1:BaHyNc5bjzYkPqgLq7mdVzeiRtULKULXLgZFKsxEHI0=
github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU=
github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/timakin/bodyclose v0.0.0-20190930140734-f7f2e9bca95e/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=
github.com/timakin/bodyclose v0.0.0-20200424151742-cb6215831a94/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk=
github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0=
Expand Down
145 changes: 145 additions & 0 deletions pkg/scalers/metrics_api_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package scalers

import (
"context"
"errors"
"fmt"
kedautil "github.com/kedacore/keda/pkg/util"
"github.com/tidwall/gjson"
"io/ioutil"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
"net/http"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"strconv"
)

type metricsAPIScaler struct {
metadata *metricsAPIScalerMetadata
}

type metricsAPIScalerMetadata struct {
targetValue int
url string
valueLocation string
}

var httpLog = logf.Log.WithName("metrics_api_scaler")

// NewMetricsAPIScaler creates a new HTTP scaler
func NewMetricsAPIScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
meta, err := metricsAPIMetadata(resolvedEnv, metadata, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing metric API metadata: %s", err)
}
return &metricsAPIScaler{metadata: meta}, nil
}

func metricsAPIMetadata(resolvedEnv, metadata, authParams map[string]string) (*metricsAPIScalerMetadata, error) {
meta := metricsAPIScalerMetadata{}

if val, ok := metadata["targetValue"]; ok {
targetValue, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error %s", err.Error())
}
meta.targetValue = targetValue
} else {
return nil, fmt.Errorf("no targetValue given in metadata")
}

if val, ok := metadata["url"]; ok {
meta.url = val
} else {
return nil, fmt.Errorf("no url given in metadata")
}

if val, ok := metadata["valueLocation"]; ok {
meta.valueLocation = val
} else {
return nil, fmt.Errorf("no valueLocation given in metadata")
}

return &meta, nil
}

// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body
func GetValueFromResponse(body []byte, valueLocation string) (int64, error) {
r := gjson.GetBytes(body, valueLocation)
if r.Type != gjson.Number {
msg := fmt.Sprintf("valueLocation must point to value of type number got: %s", r.Type.String())
return 0, errors.New(msg)
}
return int64(r.Num), nil
}

func (s *metricsAPIScaler) getMetricValue() (int64, error) {
r, err := http.Get(s.metadata.url)
if err != nil {
return 0, err
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return 0, err
}
v, err := GetValueFromResponse(b, s.metadata.valueLocation)
if err != nil {
return 0, err
}
return v, nil
}

// Close does nothing in case of metricsAPIScaler
func (s *metricsAPIScaler) Close() error {
return nil
}

// IsActive returns true if there are pending messages to be processed
func (s *metricsAPIScaler) IsActive(ctx context.Context) (bool, error) {
v, err := s.getMetricValue()
if err != nil {
httpLog.Error(err, fmt.Sprintf("Error when checking metric value: %s", err))
return false, err
}

return v > 0.0, nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *metricsAPIScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)
metricName := fmt.Sprintf("%s-%s-%s", "http", kedautil.NormalizeString(s.metadata.url), s.metadata.valueLocation)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *metricsAPIScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
v, err := s.getMetricValue()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error requesting metrics endpoint: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(v, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
60 changes: 60 additions & 0 deletions pkg/scalers/metrics_api_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package scalers

import (
"testing"
)

var metricsAPIResolvedEnv = map[string]string{}
var authParams = map[string]string{}

type metricsAPIMetadataTestData struct {
metadata map[string]string
raisesError bool
}

var testMetricsAPIMetadata = []metricsAPIMetadataTestData{
// No metadata
{metadata: map[string]string{}, raisesError: true},
// OK
{metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "42"}, raisesError: false},
// Target not an int
{metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "aa"}, raisesError: true},
// Missing metric name
{metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "targetValue": "aa"}, raisesError: true},
// Missing url
{metadata: map[string]string{"valueLocation": "metric", "targetValue": "aa"}, raisesError: true},
// Missing targetValue
{metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric"}, raisesError: true},
}

func TestParseMetricsAPIMetadata(t *testing.T) {
for _, testData := range testMetricsAPIMetadata {
_, err := metricsAPIMetadata(metricsAPIResolvedEnv, testData.metadata, authParams)
if err != nil && !testData.raisesError {
t.Error("Expected success but got error", err)
}
if err == nil && testData.raisesError {
t.Error("Expected error but got success")
}
}
}

func TestGetValueFromResponse(t *testing.T) {
d := []byte(`{"components":[{"id": "82328e93e", "tasks": 32}],"count":2.43}`)
v, err := GetValueFromResponse(d, "components.0.tasks")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 32 {
t.Errorf("Expected %d got %d", 32, v)
}

v, err = GetValueFromResponse(d, "count")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 2 {
t.Errorf("Expected %d got %d", 2, v)
}

}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ func buildScaler(name, namespace, triggerType string, resolvedEnv, triggerMetada
return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams)
case "liiklus":
return scalers.NewLiiklusScaler(resolvedEnv, triggerMetadata)
case "metrics-api":
return scalers.NewMetricsAPIScaler(resolvedEnv, triggerMetadata, authParams)
case "mysql":
return scalers.NewMySQLScaler(resolvedEnv, triggerMetadata, authParams)
case "postgresql":
Expand Down

0 comments on commit ea116fc

Please sign in to comment.