From 1ea0e77d5ee1d279d83646dec8f69379ff285776 Mon Sep 17 00:00:00 2001 From: adel121 Date: Fri, 27 Oct 2023 16:23:06 +0200 Subject: [PATCH 1/6] add rate_limit_queries_remaining_min telemetry in cluster agent external metrics server --- .../kubernetes/autoscalers/datadogexternal.go | 17 +++++++ .../autoscalers/datadogexternal_util.go | 46 +++++++++++++++++++ .../autoscalers/datadogexternal_util_test.go | 39 ++++++++++++++++ 3 files changed, 102 insertions(+) create mode 100644 pkg/util/kubernetes/autoscalers/datadogexternal_util.go create mode 100644 pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal.go b/pkg/util/kubernetes/autoscalers/datadogexternal.go index d2c313dc01143..a22e456944866 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal.go @@ -18,6 +18,7 @@ import ( "gopkg.in/zorkian/go-datadog-api.v2" utilserror "k8s.io/apimachinery/pkg/util/errors" + "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/telemetry" le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -36,6 +37,9 @@ var ( rateLimitsRemaining = telemetry.NewGaugeWithOpts("", "rate_limit_queries_remaining", []string{"endpoint", le.JoinLeaderLabel}, "number of queries remaining before next reset", telemetry.Options{NoDoubleUnderscoreSep: true}) + rateLimitsRemainingMin = telemetry.NewGaugeWithOpts("", "rate_limit_queries_remaining_min", + []string{"endpoint", le.JoinLeaderLabel}, "minimum number of queries remaining before next reset observed during an expiration interval of 2*refresh period", + telemetry.Options{NoDoubleUnderscoreSep: true}) rateLimitsReset = telemetry.NewGaugeWithOpts("", "rate_limit_queries_reset", []string{"endpoint", le.JoinLeaderLabel}, "number of seconds before next reset", telemetry.Options{NoDoubleUnderscoreSep: true}) @@ -61,6 +65,7 @@ const ( queryEndpoint = "/api/v1/query" ) +<<<<<<< HEAD // isRateLimitError is a helper function that checks if the received error is a rate limit error func isRateLimitError(err error) bool { if err == nil { @@ -68,6 +73,13 @@ func isRateLimitError(err error) bool { } return strings.Contains(err.Error(), "429 Too Many Requests") } +======= +var ( + refreshPeriod = config.Datadog.GetInt("external_metrics_provider.refresh_period") + expiryDuration = 2 * refreshPeriod + mrr = newMinRemainingRequests(time.Duration(expiryDuration)) +) +>>>>>>> 9005f6e178 (add rate_limit_queries_remaining_min telemetry in cluster agent external metrics server) // queryDatadogExternal converts the metric name and labels from the Ref format into a Datadog metric. // It returns the last value for a bucket of 5 minutes, @@ -171,6 +183,11 @@ func (p *Processor) queryDatadogExternal(ddQueries []string, timeWindow time.Dur } } + // Update rateLimitsRemainingMin metric + updateMap := p.datadogClient.GetRateLimitStats() + queryLimits := updateMap[queryEndpoint] + mrr.update(queryLimits.Remaining) + return processedMetrics, nil } diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal_util.go b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go new file mode 100644 index 0000000000000..698fe885d0b24 --- /dev/null +++ b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go @@ -0,0 +1,46 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2017-present Datadog, Inc. + +//go:build kubeapiserver + +package autoscalers + +import ( + "strconv" + "time" + + le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics" +) + +type minRemainingRequests struct { + val int + timestamp time.Time + expiryDuration time.Duration +} + +func newMinRemainingRequests(expiryDuration time.Duration) minRemainingRequests { + return minRemainingRequests{ + val: -1, + timestamp: time.Now(), + expiryDuration: expiryDuration, + } +} + +func (mrr *minRemainingRequests) update(newVal string) { + newValFloat, err := strconv.Atoi(newVal) + + if err != nil { + return + } + + isSet := mrr.val >= 0 + hasExpired := time.Since(mrr.timestamp) > mrr.expiryDuration + + if mrr.val >= newValFloat || !isSet || hasExpired { + mrr.val = newValFloat + mrr.timestamp = time.Now() + rateLimitsRemainingMin.Set(float64(mrr.val), queryEndpoint, le.JoinLeaderValue) + } +} diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go b/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go new file mode 100644 index 0000000000000..c510fd948fd71 --- /dev/null +++ b/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go @@ -0,0 +1,39 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2017-present Datadog, Inc. + +//go:build kubeapiserver + +package autoscalers + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestIsSet(t *testing.T) { + expiryDuration := 60 * time.Second + mrr := newMinRemainingRequests(expiryDuration) + + // Should update + mrr.update("10") + assert.Equal(t, mrr.val, 10) + + // Should not update, since value didn't expire yet + mrr.update("11") + assert.Equal(t, mrr.val, 10) + + // Shoud update, even if value didn't expire because new value is lower + mrr.update("5") + assert.Equal(t, mrr.val, 5) + + // Change timestamp to simulate expiratio + mrr.timestamp = time.Now().Add(-2 * expiryDuration) + + // Shoud update because current value has expired + mrr.update("100") + assert.Equal(t, mrr.val, 100) +} From 59ba061621843f0289e9a24a59d54126f7e43908 Mon Sep 17 00:00:00 2001 From: adel121 Date: Mon, 30 Oct 2023 11:22:40 +0100 Subject: [PATCH 2/6] added release note --- ...ries_remaining_min_telemetry-233fcfdbc0fe3822.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 releasenotes-dca/notes/add-rate_limit_queries_remaining_min_telemetry-233fcfdbc0fe3822.yaml diff --git a/releasenotes-dca/notes/add-rate_limit_queries_remaining_min_telemetry-233fcfdbc0fe3822.yaml b/releasenotes-dca/notes/add-rate_limit_queries_remaining_min_telemetry-233fcfdbc0fe3822.yaml new file mode 100644 index 0000000000000..d76c197d2e94d --- /dev/null +++ b/releasenotes-dca/notes/add-rate_limit_queries_remaining_min_telemetry-233fcfdbc0fe3822.yaml @@ -0,0 +1,11 @@ +# Each section from every release note are combined when the +# CHANGELOG-DCA.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +features: + - | + Report `rate_limit_queries_remaining_min` telemetry from `external-metrics` server. From 85cf94bc79d23b26971e9dc7c854d035fcb1c8cc Mon Sep 17 00:00:00 2001 From: adel121 Date: Mon, 30 Oct 2023 14:34:07 +0100 Subject: [PATCH 3/6] add mutex lock to mrr struct --- pkg/util/kubernetes/autoscalers/datadogexternal.go | 14 ++++++-------- .../kubernetes/autoscalers/datadogexternal_util.go | 5 +++++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal.go b/pkg/util/kubernetes/autoscalers/datadogexternal.go index a22e456944866..daeca0b44783c 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal.go @@ -65,7 +65,12 @@ const ( queryEndpoint = "/api/v1/query" ) -<<<<<<< HEAD +var ( + refreshPeriod = config.Datadog.GetInt("external_metrics_provider.refresh_period") + expiryDuration = 2 * refreshPeriod + mrr = newMinRemainingRequests(time.Duration(time.Duration(expiryDuration) * time.Second)) +) + // isRateLimitError is a helper function that checks if the received error is a rate limit error func isRateLimitError(err error) bool { if err == nil { @@ -73,13 +78,6 @@ func isRateLimitError(err error) bool { } return strings.Contains(err.Error(), "429 Too Many Requests") } -======= -var ( - refreshPeriod = config.Datadog.GetInt("external_metrics_provider.refresh_period") - expiryDuration = 2 * refreshPeriod - mrr = newMinRemainingRequests(time.Duration(expiryDuration)) -) ->>>>>>> 9005f6e178 (add rate_limit_queries_remaining_min telemetry in cluster agent external metrics server) // queryDatadogExternal converts the metric name and labels from the Ref format into a Datadog metric. // It returns the last value for a bucket of 5 minutes, diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal_util.go b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go index 698fe885d0b24..b3f924919b2fb 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal_util.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go @@ -9,12 +9,14 @@ package autoscalers import ( "strconv" + "sync" "time" le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics" ) type minRemainingRequests struct { + sync.Mutex val int timestamp time.Time expiryDuration time.Duration @@ -29,6 +31,9 @@ func newMinRemainingRequests(expiryDuration time.Duration) minRemainingRequests } func (mrr *minRemainingRequests) update(newVal string) { + mrr.Lock() + defer mrr.Unlock() + newValFloat, err := strconv.Atoi(newVal) if err != nil { From 081c5bc9ba0c4b744c138225b0a2d0358fcb2f53 Mon Sep 17 00:00:00 2001 From: adel121 Date: Tue, 31 Oct 2023 10:10:57 +0100 Subject: [PATCH 4/6] improve unit test --- .../kubernetes/autoscalers/datadogexternal.go | 22 ++++++++++--- .../autoscalers/datadogexternal_util.go | 32 +++++++------------ .../autoscalers/datadogexternal_util_test.go | 22 +++++++++---- 3 files changed, 44 insertions(+), 32 deletions(-) diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal.go b/pkg/util/kubernetes/autoscalers/datadogexternal.go index daeca0b44783c..61bbe1993bf70 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal.go @@ -13,6 +13,7 @@ import ( "math" "strconv" "strings" + "sync" "time" "gopkg.in/zorkian/go-datadog-api.v2" @@ -66,11 +67,20 @@ const ( ) var ( - refreshPeriod = config.Datadog.GetInt("external_metrics_provider.refresh_period") - expiryDuration = 2 * refreshPeriod - mrr = newMinRemainingRequests(time.Duration(time.Duration(expiryDuration) * time.Second)) + minRemainingRequestsTracker *minTracker + once sync.Once ) +func getMinRemainingRequestsTracker() *minTracker { + once.Do(func() { + refreshPeriod := config.Datadog.GetInt("external_metrics_provider.refresh_period") + expiryDuration := 2 * refreshPeriod + minRemainingRequestsTracker = newMinTracker(time.Duration(time.Duration(expiryDuration) * time.Second)) + }) + + return minRemainingRequestsTracker +} + // isRateLimitError is a helper function that checks if the received error is a rate limit error func isRateLimitError(err error) bool { if err == nil { @@ -184,7 +194,11 @@ func (p *Processor) queryDatadogExternal(ddQueries []string, timeWindow time.Dur // Update rateLimitsRemainingMin metric updateMap := p.datadogClient.GetRateLimitStats() queryLimits := updateMap[queryEndpoint] - mrr.update(queryLimits.Remaining) + newVal, err := strconv.Atoi(queryLimits.Remaining) + if err == nil { + getMinRemainingRequestsTracker().update(newVal) + rateLimitsRemainingMin.Set(float64(minRemainingRequestsTracker.val), queryEndpoint, le.JoinLeaderLabel) + } return processedMetrics, nil } diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal_util.go b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go index b3f924919b2fb..8aec3ec874862 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal_util.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go @@ -8,44 +8,34 @@ package autoscalers import ( - "strconv" "sync" "time" - - le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics" ) -type minRemainingRequests struct { +type minTracker struct { sync.Mutex val int timestamp time.Time expiryDuration time.Duration } -func newMinRemainingRequests(expiryDuration time.Duration) minRemainingRequests { - return minRemainingRequests{ +func newMinTracker(expiryDuration time.Duration) *minTracker { + return &minTracker{ val: -1, timestamp: time.Now(), expiryDuration: expiryDuration, } } -func (mrr *minRemainingRequests) update(newVal string) { - mrr.Lock() - defer mrr.Unlock() - - newValFloat, err := strconv.Atoi(newVal) - - if err != nil { - return - } +func (mt *minTracker) update(newVal int) { + mt.Lock() + defer mt.Unlock() - isSet := mrr.val >= 0 - hasExpired := time.Since(mrr.timestamp) > mrr.expiryDuration + isSet := mt.val >= 0 + hasExpired := time.Since(mt.timestamp) > mt.expiryDuration - if mrr.val >= newValFloat || !isSet || hasExpired { - mrr.val = newValFloat - mrr.timestamp = time.Now() - rateLimitsRemainingMin.Set(float64(mrr.val), queryEndpoint, le.JoinLeaderValue) + if newVal <= mt.val || !isSet || hasExpired { + mt.val = newVal + mt.timestamp = time.Now() } } diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go b/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go index c510fd948fd71..b4c9374067c95 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go @@ -14,26 +14,34 @@ import ( "github.com/stretchr/testify/assert" ) -func TestIsSet(t *testing.T) { +func TestUpdateMinimumRemainingRequests(t *testing.T) { expiryDuration := 60 * time.Second - mrr := newMinRemainingRequests(expiryDuration) + + mrr := newMinTracker(expiryDuration) // Should update - mrr.update("10") + mrr.update(10) assert.Equal(t, mrr.val, 10) // Should not update, since value didn't expire yet - mrr.update("11") + mrr.update(11) + assert.Equal(t, mrr.val, 10) + + // simulate waiting half the expirationDuration + mrr.timestamp = time.Now().Add(-expiryDuration / 2) + + // Should not update + mrr.update(199) assert.Equal(t, mrr.val, 10) // Shoud update, even if value didn't expire because new value is lower - mrr.update("5") + mrr.update(5) assert.Equal(t, mrr.val, 5) - // Change timestamp to simulate expiratio + // Change timestamp to simulate expiration mrr.timestamp = time.Now().Add(-2 * expiryDuration) // Shoud update because current value has expired - mrr.update("100") + mrr.update(100) assert.Equal(t, mrr.val, 100) } From 62f527606b0e78c413fe2ed6c917afbc0a4ed15b Mon Sep 17 00:00:00 2001 From: adel121 Date: Tue, 7 Nov 2023 14:37:16 +0100 Subject: [PATCH 5/6] add a get() method to the minTracker struct --- .../kubernetes/autoscalers/datadogexternal.go | 2 +- .../autoscalers/datadogexternal_util.go | 4 +++ .../autoscalers/datadogexternal_util_test.go | 28 +++++++++---------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal.go b/pkg/util/kubernetes/autoscalers/datadogexternal.go index 61bbe1993bf70..c00e8d59ad79e 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal.go @@ -197,7 +197,7 @@ func (p *Processor) queryDatadogExternal(ddQueries []string, timeWindow time.Dur newVal, err := strconv.Atoi(queryLimits.Remaining) if err == nil { getMinRemainingRequestsTracker().update(newVal) - rateLimitsRemainingMin.Set(float64(minRemainingRequestsTracker.val), queryEndpoint, le.JoinLeaderLabel) + rateLimitsRemainingMin.Set(float64(minRemainingRequestsTracker.get()), queryEndpoint, le.JoinLeaderLabel) } return processedMetrics, nil diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal_util.go b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go index 8aec3ec874862..fbfb06197d73c 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal_util.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go @@ -39,3 +39,7 @@ func (mt *minTracker) update(newVal int) { mt.timestamp = time.Now() } } + +func (mt *minTracker) get() int { + return mt.val +} diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go b/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go index b4c9374067c95..e9781496bf39a 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal_util_test.go @@ -14,34 +14,34 @@ import ( "github.com/stretchr/testify/assert" ) -func TestUpdateMinimumRemainingRequests(t *testing.T) { +func TestUpdateMinTracker(t *testing.T) { expiryDuration := 60 * time.Second - mrr := newMinTracker(expiryDuration) + mt := newMinTracker(expiryDuration) // Should update - mrr.update(10) - assert.Equal(t, mrr.val, 10) + mt.update(10) + assert.Equal(t, mt.get(), 10) // Should not update, since value didn't expire yet - mrr.update(11) - assert.Equal(t, mrr.val, 10) + mt.update(11) + assert.Equal(t, mt.get(), 10) // simulate waiting half the expirationDuration - mrr.timestamp = time.Now().Add(-expiryDuration / 2) + mt.timestamp = time.Now().Add(-expiryDuration / 2) // Should not update - mrr.update(199) - assert.Equal(t, mrr.val, 10) + mt.update(199) + assert.Equal(t, mt.get(), 10) // Shoud update, even if value didn't expire because new value is lower - mrr.update(5) - assert.Equal(t, mrr.val, 5) + mt.update(5) + assert.Equal(t, mt.get(), 5) // Change timestamp to simulate expiration - mrr.timestamp = time.Now().Add(-2 * expiryDuration) + mt.timestamp = time.Now().Add(-2 * expiryDuration) // Shoud update because current value has expired - mrr.update(100) - assert.Equal(t, mrr.val, 100) + mt.update(100) + assert.Equal(t, mt.get(), 100) } From aec1e20794a9b73c470af5dad2f1e48cf04b3704 Mon Sep 17 00:00:00 2001 From: adel121 Date: Tue, 7 Nov 2023 16:58:51 +0100 Subject: [PATCH 6/6] add lock on the get method --- pkg/util/kubernetes/autoscalers/datadogexternal_util.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/util/kubernetes/autoscalers/datadogexternal_util.go b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go index fbfb06197d73c..b888a386c6432 100644 --- a/pkg/util/kubernetes/autoscalers/datadogexternal_util.go +++ b/pkg/util/kubernetes/autoscalers/datadogexternal_util.go @@ -41,5 +41,7 @@ func (mt *minTracker) update(newVal int) { } func (mt *minTracker) get() int { + mt.Lock() + defer mt.Unlock() return mt.val }