Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Add MetricsAPIScaler #1026

Merged
merged 7 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions pkg/scalers/metrics_api_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package scalers

import (
"context"
"encoding/json"
"fmt"
kedautil "github.com/kedacore/keda/pkg/util"
"io/ioutil"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
"net/http"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"strconv"
"strings"
)

type metricsAPIScaler struct {
metadata *metricsAPIScalerMetadata
}

type metricsAPIScalerMetadata struct {
targetValue int
url string
metricName string
}

type metric struct {
Name string `json:"name"`
Value float64 `json:"value"`
}

var httpLog = logf.Log.WithName("metrics_api_scaler")

// NewMetricsAPIScaler creates a new HTTP scaler
func NewMetricsAPIScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
meta, err := metricsAPIMetadata(resolvedEnv, metadata, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing metric API metadata: %s", err)
}
scaler := &metricsAPIScaler{metadata: meta}
err = scaler.checkHealth()
turbaszek marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("error checking metric API health/ endpoint: %s", err)
}

return scaler, nil
}

func metricsAPIMetadata(resolvedEnv, metadata, authParams map[string]string) (*metricsAPIScalerMetadata, error) {
meta := metricsAPIScalerMetadata{}

if val, ok := metadata["targetValue"]; ok {
targetValue, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error %s", err.Error())
}
meta.targetValue = targetValue
} else {
return nil, fmt.Errorf("no targetValue given in metadata")
}

if val, ok := metadata["url"]; ok {
// remove ending / for better string formatting
meta.url = strings.TrimSuffix(val, "/")
} else {
return nil, fmt.Errorf("no url given in metadata")
}

if val, ok := metadata["metricName"]; ok {
meta.metricName = val
} else {
return nil, fmt.Errorf("no metricName given in metadata")
}

return &meta, nil
}

func (s *metricsAPIScaler) checkHealth() error {
u := fmt.Sprintf("%s/health/", s.metadata.url)
_, err := http.Get(u)
return err
}

func (s *metricsAPIScaler) getMetricInfo() (*metric, error) {
var m *metric
u := fmt.Sprintf("%s/metrics/%s/", s.metadata.url, s.metadata.metricName)
r, err := http.Get(u)
if err != nil {
return nil, err
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, &m)
if err != nil {
return nil, err
}
return m, nil
}

// Close does nothing in case of metricsAPIScaler
func (s *metricsAPIScaler) Close() error {
return nil
}

// IsActive returns true if there are pending messages to be processed
func (s *metricsAPIScaler) IsActive(ctx context.Context) (bool, error) {
m, err := s.getMetricInfo()
if err != nil {
httpLog.Error(err, fmt.Sprintf("Error when checking metric value: %s", err))
return false, err
}

return m.Value > 0.0, nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *metricsAPIScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)
metricName := fmt.Sprintf("%s-%s-%s", "http", kedautil.NormalizeString(s.metadata.url), s.metadata.metricName)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *metricsAPIScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
m, err := s.getMetricInfo()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error requesting metrics endpoint: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(m.Value), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
39 changes: 39 additions & 0 deletions pkg/scalers/metrics_api_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package scalers

import (
"testing"
)

var metricsAPIResolvedEnv = map[string]string{}

type metricsAPIMetadataTestData struct {
metadata map[string]string
raisesError bool
}

var testMetricsAPIMetadata = []metricsAPIMetadataTestData{
// No metadata
{metadata: map[string]string{}, raisesError: true},
// OK
{metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "metricName": "metric", "targetValue": "42"}, raisesError: false},
// Target not an int
{metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "metricName": "metric", "targetValue": "aa"}, raisesError: true},
// Missing metric name
{metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "targetValue": "aa"}, raisesError: true},
// Missing url
{metadata: map[string]string{"metricName": "metric", "targetValue": "aa"}, raisesError: true},
// Missing targetValue
{metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "metricName": "metric"}, raisesError: true},
}

func TestParseMetricsAPIMetadata(t *testing.T) {
for _, testData := range testMetricsAPIMetadata {
_, err := metricsAPIMetadata(metricsAPIResolvedEnv, testData.metadata, map[string]string{})
if err != nil && !testData.raisesError {
t.Error("Expected success but got error", err)
}
if err == nil && testData.raisesError {
t.Error("Expected error but got success")
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ func buildScaler(name, namespace, triggerType string, resolvedEnv, triggerMetada
return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams)
case "liiklus":
return scalers.NewLiiklusScaler(resolvedEnv, triggerMetadata)
case "metrics-api":
ahmelsayed marked this conversation as resolved.
Show resolved Hide resolved
return scalers.NewMetricsAPIScaler(resolvedEnv, triggerMetadata, authParams)
case "mysql":
return scalers.NewMySQLScaler(resolvedEnv, triggerMetadata, authParams)
case "postgresql":
Expand Down