Skip to content

Commit

Permalink
enabling authentication for metric api scaler (#1137)
Browse files Browse the repository at this point in the history
Signed-off-by: aman-bansal <[email protected]>
  • Loading branch information
aman-bansal authored Oct 9, 2020
1 parent 9433bc1 commit b2abfaf
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 39 deletions.
35 changes: 1 addition & 34 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package scalers

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -220,7 +218,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin

if metadata.enableTLS {
config.Net.TLS.Enable = true
tlsConfig, err := newTLSConfig(metadata.cert, metadata.key, metadata.ca)
tlsConfig, err := kedautil.NewTLSConfig(metadata.cert, metadata.key, metadata.ca)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -257,37 +255,6 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
return client, admin, nil
}

// newTLSConfig returns a *tls.Config using the given ceClient cert, ceClient key,
// and CA certificate. If none are appropriate, a nil *tls.Config is returned.
func newTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
valid := false

config := &tls.Config{}

if clientCert != "" && clientKey != "" {
cert, err := tls.X509KeyPair([]byte(clientCert), []byte(clientKey))
if err != nil {
return nil, fmt.Errorf("error parse X509KeyPair: %s", err)
}
config.Certificates = []tls.Certificate{cert}
valid = true
}

if caCert != "" {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(caCert))
config.RootCAs = caCertPool
config.InsecureSkipVerify = true
valid = true
}

if !valid {
config = nil
}

return config, nil
}

