feat: Provide support for batch scraping (#2459)
Co-authored-by: Tom Kerkhove <[email protected]>
hkfgo and tomkerkhove authored Oct 7, 2024
1 parent 880d10e commit 5fa547b
Showing 42 changed files with 1,488 additions and 144 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/templates-build-push-image.yml
@@ -50,4 +50,4 @@ jobs:
context: ./src/
file: ./src/${{ inputs.project_name }}/Dockerfile.linux
tags: ${{ env.image_commit_uri }},${{ env.image_latest_uri }}
push: true
58 changes: 48 additions & 10 deletions src/Promitor.Agents.Scraper/Scheduling/ResourcesScrapingJob.cs
@@ -15,6 +15,7 @@
using Promitor.Core.Extensions;
using Promitor.Core.Metrics.Interfaces;
using Promitor.Core.Metrics.Sinks;
using Promitor.Core.Scraping.Batching;
using Promitor.Core.Scraping.Configuration.Model;
using Promitor.Core.Scraping.Configuration.Model.Metrics;
using Promitor.Core.Scraping.Factories;
@@ -134,7 +135,6 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
try
{
var scrapeDefinitions = await GetAllScrapeDefinitions(cancellationToken);

await ScrapeMetrics(scrapeDefinitions, cancellationToken);
}
catch (OperationCanceledException)
@@ -251,22 +251,59 @@ private void GetResourceScrapeDefinition(IAzureResourceDefinition resourceDefini
}

private async Task ScrapeMetrics(IEnumerable<ScrapeDefinition<IAzureResourceDefinition>> scrapeDefinitions, CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    var batchScrapingEnabled = this._azureMonitorIntegrationConfiguration.Value.MetricsBatching?.Enabled ?? false;
    if (batchScrapingEnabled)
    {
        Logger.LogInformation("Promitor Scraper will operate in batch scraping mode, with max batch size {BatchSize}", this._azureMonitorIntegrationConfiguration.Value.MetricsBatching.MaxBatchSize);
        Logger.LogWarning("Batch scraping is an experimental feature. See Promitor.io for its limitations and cost considerations");

        var batchScrapeDefinitions = AzureResourceDefinitionBatching.GroupScrapeDefinitions(scrapeDefinitions, this._azureMonitorIntegrationConfiguration.Value.MetricsBatching.MaxBatchSize);

        foreach (var batchScrapeDefinition in batchScrapeDefinitions)
        {
            var azureMetricName = batchScrapeDefinition.ScrapeDefinitionBatchProperties.AzureMetricConfiguration.MetricName;
            var resourceType = batchScrapeDefinition.ScrapeDefinitionBatchProperties.ResourceType;
            Logger.LogInformation("Executing batch scrape job of size {BatchSize} for Azure Metric {AzureMetricName} for resource type {ResourceType}.", batchScrapeDefinition.ScrapeDefinitions.Count, azureMetricName, resourceType);
            await ScheduleLimitedConcurrencyAsyncTask(tasks, () => ScrapeMetricBatched(batchScrapeDefinition), cancellationToken);
        }
    }
    else
    {
        foreach (var scrapeDefinition in scrapeDefinitions)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var metricName = scrapeDefinition.PrometheusMetricDefinition.Name;
            var resourceType = scrapeDefinition.Resource.ResourceType;
            Logger.LogInformation("Scraping {MetricName} for resource type {ResourceType}.", metricName, resourceType);

            await ScheduleLimitedConcurrencyAsyncTask(tasks, () => ScrapeMetric(scrapeDefinition), cancellationToken);
        }
    }

    await Task.WhenAll(tasks);
}
private async Task ScrapeMetricBatched(BatchScrapeDefinition<IAzureResourceDefinition> batchScrapeDefinition)
{
    try
    {
        var resourceSubscriptionId = batchScrapeDefinition.ScrapeDefinitionBatchProperties.SubscriptionId;
        var azureMonitorClient = _azureMonitorClientFactory.CreateIfNotExists(_metricsDeclaration.AzureMetadata.Cloud, _metricsDeclaration.AzureMetadata.TenantId,
            resourceSubscriptionId, _metricSinkWriter, _azureScrapingSystemMetricsPublisher, _resourceMetricDefinitionMemoryCache, _configuration,
            _azureMonitorIntegrationConfiguration, _azureMonitorLoggingConfiguration, _loggerFactory);
        var azureEnvironment = _metricsDeclaration.AzureMetadata.Cloud.GetAzureEnvironment();

        var tokenCredential = AzureAuthenticationFactory.GetTokenCredential(azureEnvironment.ManagementEndpoint, _metricsDeclaration.AzureMetadata.TenantId,
            AzureAuthenticationFactory.GetConfiguredAzureAuthentication(_configuration), new Uri(azureEnvironment.AuthenticationEndpoint));
        var logAnalyticsClient = new LogAnalyticsClient(_loggerFactory, azureEnvironment, tokenCredential);

        var scraper = _metricScraperFactory.CreateScraper(batchScrapeDefinition.ScrapeDefinitionBatchProperties.ResourceType, _metricSinkWriter, _azureScrapingSystemMetricsPublisher, azureMonitorClient, logAnalyticsClient);

        await scraper.BatchScrapeAsync(batchScrapeDefinition);
    }
    catch (Exception ex)
    {
        Logger.LogError(ex, "Failed to scrape metric {MetricName} for resource type batch {ResourceType}. Details: {Details}",
            batchScrapeDefinition.ScrapeDefinitionBatchProperties.PrometheusMetricDefinition.Name, batchScrapeDefinition.ScrapeDefinitionBatchProperties.ResourceType, ex.ToString());
    }
}

private async Task ScrapeMetric(ScrapeDefinition<IAzureResourceDefinition> scrapeDefinition)
{
@@ -287,6 +324,7 @@ private async Task ScrapeMetric(ScrapeDefinition<IAzureResourceDefinition> scrap
var logAnalyticsClient = new LogAnalyticsClient(_loggerFactory, azureEnvironment, tokenCredential);

var scraper = _metricScraperFactory.CreateScraper(scrapeDefinition.Resource.ResourceType, _metricSinkWriter, _azureScrapingSystemMetricsPublisher, azureMonitorClient, logAnalyticsClient);

await scraper.ScrapeAsync(scrapeDefinition);
}
catch (Exception ex)
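The batch-scraping path above is driven by the MetricsBatching options on the Azure Monitor integration configuration. As a rough sketch, opting in from runtime configuration might look like the following — the exact key names are an assumption inferred from the options reads in ScrapeMetrics, so verify them against the documentation on Promitor.io:

azureMonitor:
  integration:
    metricsBatching:
      enabled: true      # off by default; batch scraping is experimental
      maxBatchSize: 50   # upper bound on definitions per Azure Monitor batch query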
78 changes: 75 additions & 3 deletions src/Promitor.Core.Scraping/AzureMonitorScraper.cs
@@ -1,10 +1,13 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using GuardNet;
using Microsoft.Extensions.Logging;
using Promitor.Core.Contracts;
using Promitor.Core.Extensions;
using Promitor.Core.Metrics;
using Promitor.Core.Scraping.Configuration.Model;
using Promitor.Core.Scraping.Configuration.Model.Metrics;
@@ -18,13 +21,19 @@ namespace Promitor.Core.Scraping
/// <typeparam name="TResourceDefinition">Type of metric definition that is being used</typeparam>
public abstract class AzureMonitorScraper<TResourceDefinition> : Scraper<TResourceDefinition>
where TResourceDefinition : class, IAzureResourceDefinition
{
    /// <summary>
    /// A cache to store resource definitions, used to hydrate resource info from a resource ID when processing batch query results
    /// </summary>
    private readonly ConcurrentDictionary<string, Tuple<IAzureResourceDefinition, TResourceDefinition>> _resourceDefinitions; // using a dictionary for now since IMemoryCache involves layers of injection

/// <summary>
/// Constructor
/// </summary>
protected AzureMonitorScraper(ScraperConfiguration scraperConfiguration) :
base(scraperConfiguration)
{
_resourceDefinitions = new ConcurrentDictionary<string, Tuple<IAzureResourceDefinition, TResourceDefinition>>();
}

/// <inheritdoc />
@@ -73,6 +82,69 @@ protected override async Task<ScrapeResult> ScrapeResourceAsync(string subscript
return new ScrapeResult(subscriptionId, scrapeDefinition.ResourceGroupName, resourceDefinition.ResourceName, resourceUri, finalMetricValues, metricLabels);
}

protected override async Task<List<ScrapeResult>> BatchScrapeResourceAsync(string subscriptionId, BatchScrapeDefinition<IAzureResourceDefinition> batchScrapeDefinition, PromitorMetricAggregationType aggregationType, TimeSpan aggregationInterval)
{
Guard.NotNull(batchScrapeDefinition, nameof(batchScrapeDefinition));
Guard.NotLessThan(batchScrapeDefinition.ScrapeDefinitions.Count(), 1, nameof(batchScrapeDefinition));
Guard.NotNull(batchScrapeDefinition.ScrapeDefinitionBatchProperties.AzureMetricConfiguration, nameof(batchScrapeDefinition.ScrapeDefinitionBatchProperties.AzureMetricConfiguration));

var metricName = batchScrapeDefinition.ScrapeDefinitionBatchProperties.AzureMetricConfiguration.MetricName;

// Build list of resource URIs based on definitions in the batch
var resourceUriList = new List<string>();
foreach (ScrapeDefinition<IAzureResourceDefinition> scrapeDefinition in batchScrapeDefinition.ScrapeDefinitions)
{
var resourceUri = $"/{BuildResourceUri(subscriptionId, scrapeDefinition, (TResourceDefinition) scrapeDefinition.Resource)}";
resourceUriList.Add(resourceUri);
    // Cache resource info; the TResourceDefinition attached to the scrape definition can sometimes be missing attributes, so fill them in here
    var resourceDefinitionToCache = new AzureResourceDefinition
    (
        resourceType: scrapeDefinition.Resource.ResourceType,
        resourceGroupName: scrapeDefinition.ResourceGroupName,
        subscriptionId: scrapeDefinition.SubscriptionId,
        resourceName: scrapeDefinition.Resource.ResourceName
    );
    _resourceDefinitions.AddOrUpdate(resourceUri, new Tuple<IAzureResourceDefinition, TResourceDefinition>(resourceDefinitionToCache, (TResourceDefinition)scrapeDefinition.Resource), (_, existing) => existing);
}

var metricLimit = batchScrapeDefinition.ScrapeDefinitionBatchProperties.AzureMetricConfiguration.Limit;
var dimensionNames = DetermineMetricDimensions(metricName, (TResourceDefinition) batchScrapeDefinition.ScrapeDefinitions[0].Resource, batchScrapeDefinition.ScrapeDefinitionBatchProperties.AzureMetricConfiguration); // TODO: resource definition doesn't seem to be used, can we remove it from function signature?

var resourceIdTaggedMeasuredMetrics = new List<ResourceAssociatedMeasuredMetric>();
try
{
// Query Azure Monitor for metrics
resourceIdTaggedMeasuredMetrics = await AzureMonitorClient.BatchQueryMetricAsync(metricName, dimensionNames, aggregationType, aggregationInterval, resourceUriList, null, metricLimit);
}
catch (MetricInformationNotFoundException metricsNotFoundException)
{
Logger.LogWarning("No metric information found for metric {MetricName} with dimensions {MetricDimensions}. Details: {Details}", metricsNotFoundException.Name, metricsNotFoundException.Dimensions, metricsNotFoundException.Details);

var measuredMetric = dimensionNames.Count > 0
? MeasuredMetric.CreateForDimensions(dimensionNames)
: MeasuredMetric.CreateWithoutDimensions(null);
resourceIdTaggedMeasuredMetrics.Add(measuredMetric.WithResourceIdAssociation(null));
}

var scrapeResults = new List<ScrapeResult>();
// group based on resource, then do enrichment per group
var groupedMeasuredMetrics = resourceIdTaggedMeasuredMetrics.GroupBy(measuredMetric => measuredMetric.ResourceId);
foreach (IGrouping<string, ResourceAssociatedMeasuredMetric> resourceMetricsGroup in groupedMeasuredMetrics)
{
var resourceId = resourceMetricsGroup.Key;
if (_resourceDefinitions.TryGetValue(resourceId, out Tuple<IAzureResourceDefinition, TResourceDefinition> resourceDefinitionTuple))
{
var resourceDefinition = resourceDefinitionTuple.Item1;
var metricLabels = DetermineMetricLabels(resourceDefinitionTuple.Item2);
var finalMetricValues = EnrichMeasuredMetrics(resourceDefinitionTuple.Item2, dimensionNames, resourceMetricsGroup.ToImmutableList());
scrapeResults.Add(new ScrapeResult(subscriptionId, resourceDefinition.ResourceGroupName, resourceDefinition.ResourceName, resourceId, finalMetricValues, metricLabels));
}
}

return scrapeResults;
}

private int? DetermineMetricLimit(ScrapeDefinition<IAzureResourceDefinition> scrapeDefinition)
{
return scrapeDefinition.AzureMetricConfiguration.Limit;
@@ -89,9 +161,9 @@ protected override async Task<ScrapeResult> ScrapeResourceAsync(string subscript
/// <param name="dimensionNames">List of names of the specified dimensions provided by the scraper.</param>
/// <param name="metricValues">Measured metric values that were found</param>
/// <returns></returns>
protected virtual List<MeasuredMetric> EnrichMeasuredMetrics(TResourceDefinition resourceDefinition, List<string> dimensionNames, IReadOnlyList<MeasuredMetric> metricValues)
{
    return metricValues.ToList();
}

/// <summary>
46 changes: 46 additions & 0 deletions src/Promitor.Core.Scraping/Batching/AzureResourceDefinitionBatching.cs
@@ -0,0 +1,46 @@
using System.Collections.Generic;
using System.Linq;
using GuardNet;
using Promitor.Core.Contracts;
using Promitor.Core.Scraping.Configuration.Model.Metrics;

namespace Promitor.Core.Scraping.Batching
{
public static class AzureResourceDefinitionBatching
{
/// <summary>
/// Groups scrape definitions based on the following conditions:
/// 1. Definitions in a batch must target the same resource type
/// 2. Definitions in a batch must target the same Azure metric with identical dimensions
/// 3. Definitions in a batch must have the same time granularity
/// 4. Batch size cannot exceed configured maximum
/// <see href="https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/migrate-to-batch-api?tabs=individual-response#batching-restrictions"/>
/// </summary>
public static List<BatchScrapeDefinition<IAzureResourceDefinition>> GroupScrapeDefinitions(IEnumerable<ScrapeDefinition<IAzureResourceDefinition>> allScrapeDefinitions, int maxBatchSize)
{
// ReSharper disable PossibleMultipleEnumeration
Guard.NotNull(allScrapeDefinitions, nameof(allScrapeDefinitions));

return allScrapeDefinitions.GroupBy(def => def.BuildScrapingBatchInfo())
.ToDictionary(group => group.Key, group => group.ToList()) // first pass to build batches that could exceed max
.ToDictionary(group => group.Key, group => SplitScrapeDefinitionBatch(group.Value, maxBatchSize)) // split to right-sized batches
.SelectMany(group => group.Value.Select(batch => new BatchScrapeDefinition<IAzureResourceDefinition>(group.Key, batch)))
.ToList(); // flatten
}

/// <summary>
/// Splits the "raw" batch according to the configured max batch size
/// </summary>
private static List<List<ScrapeDefinition<IAzureResourceDefinition>>> SplitScrapeDefinitionBatch(List<ScrapeDefinition<IAzureResourceDefinition>> batchToSplit, int maxBatchSize)
{
// ReSharper disable PossibleMultipleEnumeration
Guard.NotNull(batchToSplit, nameof(batchToSplit));

int numNewGroups = ((batchToSplit.Count - 1) / maxBatchSize) + 1;

return Enumerable.Range(0, numNewGroups)
.Select(i => batchToSplit.Skip(i * maxBatchSize).Take(maxBatchSize).ToList())
.ToList();
}
}
}
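To make the grouping and splitting concrete, here is a minimal usage sketch; LoadScrapeDefinitions() is a hypothetical helper and the counts are illustrative. With 120 definitions sharing identical batch properties and a max batch size of 50, SplitScrapeDefinitionBatch yields ((120 - 1) / 50) + 1 = 3 batches of 50, 50, and 20 definitions:

// Illustrative usage of the batching helper defined above.
List<ScrapeDefinition<IAzureResourceDefinition>> definitions = LoadScrapeDefinitions(); // hypothetical source of definitions
List<BatchScrapeDefinition<IAzureResourceDefinition>> batches =
    AzureResourceDefinitionBatching.GroupScrapeDefinitions(definitions, maxBatchSize: 50);

foreach (var batch in batches)
{
    // Definitions in a batch share resource type, metric, dimensions, and granularity,
    // and each batch holds at most maxBatchSize definitions.
    Console.WriteLine($"{batch.ScrapeDefinitionBatchProperties.ResourceType}: {batch.ScrapeDefinitions.Count} definitions");
}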
25 changes: 25 additions & 0 deletions src/Promitor.Core.Scraping/Configuration/Model/AzureMetricConfiguration.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Promitor.Core.Scraping.Configuration.Model
{
@@ -45,5 +46,29 @@ public class AzureMetricConfiguration
}
return Dimensions?.Any(dimension => dimension.Name.Equals(dimensionName, StringComparison.InvariantCultureIgnoreCase));
}

// A unique string to represent this Azure metric and its configured dimensions
public string ToUniqueStringRepresentation()
{
StringBuilder sb = new StringBuilder();
sb.Append(MetricName);
if (Dimension != null)
{
sb.Append('_');
sb.Append(Dimension.Name);
}
else if (Dimensions != null)
{
foreach (var dimension in Dimensions)
{
sb.Append('_');
sb.Append(dimension.Name);
}
}
sb.Append($"_limit{Limit}");


return sb.ToString();
}
}
}
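As a quick illustration of the batching key this produces — a sketch that assumes public setters on this class and a MetricDimension type with a Name property, neither of which is shown in this diff:

// Illustrative: a metric with one configured dimension and a limit of 10.
var configuration = new AzureMetricConfiguration
{
    MetricName = "ActiveMessages",
    Dimension = new MetricDimension { Name = "EntityName" },
    Limit = 10
};

// Prints "ActiveMessages_EntityName_limit10"; scrape definitions can only be batched
// together when this representation (among the other batch properties) matches.
Console.WriteLine(configuration.ToUniqueStringRepresentation());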
39 changes: 39 additions & 0 deletions src/Promitor.Core.Scraping/Configuration/Model/Metrics/BatchScrapeDefinition.cs
@@ -0,0 +1,39 @@
using System.Collections.Generic;
using GuardNet;
using Promitor.Core.Contracts;

namespace Promitor.Core.Scraping.Configuration.Model.Metrics
{
/// <summary>
/// Defines a batch of ScrapeDefinitions to be executed in a single request
/// Scrape definitions within a batch should share
/// 1. The same resource type
/// 2. The same Azure metric scrape target with identical dimensions
/// 3. The same time granularity
/// 4. The same filters
/// </summary>
public class BatchScrapeDefinition<TResourceDefinition> where TResourceDefinition : class, IAzureResourceDefinition
{
/// <summary>
/// Creates a new instance of the <see cref="BatchScrapeDefinition{TResourceDefinition}"/> class.
/// </summary>
/// <param name="scrapeDefinitionBatchProperties">Shared Properties Among ScrapeDefinition's in the batch</param>
/// <param name="groupedScrapeDefinitions">Scape definitions in the batch</param>
public BatchScrapeDefinition(ScrapeDefinitionBatchProperties scrapeDefinitionBatchProperties, List<ScrapeDefinition<TResourceDefinition>> groupedScrapeDefinitions)
{
Guard.NotNull(groupedScrapeDefinitions, nameof(groupedScrapeDefinitions));
Guard.NotLessThan(groupedScrapeDefinitions.Count, 1, nameof(groupedScrapeDefinitions));
Guard.NotNull(scrapeDefinitionBatchProperties, nameof(scrapeDefinitionBatchProperties));

ScrapeDefinitionBatchProperties = scrapeDefinitionBatchProperties;
ScrapeDefinitions = groupedScrapeDefinitions;
}

/// <summary>
/// A batch of scrape job definitions to be executed as a single request
/// </summary>
public List<ScrapeDefinition<TResourceDefinition>> ScrapeDefinitions { get; set; }

public ScrapeDefinitionBatchProperties ScrapeDefinitionBatchProperties { get; set; }
}
}