Skip to content

Commit

Permalink
[Metricbeat][Azure] Azure monitor/storage: Fix 429 Too Many Requests …
Browse files Browse the repository at this point in the history
…error (#38294)

* Fiz azure too many requests error

Signed-off-by: constanca <[email protected]>

* Add CHANGELOG.next.asciidoc entry

Signed-off-by: constanca <[email protected]>

* Run mage check

Signed-off-by: constanca <[email protected]>

* Get metrics 1 time for each namespace/id

Signed-off-by: constanca <[email protected]>

* Add error comparison

Signed-off-by: constanca <[email protected]>

* Move retry to monitor service

Signed-off-by: constanca <[email protected]>

* Remove GetMetricDefinitions without retry

Signed-off-by: constanca <[email protected]>

* fix storage unit tests

Signed-off-by: constanca <[email protected]>

* fix monitor unit tests

Signed-off-by: constanca <[email protected]>

* Remove unnecessary for cycle

Signed-off-by: constanca <[email protected]>

---------

Signed-off-by: constanca <[email protected]>
  • Loading branch information
constanca-m authored Mar 20, 2024
1 parent 01f5a90 commit 10ff992
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ fields added to events containing the Beats version. {pull}37553[37553]

*Metricbeat*

- Fix Azure Monitor 429 error by causing metricbeat to retry the request again. {pull}38294[38294]
- Fix fields not being parsed correctly in postgresql/database {issue}25301[25301] {pull}37720[37720]

*Osquerybeat*
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/azure/mock_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (client *MockService) GetResourceDefinitions(id []string, group []string, r
return args.Get(0).([]*armresources.GenericResourceExpanded), args.Error(1)
}

// GetMetricDefinitions is a mock function for the azure service
func (client *MockService) GetMetricDefinitions(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) {
// GetMetricDefinitionsWithRetry is a mock function for the azure service
func (client *MockService) GetMetricDefinitionsWithRetry(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) {
args := client.Called(resourceId, namespace)
return args.Get(0).(armmonitor.MetricDefinitionCollection), args.Error(1)
}
Expand Down
20 changes: 16 additions & 4 deletions x-pack/metricbeat/module/azure/monitor/client_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,24 @@ const missingNamespace = "no metric definitions were found for resource %s and n
// mapMetrics should validate and map the metric related configuration to relevant azure monitor api parameters
func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceExpanded, resourceConfig azure.ResourceConfig) ([]azure.Metric, error) {
var metrics []azure.Metric

for _, resource := range resources {

// We use this map to avoid calling the metrics definition function for the same namespace and same resource
// multiple times.
namespaceMetrics := make(map[string]armmonitor.MetricDefinitionCollection)

for _, metric := range resourceConfig.Metrics {
// get all metrics supported by the namespace provided
metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitions(*resource.ID, metric.Namespace)
if err != nil {
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", *resource.ID, metric.Namespace, err)

var err error

metricDefinitions, exists := namespaceMetrics[metric.Namespace]
if !exists {
metricDefinitions, err = client.AzureMonitorService.GetMetricDefinitionsWithRetry(*resource.ID, metric.Namespace)
if err != nil {
return nil, err
}
namespaceMetrics[metric.Namespace] = metricDefinitions
}

if len(metricDefinitions.Value) == 0 {
Expand Down
6 changes: 3 additions & 3 deletions x-pack/metricbeat/module/azure/monitor/client_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestMapMetric(t *testing.T) {
client := azure.NewMockClient()
t.Run("return error when no metric definitions were found", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(armmonitor.MetricDefinitionCollection{}, fmt.Errorf("invalid resource ID"))
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(armmonitor.MetricDefinitionCollection{}, fmt.Errorf("invalid resource ID"))
client.AzureMonitorService = m
metric, err := mapMetrics(client, []*armresources.GenericResourceExpanded{resource}, resourceConfig)
assert.Error(t, err)
Expand All @@ -97,7 +97,7 @@ func TestMapMetric(t *testing.T) {
})
t.Run("return all metrics when all metric names and aggregations were configured", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
client.AzureMonitorService = m
metricConfig.Name = []string{"*"}
resourceConfig.Metrics = []azure.MetricConfig{metricConfig}
Expand All @@ -112,7 +112,7 @@ func TestMapMetric(t *testing.T) {
})
t.Run("return all metrics when specific metric names and aggregations were configured", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
client.AzureMonitorService = m
metricConfig.Name = []string{"TotalRequests", "Capacity"}
metricConfig.Aggregations = []string{"Average"}
Expand Down
51 changes: 47 additions & 4 deletions x-pack/metricbeat/module/azure/monitor_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ package azure

import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"

"github.com/elastic/elastic-agent-libs/logp"

Expand Down Expand Up @@ -195,8 +200,43 @@ func (service *MonitorService) GetMetricNamespaces(resourceId string) (armmonito
return metricNamespaceCollection, nil
}

// GetMetricDefinitions will return all supported metrics based on the resource id and namespace
func (service *MonitorService) GetMetricDefinitions(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) {
// sleepIfPossible will check for the error 429 in the azure response, and look for the retry after header.
// If the header is present, then metricbeat will sleep for that duration, otherwise it will return an error.
func (service *MonitorService) sleepIfPossible(err error, resourceId string, namespace string) error {
errorMsg := "no metric definitions were found for resource " + resourceId + " and namespace " + namespace

var respError *azcore.ResponseError
ok := errors.As(err, &respError)
if !ok {
return fmt.Errorf("%s, failed to cast error to azcore.ResponseError", errorMsg)
}
// Check for TooManyRequests error and retry if it is the case
if respError.StatusCode != http.StatusTooManyRequests {
return fmt.Errorf("%s, %w", errorMsg, err)
}

// Check if the error has the header Retry After.
// If it is present, then we should try to make this request again.
retryAfter := respError.RawResponse.Header.Get("Retry-After")
if retryAfter == "" {
return fmt.Errorf("%s %w, failed to find Retry-After header", errorMsg, err)
}

duration, errD := time.ParseDuration(retryAfter + "s")
if errD != nil {
return fmt.Errorf("%s, failed to parse duration %s from header retry after", errorMsg, retryAfter)
}

service.log.Infof("%s, metricbeat will try again after %s seconds", errorMsg, retryAfter)
time.Sleep(duration)
service.log.Infof("%s, metricbeat finished sleeping and will try again now", errorMsg)

return nil
}

// GetMetricDefinitionsWithRetry will return all supported metrics based on the resource id and namespace
// It will check for an error when moving the pager to the next page, and retry if possible.
func (service *MonitorService) GetMetricDefinitionsWithRetry(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) {
opts := &armmonitor.MetricDefinitionsClientListOptions{}

if namespace != "" {
Expand All @@ -210,9 +250,12 @@ func (service *MonitorService) GetMetricDefinitions(resourceId string, namespace
for pager.More() {
nextPage, err := pager.NextPage(service.context)
if err != nil {
return armmonitor.MetricDefinitionCollection{}, err
retryError := service.sleepIfPossible(err, resourceId, namespace)
if retryError != nil {
return armmonitor.MetricDefinitionCollection{}, err
}
continue
}

metricDefinitionCollection.Value = append(metricDefinitionCollection.Value, nextPage.Value...)
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/azure/service_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type Service interface {
GetResourceDefinitionById(id string) (armresources.GenericResource, error)
GetResourceDefinitions(id []string, group []string, rType string, query string) ([]*armresources.GenericResourceExpanded, error)
GetMetricDefinitions(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error)
GetMetricDefinitionsWithRetry(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error)
GetMetricNamespaces(resourceId string) (armmonitor.MetricNamespaceCollection, error)
GetMetricValues(resourceId string, namespace string, timegrain string, timespan string, metricNames []string, aggregations string, filter string) ([]armmonitor.Metric, string, error)
}
6 changes: 3 additions & 3 deletions x-pack/metricbeat/module/azure/storage/client_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceE
}

// get all metric definitions supported by the namespace provided
metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitions(resourceID, namespace)
metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitionsWithRetry(resourceID, namespace)
if err != nil {
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", resourceID, namespace, err)
return nil, err
}

if len(metricDefinitions.Value) == 0 {
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", resourceID, namespace, err)
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s", resourceID, namespace)
}

var filteredMetricDefinitions []armmonitor.MetricDefinition
Expand Down
6 changes: 3 additions & 3 deletions x-pack/metricbeat/module/azure/storage/client_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ func TestMapMetric(t *testing.T) {
client := azure.NewMockClient()
t.Run("return error when no metric definitions were found", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(emptyMetricDefinitions, nil)
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(emptyMetricDefinitions, nil)
client.AzureMonitorService = m
metric, err := mapMetrics(client, []*armresources.GenericResourceExpanded{resource}, resourceConfig)
assert.Error(t, err)
assert.Equal(t, err.Error(), "no metric definitions were found for resource 123 and namespace Microsoft.Storage/storageAccounts %!w(<nil>)")
assert.Equal(t, err.Error(), "no metric definitions were found for resource 123 and namespace Microsoft.Storage/storageAccounts")
assert.Equal(t, metric, []azure.Metric(nil))
m.AssertExpectations(t)
})
t.Run("return mapped metrics correctly", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
client.AzureMonitorService = m
metrics, err := mapMetrics(client, []*armresources.GenericResourceExpanded{resource}, resourceConfig)
assert.NoError(t, err)
Expand Down

0 comments on commit 10ff992

Please sign in to comment.