func (s *kafkaScaler) getPartitions() ([]int32, error) {
topicsMetadata, err := s.admin.DescribeTopics([]string{s.metadata.topic})
if err != nil {
Expand Down
169 changes: 165 additions & 4 deletions pkg/scalers/metrics_api_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

neturl "net/url"

"github.com/tidwall/gjson"
"k8s.io/api/autoscaling/v2beta2"
Expand All @@ -21,26 +25,73 @@ import (

type metricsAPIScaler struct {
metadata *metricsAPIScalerMetadata
client *http.Client
}

type metricsAPIScalerMetadata struct {
targetValue int
url string
valueLocation string

//apiKeyAuth
enableAPIKeyAuth bool
method string // way of providing auth key, either "header" (default) or "query"
// keyParamName is either header key or query param used for passing apikey
// default header is "X-API-KEY", defaul query param is "api_key"
keyParamName string
apiKey string

//base auth
enableBaseAuth bool
username string
password string // +optional

//client certification
enableTLS bool
cert string
key string
ca string
}

const defaultTimeOut = 3 * time.Second

type authenticationType string

const (
apiKeyAuth authenticationType = "apiKeyAuth"
basicAuth authenticationType = "basicAuth"
tlsAuth authenticationType = "tlsAuth"
)

var httpLog = logf.Log.WithName("metrics_api_scaler")

// NewMetricsAPIScaler creates a new HTTP scaler
func NewMetricsAPIScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
meta, err := metricsAPIMetadata(metadata)
meta, err := metricsAPIMetadata(metadata, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing metric API metadata: %s", err)
}
return &metricsAPIScaler{metadata: meta}, nil

client := &http.Client{
Timeout: defaultTimeOut,
}

if meta.enableTLS {
config, err := kedautil.NewTLSConfig(meta.cert, meta.key, meta.ca)
if err != nil {
return nil, err
}

client.Transport = &http.Transport{TLSClientConfig: config}
}

return &metricsAPIScaler{
metadata: meta,
client: client,
}, nil
}

func metricsAPIMetadata(metadata map[string]string) (*metricsAPIScalerMetadata, error) {
func metricsAPIMetadata(metadata map[string]string, authParams map[string]string) (*metricsAPIScalerMetadata, error) {
meta := metricsAPIScalerMetadata{}

if val, ok := metadata["targetValue"]; ok {
Expand All @@ -65,6 +116,62 @@ func metricsAPIMetadata(metadata map[string]string) (*metricsAPIScalerMetadata,
return nil, fmt.Errorf("no valueLocation given in metadata")
}

authMode, ok := authParams["authMode"]
// no authMode specified
if !ok {
return &meta, nil
}

authType := authenticationType(strings.TrimSpace(authMode))
switch authType {
case apiKeyAuth:
if len(authParams["apiKey"]) == 0 {
return nil, errors.New("no apikey provided")
}

meta.apiKey = authParams["apiKey"]
// default behaviour is header. only change if query param requested
meta.method = "header"
meta.enableAPIKeyAuth = true

if authParams["method"] == "query" {
meta.method = "query"
}

if len(authParams["keyParamName"]) > 0 {
meta.keyParamName = authParams["keyParamName"]
}
case basicAuth:
if len(authParams["username"]) == 0 {
return nil, errors.New("no username given")
}

meta.username = authParams["username"]
// password is optional. For convenience, many application implements basic auth with
// username as apikey and password as empty
meta.password = authParams["password"]
meta.enableBaseAuth = true
case tlsAuth:
if len(authParams["ca"]) == 0 {
return nil, errors.New("no ca given")
}
meta.ca = authParams["ca"]

if len(authParams["cert"]) == 0 {
return nil, errors.New("no cert given")
}
meta.cert = authParams["cert"]

if len(authParams["key"]) == 0 {
return nil, errors.New("no key given")
}

meta.key = authParams["key"]
meta.enableTLS = true
default:
return nil, fmt.Errorf("err incorrect value for authMode is given: %s", authMode)
}

return &meta, nil
}

Expand All @@ -79,7 +186,12 @@ func GetValueFromResponse(body []byte, valueLocation string) (int64, error) {
}

func (s *metricsAPIScaler) getMetricValue() (int64, error) {
r, err := http.Get(s.metadata.url)
request, err := getMetricAPIServerRequest(s.metadata)
if err != nil {
return 0, err
}

r, err := s.client.Do(request)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -151,3 +263,52 @@ func (s *metricsAPIScaler) GetMetrics(ctx context.Context, metricName string, me

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func getMetricAPIServerRequest(meta *metricsAPIScalerMetadata) (*http.Request, error) {
var req *http.Request
var err error

if meta.enableAPIKeyAuth {
if meta.method == "query" {
url, _ := neturl.Parse(meta.url)
queryString := url.Query()
if len(meta.keyParamName) == 0 {
queryString.Set("api_key", meta.apiKey)
} else {
queryString.Set(meta.keyParamName, meta.apiKey)
}

url.RawQuery = queryString.Encode()
req, err = http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, err
}
} else {
// default behaviour is to use header method
req, err = http.NewRequest("GET", meta.url, nil)
if err != nil {
return nil, err
}

if len(meta.keyParamName) == 0 {
req.Header.Add("X-API-KEY", meta.apiKey)
} else {
req.Header.Add(meta.keyParamName, meta.apiKey)
}
}
} else if meta.enableBaseAuth {
req, err = http.NewRequest("GET", meta.url, nil)
if err != nil {
return nil, err
}

req.SetBasicAuth(meta.username, meta.password)
} else {
req, err = http.NewRequest("GET", meta.url, nil)
if err != nil {
return nil, err
}
}

return req, nil
}
55 changes: 54 additions & 1 deletion pkg/scalers/metrics_api_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type metricsAPIMetadataTestData struct {
raisesError bool
}

var validMetricAPIMetadata = map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "42"}

var testMetricsAPIMetadata = []metricsAPIMetadataTestData{
// No metadata
{metadata: map[string]string{}, raisesError: true},
Expand All @@ -24,9 +26,39 @@ var testMetricsAPIMetadata = []metricsAPIMetadataTestData{
{metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric"}, raisesError: true},
}

type metricAPIAuthMetadataTestData struct {
authParams map[string]string
isError bool
}

var testMetricsAPIAuthMetadata = []metricAPIAuthMetadataTestData{
// success TLS
{map[string]string{"authMode": "tlsAuth", "ca": "caaa", "cert": "ceert", "key": "keey"}, false},
// fail TLS, ca not given
{map[string]string{"authMode": "tlsAuth", "cert": "ceert", "key": "keey"}, true},
// fail TLS, key not given
{map[string]string{"authMode": "tlsAuth", "ca": "caaa", "cert": "ceert"}, true},
// fail TLS, cert not given
{map[string]string{"authMode": "tlsAuth", "ca": "caaa", "key": "keey"}, true},
// success apiKeyAuth default
{map[string]string{"authMode": "apiKeyAuth", "apiKey": "apiikey"}, false},
// success apiKeyAuth as query param
{map[string]string{"authMode": "apiKeyAuth", "apiKey": "apiikey", "method": "query"}, false},
// success apiKeyAuth with headers and custom key name
{map[string]string{"authMode": "apiKeyAuth", "apiKey": "apiikey", "method": "header", "keyParamName": "custom"}, false},
// success apiKeyAuth with query param and custom key name
{map[string]string{"authMode": "apiKeyAuth", "apiKey": "apiikey", "method": "query", "keyParamName": "custom"}, false},
// fail apiKeyAuth with no api key
{map[string]string{"authMode": "apiKeyAuth"}, true},
// success basicAuth
{map[string]string{"authMode": "basicAuth", "username": "user", "password": "pass"}, false},
// fail basicAuth with no username
{map[string]string{"authMode": "basicAuth"}, true},
}

func TestParseMetricsAPIMetadata(t *testing.T) {
for _, testData := range testMetricsAPIMetadata {
_, err := metricsAPIMetadata(testData.metadata)
_, err := metricsAPIMetadata(testData.metadata, map[string]string{})
if err != nil && !testData.raisesError {
t.Error("Expected success but got error", err)
}
Expand Down Expand Up @@ -54,3 +86,24 @@ func TestGetValueFromResponse(t *testing.T) {
t.Errorf("Expected %d got %d", 2, v)
}
}

func TestMetricAPIScalerAuthParams(t *testing.T) {
for _, testData := range testMetricsAPIAuthMetadata {
meta, err := metricsAPIMetadata(validMetricAPIMetadata, testData.authParams)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}

if err == nil {
if (meta.enableAPIKeyAuth && !(testData.authParams["authMode"] == "apiKeyAuth")) ||
(meta.enableBaseAuth && !(testData.authParams["authMode"] == "basicAuth")) ||
(meta.enableTLS && !(testData.authParams["authMode"] == "tlsAuth")) {
t.Error("wrong auth mode detected")
}
}
}
}
38 changes: 38 additions & 0 deletions pkg/util/tls_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package util

import (
"crypto/tls"
"crypto/x509"
"fmt"
)

// NewTLSConfig returns a *tls.Config using the given ceClient cert, ceClient key,
// and CA certificate. If none are appropriate, a nil *tls.Config is returned.
func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
valid := false

config := &tls.Config{}

if clientCert != "" && clientKey != "" {
cert, err := tls.X509KeyPair([]byte(clientCert), []byte(clientKey))
if err != nil {
return nil, fmt.Errorf("error parse X509KeyPair: %s", err)
}
config.Certificates = []tls.Certificate{cert}
valid = true
}

if caCert != "" {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(caCert))
config.RootCAs = caCertPool
config.InsecureSkipVerify = true
valid = true
}

if !valid {
config = nil
}

return config, nil
}

0 comments on commit b2abfaf

Please sign in to comment.