Skip to content

Commit

Permalink
h|i|k|l*: Reference ScaledObject's/ScaledJob's name in the scalers log (
Browse files Browse the repository at this point in the history
  • Loading branch information
zroubalik authored Aug 3, 2022
1 parent b2184ca commit 62739d6
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 74 deletions.
94 changes: 48 additions & 46 deletions pkg/scalers/huawei_cloudeye_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"github.com/Huawei/gophercloud/auth/aksk"
"github.com/Huawei/gophercloud/openstack"
"github.com/Huawei/gophercloud/openstack/ces/v1/metricdata"
"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -29,6 +29,7 @@ const (
type huaweiCloudeyeScaler struct {
metricType v2beta2.MetricTargetType
metadata *huaweiCloudeyeMetadata
logger logr.Logger
}

type huaweiCloudeyeMetadata struct {
Expand Down Expand Up @@ -70,27 +71,28 @@ type huaweiAuthorizationMetadata struct {
SecretKey string // Secret key
}

var cloudeyeLog = logf.Log.WithName("huawei_cloudeye_scaler")

// NewHuaweiCloudeyeScaler creates a new huaweiCloudeyeScaler
func NewHuaweiCloudeyeScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %s", err)
}

meta, err := parseHuaweiCloudeyeMetadata(config)
logger := InitializeLogger(config, "huawei_cloudeye_scaler")

meta, err := parseHuaweiCloudeyeMetadata(config, logger)
if err != nil {
return nil, fmt.Errorf("error parsing Cloudeye metadata: %s", err)
}

return &huaweiCloudeyeScaler{
metricType: metricType,
metadata: meta,
logger: logger,
}, nil
}

