Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat][Azure] Azure monitor/storage: Fix 429 Too Many Requests error #38294

Merged
merged 10 commits into from
Mar 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ fields added to events containing the Beats version. {pull}37553[37553]

*Metricbeat*

- Fix Azure Monitor 429 error by causing metricbeat to retry the request again. {pull}38294[38294]
- Fix fields not being parsed correctly in postgresql/database {issue}25301[25301] {pull}37720[37720]

*Osquerybeat*
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/azure/mock_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (client *MockService) GetResourceDefinitions(id []string, group []string, r
return args.Get(0).([]*armresources.GenericResourceExpanded), args.Error(1)
}

// GetMetricDefinitions is a mock function for the azure service
func (client *MockService) GetMetricDefinitions(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) {
// GetMetricDefinitionsWithRetry is a mock function for the azure service
func (client *MockService) GetMetricDefinitionsWithRetry(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) {
args := client.Called(resourceId, namespace)
return args.Get(0).(armmonitor.MetricDefinitionCollection), args.Error(1)
}
Expand Down
20 changes: 16 additions & 4 deletions x-pack/metricbeat/module/azure/monitor/client_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,24 @@ const missingNamespace = "no metric definitions were found for resource %s and n
// mapMetrics should validate and map the metric related configuration to relevant azure monitor api parameters
func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceExpanded, resourceConfig azure.ResourceConfig) ([]azure.Metric, error) {
var metrics []azure.Metric

for _, resource := range resources {

// We use this map to avoid calling the metrics definition function for the same namespace and same resource
// multiple times.
namespaceMetrics := make(map[string]armmonitor.MetricDefinitionCollection)

for _, metric := range resourceConfig.Metrics {
// get all metrics supported by the namespace provided
metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitions(*resource.ID, metric.Namespace)
if err != nil {
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", *resource.ID, metric.Namespace, err)

constanca-m marked this conversation as resolved.
Show resolved Hide resolved
var err error

metricDefinitions, exists := namespaceMetrics[metric.Namespace]
if !exists {
metricDefinitions, err = client.AzureMonitorService.GetMetricDefinitionsWithRetry(*resource.ID, metric.Namespace)
if err != nil {
return nil, err
}
namespaceMetrics[metric.Namespace] = metricDefinitions
}

if len(metricDefinitions.Value) == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestMapMetric(t *testing.T) {
client := azure.NewMockClient()
t.Run("return error when no metric definitions were found", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(armmonitor.MetricDefinitionCollection{}, fmt.Errorf("invalid resource ID"))
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(armmonitor.MetricDefinitionCollection{}, fmt.Errorf("invalid resource ID"))
client.AzureMonitorService = m
metric, err := mapMetrics(client, []*armresources.GenericResourceExpanded{resource}, resourceConfig)
assert.Error(t, err)
Expand All @@ -97,7 +97,7 @@ func TestMapMetric(t *testing.T) {
})
t.Run("return all metrics when all metric names and aggregations were configured", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
client.AzureMonitorService = m
metricConfig.Name = []string{"*"}
resourceConfig.Metrics = []azure.MetricConfig{metricConfig}
Expand All @@ -112,7 +112,7 @@ func TestMapMetric(t *testing.T) {
})
t.Run("return all metrics when specific metric names and aggregations were configured", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
client.AzureMonitorService = m
metricConfig.Name = []string{"TotalRequests", "Capacity"}
metricConfig.Aggregations = []string{"Average"}
Expand Down
51 changes: 47 additions & 4 deletions x-pack/metricbeat/module/azure/monitor_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ package azure

import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"

"github.com/elastic/elastic-agent-libs/logp"

Expand Down Expand Up @@ -195,8 +200,43 @@ func (service *MonitorService) GetMetricNamespaces(resourceId string) (armmonito
return metricNamespaceCollection, nil
}

// GetMetricDefinitions will return all supported metrics based on the resource id and namespace
func (service *MonitorService) GetMetricDefinitions(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) {
// sleepIfPossible will check for the error 429 in the azure response, and look for the retry after header.
// If the header is present, then metricbeat will sleep for that duration, otherwise it will return an error.
func (service *MonitorService) sleepIfPossible(err error, resourceId string, namespace string) error {
errorMsg := "no metric definitions were found for resource " + resourceId + " and namespace " + namespace

var respError *azcore.ResponseError
ok := errors.As(err, &respError)
if !ok {
return fmt.Errorf("%s, failed to cast error to azcore.ResponseError", errorMsg)
}
// Check for TooManyRequests error and retry if it is the case
if respError.StatusCode != http.StatusTooManyRequests {
return fmt.Errorf("%s, %w", errorMsg, err)
}

// Check if the error has the header Retry After.
// If it is present, then we should try to make this request again.
retryAfter := respError.RawResponse.Header.Get("Retry-After")
if retryAfter == "" {
return fmt.Errorf("%s %w, failed to find Retry-After header", errorMsg, err)
}

duration, errD := time.ParseDuration(retryAfter + "s")
if errD != nil {
return fmt.Errorf("%s, failed to parse duration %s from header retry after", errorMsg, retryAfter)
}

service.log.Infof("%s, metricbeat will try again after %s seconds", errorMsg, retryAfter)
time.Sleep(duration)
service.log.Infof("%s, metricbeat finished sleeping and will try again now", errorMsg)

return nil
}

// GetMetricDefinitionsWithRetry will return all supported metrics based on the resource id and namespace
// It will check for an error when moving the pager to the next page, and retry if possible.
func (service *MonitorService) GetMetricDefinitionsWithRetry(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error) {
opts := &armmonitor.MetricDefinitionsClientListOptions{}

if namespace != "" {
Expand All @@ -210,9 +250,12 @@ func (service *MonitorService) GetMetricDefinitions(resourceId string, namespace
for pager.More() {
nextPage, err := pager.NextPage(service.context)
if err != nil {
return armmonitor.MetricDefinitionCollection{}, err
retryError := service.sleepIfPossible(err, resourceId, namespace)
if retryError != nil {
return armmonitor.MetricDefinitionCollection{}, err
}
continue
}

metricDefinitionCollection.Value = append(metricDefinitionCollection.Value, nextPage.Value...)
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/azure/service_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type Service interface {
GetResourceDefinitionById(id string) (armresources.GenericResource, error)
GetResourceDefinitions(id []string, group []string, rType string, query string) ([]*armresources.GenericResourceExpanded, error)
GetMetricDefinitions(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error)
GetMetricDefinitionsWithRetry(resourceId string, namespace string) (armmonitor.MetricDefinitionCollection, error)
GetMetricNamespaces(resourceId string) (armmonitor.MetricNamespaceCollection, error)
GetMetricValues(resourceId string, namespace string, timegrain string, timespan string, metricNames []string, aggregations string, filter string) ([]armmonitor.Metric, string, error)
}
6 changes: 3 additions & 3 deletions x-pack/metricbeat/module/azure/storage/client_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceE
}

// get all metric definitions supported by the namespace provided
metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitions(resourceID, namespace)
metricDefinitions, err := client.AzureMonitorService.GetMetricDefinitionsWithRetry(resourceID, namespace)
if err != nil {
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", resourceID, namespace, err)
return nil, err
}

if len(metricDefinitions.Value) == 0 {
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", resourceID, namespace, err)
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s", resourceID, namespace)
}

var filteredMetricDefinitions []armmonitor.MetricDefinition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ func TestMapMetric(t *testing.T) {
client := azure.NewMockClient()
t.Run("return error when no metric definitions were found", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(emptyMetricDefinitions, nil)
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(emptyMetricDefinitions, nil)
client.AzureMonitorService = m
metric, err := mapMetrics(client, []*armresources.GenericResourceExpanded{resource}, resourceConfig)
assert.Error(t, err)
assert.Equal(t, err.Error(), "no metric definitions were found for resource 123 and namespace Microsoft.Storage/storageAccounts %!w(<nil>)")
assert.Equal(t, err.Error(), "no metric definitions were found for resource 123 and namespace Microsoft.Storage/storageAccounts")
assert.Equal(t, metric, []azure.Metric(nil))
m.AssertExpectations(t)
})
t.Run("return mapped metrics correctly", func(t *testing.T) {
m := &azure.MockService{}
m.On("GetMetricDefinitions", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
m.On("GetMetricDefinitionsWithRetry", mock.Anything, mock.Anything).Return(metricDefinitions, nil)
client.AzureMonitorService = m
metrics, err := mapMetrics(client, []*armresources.GenericResourceExpanded{resource}, resourceConfig)
assert.NoError(t, err)
Expand Down
Loading