func parseHuaweiCloudeyeMetadata(config *ScalerConfig) (*huaweiCloudeyeMetadata, error) {
func parseHuaweiCloudeyeMetadata(config *ScalerConfig, logger logr.Logger) (*huaweiCloudeyeMetadata, error) {
meta := huaweiCloudeyeMetadata{}

meta.metricCollectionTime = defaultCloudeyeMetricCollectionTime
Expand Down Expand Up @@ -124,7 +126,7 @@ func parseHuaweiCloudeyeMetadata(config *ScalerConfig) (*huaweiCloudeyeMetadata,
if val, ok := config.TriggerMetadata["targetMetricValue"]; ok && val != "" {
targetMetricValue, err := strconv.ParseFloat(val, 64)
if err != nil {
cloudeyeLog.Error(err, "Error parsing targetMetricValue metadata")
logger.Error(err, "Error parsing targetMetricValue metadata")
} else {
meta.targetMetricValue = targetMetricValue
}
Expand All @@ -136,17 +138,17 @@ func parseHuaweiCloudeyeMetadata(config *ScalerConfig) (*huaweiCloudeyeMetadata,
if val, ok := config.TriggerMetadata["activationTargetMetricValue"]; ok && val != "" {
activationTargetMetricValue, err := strconv.ParseFloat(val, 64)
if err != nil {
cloudeyeLog.Error(err, "Error parsing activationTargetMetricValue metadata")
logger.Error(err, "Error parsing activationTargetMetricValue metadata")
}
meta.activationTargetMetricValue = activationTargetMetricValue
}

if val, ok := config.TriggerMetadata["minMetricValue"]; ok && val != "" {
minMetricValue, err := strconv.ParseFloat(val, 64)
if err != nil {
cloudeyeLog.Error(err, "Error parsing minMetricValue metadata")
logger.Error(err, "Error parsing minMetricValue metadata")
} else {
cloudeyeLog.Error(err, "minMetricValue is deprecated and will be removed in next versions, please use activationTargetMetricValue instead")
logger.Error(err, "minMetricValue is deprecated and will be removed in next versions, please use activationTargetMetricValue instead")
meta.activationTargetMetricValue = minMetricValue
}
} else {
Expand All @@ -156,7 +158,7 @@ func parseHuaweiCloudeyeMetadata(config *ScalerConfig) (*huaweiCloudeyeMetadata,
if val, ok := config.TriggerMetadata["metricCollectionTime"]; ok && val != "" {
metricCollectionTime, err := strconv.Atoi(val)
if err != nil {
cloudeyeLog.Error(err, "Error parsing metricCollectionTime metadata")
logger.Error(err, "Error parsing metricCollectionTime metadata")
} else {
meta.metricCollectionTime = int64(metricCollectionTime)
}
Expand All @@ -169,7 +171,7 @@ func parseHuaweiCloudeyeMetadata(config *ScalerConfig) (*huaweiCloudeyeMetadata,
if val, ok := config.TriggerMetadata["metricPeriod"]; ok && val != "" {
_, err := strconv.Atoi(val)
if err != nil {
cloudeyeLog.Error(err, "Error parsing metricPeriod metadata")
logger.Error(err, "Error parsing metricPeriod metadata")
} else {
meta.metricPeriod = val
}
Expand Down Expand Up @@ -239,106 +241,106 @@ func gethuaweiAuthorization(authParams map[string]string) (huaweiAuthorizationMe
return meta, nil
}

func (h *huaweiCloudeyeScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := h.GetCloudeyeMetrics()
func (s *huaweiCloudeyeScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := s.GetCloudeyeMetrics()

if err != nil {
cloudeyeLog.Error(err, "Error getting metric value")
s.logger.Error(err, "Error getting metric value")
return []external_metrics.ExternalMetricValue{}, err
}

metric := GenerateMetricInMili(metricName, metricValue)
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (h *huaweiCloudeyeScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
func (s *huaweiCloudeyeScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(h.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("huawei-cloudeye-%s", h.metadata.metricsName))),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("huawei-cloudeye-%s", s.metadata.metricsName))),
},
Target: GetMetricTargetMili(h.metricType, h.metadata.targetMetricValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.targetMetricValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

func (h *huaweiCloudeyeScaler) IsActive(ctx context.Context) (bool, error) {
val, err := h.GetCloudeyeMetrics()
func (s *huaweiCloudeyeScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.GetCloudeyeMetrics()

if err != nil {
return false, err
}

return val > h.metadata.activationTargetMetricValue, nil
return val > s.metadata.activationTargetMetricValue, nil
}

func (h *huaweiCloudeyeScaler) Close(context.Context) error {
func (s *huaweiCloudeyeScaler) Close(context.Context) error {
return nil
}

func (h *huaweiCloudeyeScaler) GetCloudeyeMetrics() (float64, error) {
func (s *huaweiCloudeyeScaler) GetCloudeyeMetrics() (float64, error) {
options := aksk.AKSKOptions{
IdentityEndpoint: h.metadata.huaweiAuthorization.IdentityEndpoint,
ProjectID: h.metadata.huaweiAuthorization.ProjectID,
AccessKey: h.metadata.huaweiAuthorization.AccessKey,
SecretKey: h.metadata.huaweiAuthorization.SecretKey,
Region: h.metadata.huaweiAuthorization.Region,
Domain: h.metadata.huaweiAuthorization.Domain,
DomainID: h.metadata.huaweiAuthorization.DomainID,
Cloud: h.metadata.huaweiAuthorization.Cloud,
IdentityEndpoint: s.metadata.huaweiAuthorization.IdentityEndpoint,
ProjectID: s.metadata.huaweiAuthorization.ProjectID,
AccessKey: s.metadata.huaweiAuthorization.AccessKey,
SecretKey: s.metadata.huaweiAuthorization.SecretKey,
Region: s.metadata.huaweiAuthorization.Region,
Domain: s.metadata.huaweiAuthorization.Domain,
DomainID: s.metadata.huaweiAuthorization.DomainID,
Cloud: s.metadata.huaweiAuthorization.Cloud,
}

provider, err := openstack.AuthenticatedClient(options)
if err != nil {
cloudeyeLog.Error(err, "Failed to get the provider")
s.logger.Error(err, "Failed to get the provider")
return -1, err
}
sc, err := openstack.NewCESV1(provider, gophercloud.EndpointOpts{})

if err != nil {
cloudeyeLog.Error(err, "get ces client failed")
s.logger.Error(err, "get ces client failed")
if ue, ok := err.(*gophercloud.UnifiedError); ok {
cloudeyeLog.Info("ErrCode:", ue.ErrorCode())
cloudeyeLog.Info("Message:", ue.Message())
s.logger.Info("ErrCode:", ue.ErrorCode())
s.logger.Info("Message:", ue.Message())
}
return -1, err
}

opts := metricdata.BatchQueryOpts{
Metrics: []metricdata.Metric{
{
Namespace: h.metadata.namespace,
Namespace: s.metadata.namespace,
Dimensions: []map[string]string{
{
"name": h.metadata.dimensionName,
"value": h.metadata.dimensionValue,
"name": s.metadata.dimensionName,
"value": s.metadata.dimensionValue,
},
},
MetricName: h.metadata.metricsName,
MetricName: s.metadata.metricsName,
},
},
From: time.Now().Truncate(time.Minute).Add(time.Second*-1*time.Duration(h.metadata.metricCollectionTime)).UnixNano() / 1e6,
From: time.Now().Truncate(time.Minute).Add(time.Second*-1*time.Duration(s.metadata.metricCollectionTime)).UnixNano() / 1e6,
To: time.Now().Truncate(time.Minute).UnixNano() / 1e6,
Period: h.metadata.metricPeriod,
Filter: h.metadata.metricFilter,
Period: s.metadata.metricPeriod,
Filter: s.metadata.metricFilter,
}

metricdatas, err := metricdata.BatchQuery(sc, opts).ExtractMetricDatas()
if err != nil {
cloudeyeLog.Error(err, "query metrics failed")
s.logger.Error(err, "query metrics failed")
if ue, ok := err.(*gophercloud.UnifiedError); ok {
cloudeyeLog.Info("ErrCode:", ue.ErrorCode())
cloudeyeLog.Info("Message:", ue.Message())
s.logger.Info("ErrCode:", ue.ErrorCode())
s.logger.Info("Message:", ue.Message())
}
return -1, err
}

cloudeyeLog.V(1).Info("Received Metric Data", "data", metricdatas)
s.logger.V(1).Info("Received Metric Data", "data", metricdatas)

var metricValue float64

if metricdatas[0].Datapoints != nil && len(metricdatas[0].Datapoints) > 0 {
v, ok := metricdatas[0].Datapoints[0][h.metadata.metricFilter].(float64)
v, ok := metricdatas[0].Datapoints[0][s.metadata.metricFilter].(float64)
if ok {
metricValue = v
} else {
Expand Down
8 changes: 5 additions & 3 deletions pkg/scalers/huawei_cloudeye_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package scalers
import (
"context"
"testing"

"github.com/go-logr/logr"
)

var (
Expand Down Expand Up @@ -158,7 +160,7 @@ var huaweiCloudeyeMetricIdentifiers = []huaweiCloudeyeMetricIdentifier{

func TestHuaweiCloudeyeParseMetadata(t *testing.T) {
for _, testData := range testHuaweiCloudeyeMetadata {
_, err := parseHuaweiCloudeyeMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
_, err := parseHuaweiCloudeyeMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}, logr.Discard())
if err != nil && !testData.isError {
t.Errorf("%s: Expected success but got error %s", testData.comment, err)
}
Expand All @@ -170,11 +172,11 @@ func TestHuaweiCloudeyeParseMetadata(t *testing.T) {

func TestHuaweiCloudeyeGetMetricSpecForScaling(t *testing.T) {
for _, testData := range huaweiCloudeyeMetricIdentifiers {
meta, err := parseHuaweiCloudeyeMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex})
meta, err := parseHuaweiCloudeyeMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex}, logr.Discard())
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockHuaweiCloudeyeScaler := huaweiCloudeyeScaler{"", meta}
mockHuaweiCloudeyeScaler := huaweiCloudeyeScaler{"", meta, logr.Discard()}

metricSpec := mockHuaweiCloudeyeScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"
"time"

"github.com/go-logr/logr"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
Expand All @@ -30,6 +31,7 @@ type IBMMQScaler struct {
metricType v2beta2.MetricTargetType
metadata *IBMMQMetadata
defaultHTTPTimeout time.Duration
logger logr.Logger
}

// IBMMQMetadata Metadata used by KEDA to query IBM MQ queue depth and scale
Expand Down Expand Up @@ -76,6 +78,7 @@ func NewIBMMQScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
defaultHTTPTimeout: config.GlobalHTTPTimeout,
logger: InitializeLogger(config, "ibm_mq_scaler"),
}, nil
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/scalers/influxdb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"fmt"
"strconv"

"github.com/go-logr/logr"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
api "github.com/influxdata/influxdb-client-go/v2/api"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -20,6 +20,7 @@ type influxDBScaler struct {
client influxdb2.Client
metricType v2beta2.MetricTargetType
metadata *influxDBMetadata
logger logr.Logger
}

type influxDBMetadata struct {
Expand All @@ -33,21 +34,21 @@ type influxDBMetadata struct {
scalerIndex int
}

var influxDBLog = logf.Log.WithName("influxdb_scaler")

// NewInfluxDBScaler creates a new influx db scaler
func NewInfluxDBScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %s", err)
}

logger := InitializeLogger(config, "influxdb_scaler")

meta, err := parseInfluxDBMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing influxdb metadata: %s", err)
}

influxDBLog.Info("starting up influxdb client")
logger.Info("starting up influxdb client")
client := influxdb2.NewClientWithOptions(
meta.serverURL,
meta.authToken,
Expand All @@ -57,6 +58,7 @@ func NewInfluxDBScaler(config *ScalerConfig) (Scaler, error) {
client: client,
metricType: metricType,
metadata: meta,
logger: logger,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/scalers/influxdb_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/go-logr/logr"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)

Expand Down Expand Up @@ -72,7 +73,7 @@ func TestInfluxDBGetMetricSpecForScaling(t *testing.T) {
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockInfluxDBScaler := influxDBScaler{influxdb2.NewClient("https://influxdata.com", "myToken"), "", meta}
mockInfluxDBScaler := influxDBScaler{influxdb2.NewClient("https://influxdata.com", "myToken"), "", meta, logr.Discard()}

metricSpec := mockInfluxDBScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
Expand Down
Loading

0 comments on commit 62739d6

Please sign in to comment.