Skip to content

Commit

Permalink
[CosmosDb] Scaler APIs (#841)
Browse files Browse the repository at this point in the history
  • Loading branch information
alrod authored Jun 10, 2023
1 parent 0af7d17 commit 2ca4105
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.CosmosDB;
using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -63,5 +65,31 @@ public static IWebJobsBuilder AddCosmosDB(this IWebJobsBuilder builder, Action<C

return builder;
}

/// <summary>
/// Adds the Storage Queues extension to the provided <see cref="IWebJobsBuilder"/>.
/// </summary>
/// <param name="builder"></param>
/// <param name="triggerMetadata">Trigger metadata.</param>
/// <returns></returns>
public static IWebJobsBuilder AddCosmosDbScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
{
IServiceProvider serviceProvider = null;
Lazy<CosmosDbScalerProvider> scalerProvider = new Lazy<CosmosDbScalerProvider>(() => new CosmosDbScalerProvider(serviceProvider, triggerMetadata));

builder.Services.AddSingleton<IScaleMonitorProvider>(resolvedServiceProvider =>
{
serviceProvider = serviceProvider ?? resolvedServiceProvider;
return scalerProvider.Value;
});

builder.Services.AddSingleton<ITargetScalerProvider>(resolvedServiceProvider =>
{
serviceProvider = serviceProvider ?? resolvedServiceProvider;
return scalerProvider.Value;
});

return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ internal class CosmosDBTargetScaler : ITargetScaler
private readonly TargetScalerDescriptor _targetScalerDescriptor;
private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider;
private readonly ILogger _logger;
private readonly CosmosDBTriggerAttribute _cosmosDBTriggerAttribute;
private readonly int _maxItemsPerInvocation;
private readonly Container _monitoredContainer;

public CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger)
public CosmosDBTargetScaler(string functionId, int maxItemsPerInvocation, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger)
{
_functionId = functionId;
_targetScalerDescriptor = new TargetScalerDescriptor(functionId);
_monitoredContainer = monitoredContainer;
_cosmosDBMetricsProvider = new CosmosDBMetricsProvider(logger, _monitoredContainer, leaseContainer, processorName);
_logger = logger;
_cosmosDBTriggerAttribute = cosmosDBTriggerAttribute;
_maxItemsPerInvocation = maxItemsPerInvocation;
}

public TargetScalerDescriptor TargetScalerDescriptor => _targetScalerDescriptor;
Expand All @@ -44,7 +44,7 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context,

if (!context.InstanceConcurrency.HasValue)
{
concurrency = _cosmosDBTriggerAttribute.MaxItemsPerInvocation > 0 ? _cosmosDBTriggerAttribute.MaxItemsPerInvocation : DefaultMaxItemsPerInvocation;
concurrency = _maxItemsPerInvocation > 0 ? _maxItemsPerInvocation : DefaultMaxItemsPerInvocation;
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public CosmosDBTriggerListener(
$"leaseContainer='{this._leaseContainer.Id}', leaseDatabase='{this._leaseContainer.Database.Id}', functionId='{this._functionId}'";

this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, logger, _monitoredContainer, _leaseContainer, _processorName);
this._cosmosDBTargetScaler = new CosmosDBTargetScaler(_functionId, _cosmosDBAttribute, _monitoredContainer, _leaseContainer, _processorName, _logger);
this._cosmosDBTargetScaler = new CosmosDBTargetScaler(_functionId, _cosmosDBAttribute.MaxItemsPerInvocation, _monitoredContainer, _leaseContainer, _processorName, _logger);
}

public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor;
Expand Down
92 changes: 92 additions & 0 deletions src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDbScalerProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using static Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger.CosmosDbScalerProvider;

namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger
{
internal class CosmosDbScalerProvider : IScaleMonitorProvider, ITargetScalerProvider
{
private readonly CosmosDBScaleMonitor _scaleMonitor;
private readonly CosmosDBTargetScaler _targetScaler;

public CosmosDbScalerProvider(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata)
{
AzureComponentFactory azureComponentFactory = null;
if (triggerMetadata.Properties != null && triggerMetadata.Properties.TryGetValue(nameof(AzureComponentFactory), out object value))
{
azureComponentFactory = value as AzureComponentFactory;
}
else
{
azureComponentFactory = serviceProvider.GetService<AzureComponentFactory>();
}

ILoggerFactory loggerFactory = serviceProvider.GetService<ILoggerFactory>();
CosmosDbMetadata cosmosDbMetadata = JsonConvert.DeserializeObject<CosmosDbMetadata>(triggerMetadata.Metadata.ToString());
cosmosDbMetadata.ResolveProperties(serviceProvider.GetService<INameResolver>());
IOptions<CosmosClientOptions> options = serviceProvider.GetService<IOptions<CosmosClientOptions>>();
ICosmosDBServiceFactory serviceFactory = serviceProvider.GetService<ICosmosDBServiceFactory>();
CosmosClient cosmosClient = serviceFactory.CreateService(cosmosDbMetadata.Connection, options.Value);
var monitoredContainer = cosmosClient.GetContainer(cosmosDbMetadata.DatabaseName, cosmosDbMetadata.ContainerName);
var leaseContainer = cosmosClient.GetContainer(string.IsNullOrEmpty(cosmosDbMetadata.LeaseDatabaseName) ? cosmosDbMetadata.DatabaseName : cosmosDbMetadata.LeaseDatabaseName, string.IsNullOrEmpty(cosmosDbMetadata.LeaseContainerName) ? CosmosDBTriggerConstants.DefaultLeaseCollectionName : cosmosDbMetadata.LeaseContainerName);
_scaleMonitor = new CosmosDBScaleMonitor(triggerMetadata.FunctionName, loggerFactory.CreateLogger<CosmosDBScaleMonitor>(), monitoredContainer, leaseContainer, cosmosDbMetadata.LeaseContainerPrefix);
_targetScaler = new CosmosDBTargetScaler(triggerMetadata.FunctionName, cosmosDbMetadata.MaxItemsPerInvocation, monitoredContainer, leaseContainer, cosmosDbMetadata.LeaseContainerPrefix, loggerFactory.CreateLogger<CosmosDBTargetScaler>());
}

public IScaleMonitor GetMonitor()
{
return _scaleMonitor;
}

public ITargetScaler GetTargetScaler()
{
return _targetScaler;
}

internal class CosmosDbMetadata
{
[JsonProperty]
public string Connection { get; set; }

[JsonProperty]
public string DatabaseName { get; set; }

[JsonProperty]
public string ContainerName { get; set; }

[JsonProperty]
public string LeaseContainerName { get; set; }

[JsonProperty]
public string LeaseContainerPrefix { get; set; }

[JsonProperty]
public string LeaseDatabaseName { get; set; }

[JsonProperty]
public int MaxItemsPerInvocation { get; set; }

public void ResolveProperties(INameResolver resolver)
{
if (resolver != null)
{
DatabaseName = resolver.ResolveWholeString(DatabaseName);
ContainerName = resolver.ResolveWholeString(ContainerName);
LeaseContainerName = resolver.ResolveWholeString(LeaseContainerName);
LeaseContainerPrefix = resolver.ResolveWholeString(LeaseContainerPrefix) ?? string.Empty;
LeaseDatabaseName = resolver.ResolveWholeString(LeaseDatabaseName);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.33.0" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.36" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.37" />
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.1.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task CosmosDBEndToEnd()
{
using (var host = await StartHostAsync(typeof(EndToEndTestClass)))
{
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>());
var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService<IConfiguration>(), DatabaseName, CollectionName);

// Call the outputs function directly, which will write out 3 documents
// using with the 'input' property set to the value we provide.
Expand Down Expand Up @@ -75,15 +75,15 @@ await TestHelpers.Await(() =>
}
}

private async Task<CosmosClient> InitializeDocumentClientAsync(IConfiguration configuration)
public static async Task<CosmosClient> InitializeDocumentClientAsync(IConfiguration configuration, string databaseName, string collectionName)
{
var client = new CosmosClient(configuration.GetConnectionStringOrSetting(Constants.DefaultConnectionStringName).Value);

Database database = await client.CreateDatabaseIfNotExistsAsync(DatabaseName);
Database database = await client.CreateDatabaseIfNotExistsAsync(databaseName);

try
{
await database.GetContainer(CollectionName).ReadContainerAsync();
await database.GetContainer(collectionName).ReadContainerAsync();
}
catch (CosmosException cosmosException) when (cosmosException.StatusCode == System.Net.HttpStatusCode.NotFound)
{
Expand Down
184 changes: 184 additions & 0 deletions test/WebJobs.Extensions.CosmosDB.Tests/ScaleHostEndToEndTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data.Common;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Core;
using Castle.Core.Configuration;
using Castle.Core.Logging;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs.Extensions.Tests.Common;
using Microsoft.Azure.WebJobs.Extensions.Tests.Extensions.CosmosDB.Models;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using Xunit;
using IConfiguration = Microsoft.Extensions.Configuration.IConfiguration;
using TestLoggerProvider = Microsoft.Azure.WebJobs.Host.TestCommon.TestLoggerProvider;

namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests
{
[Trait("Category", "E2E")]
public class ScaleHostEndToEndTests
{
private const string FunctionName = "Function1";
private const string DatabaseName = "E2EDb";
private const string CollectionName = "E2ECollection";
private const string Connection = "CosmosDbConnection";
private readonly TestLoggerProvider _loggerProvider = new TestLoggerProvider();

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task ScaleHostEndToEndTest(bool tbsEnabled)
{
IHost webJobsHost = new HostBuilder()
.ConfigureWebJobs()
.Build();
var configuration = webJobsHost.Services.GetRequiredService<IConfiguration>();
var connectionString = configuration.GetConnectionStringOrSetting(Constants.DefaultConnectionStringName).Value;
DbConnectionStringBuilder builder = new DbConnectionStringBuilder()
{
ConnectionString = connectionString
};

string triggers = $@"{{
""triggers"": [
{{
""name"": ""myQueueItem"",
""type"": ""queueTrigger"",
""direction"": ""in"",
""connection"": ""{Connection}"",
""databaseName"": ""{DatabaseName}"",
""containerName"": ""{CollectionName}"",
""MaxItemsPerInvocation"": 1,
""functionName"": ""{FunctionName}""
}}
]}}";

IHost host = new HostBuilder().ConfigureServices(services => services.AddAzureClientsCore()).Build();
AzureComponentFactory defaultAzureComponentFactory = host.Services.GetService<AzureComponentFactory>();

string hostId = "test-host";
var loggerProvider = new TestLoggerProvider();

IHostBuilder hostBuilder = new HostBuilder();
hostBuilder.ConfigureLogging(configure =>
{
configure.SetMinimumLevel(LogLevel.Debug);
configure.AddProvider(loggerProvider);
});
hostBuilder.ConfigureAppConfiguration((hostBuilderContext, config) =>
{
var settings = new Dictionary<string, string>()
{
{ $"{Connection}", connectionString }
};
// Adding app setting
config.AddInMemoryCollection(settings);
})
.ConfigureServices(services =>
{
services.AddAzureStorageScaleServices();
services.AddSingleton<INameResolver, FakeNameResolver>();
})
.ConfigureWebJobsScale((context, builder) =>
{
builder.AddCosmosDB();
builder.UseHostId(hostId);
foreach (var jtoken in JObject.Parse(triggers)["triggers"])
{
TriggerMetadata metadata = new TriggerMetadata(jtoken as JObject);
builder.AddCosmosDbScaleForTrigger(metadata);
}
},
scaleOptions =>
{
scaleOptions.IsTargetScalingEnabled = tbsEnabled;
scaleOptions.MetricsPurgeEnabled = false;
scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4);
scaleOptions.IsRuntimeScalingEnabled = true;
scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1);
});

using (var client = await CosmosDBEndToEndTests.InitializeDocumentClientAsync(configuration, DatabaseName, CollectionName))
{
var container = client.GetDatabase(DatabaseName).GetContainer(CollectionName);

// Delete existing items
var array = container.GetItemLinqQueryable<Item>(allowSynchronousQueryExecution: true).ToArray();
foreach (var itemToDelete in array)
{
await container.DeleteItemAsync<Item>(itemToDelete.Id, new PartitionKey(itemToDelete.Id));
}

// Add new items to trigger scale
for (int i = 0; i < 2; i++)
{
var item = new Item() { Id = Guid.NewGuid().ToString(), Text = "Scale" };
PartitionKey pk = new PartitionKey(item.Id);
await container.UpsertItemAsync<Item>(item, pk);
}
}

IHost scaleHost = hostBuilder.Build();
await scaleHost.StartAsync();

await Host.TestCommon.TestHelpers.Await(async () =>
{
IScaleStatusProvider scaleManager = scaleHost.Services.GetService<IScaleStatusProvider>();
var scaleStatus = await scaleManager.GetScaleStatusAsync(new ScaleStatusContext());
bool scaledOut = false;
if (!tbsEnabled)
{
scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == null && scaleStatus.FunctionTargetScalerResults.Count == 0
&& scaleStatus.FunctionScaleStatuses[FunctionName].Vote == ScaleVote.ScaleOut;
if (scaledOut)
{
var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray();
Assert.Contains(logMessages, p => p.Contains("1 scale monitors to sample"));
}
}
else
{
scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == 1 && scaleStatus.FunctionScaleStatuses.Count == 0
&& scaleStatus.FunctionTargetScalerResults[FunctionName].TargetWorkerCount == 1;
if (scaledOut)
{
var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray();
Assert.Contains(logMessages, p => p.Contains("1 target scalers to sample"));
}
}
if (scaledOut)
{
var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray();
Assert.Contains(logMessages, p => p.Contains("Runtime scale monitoring is enabled."));
if (!tbsEnabled)
{
Assert.Contains(logMessages, p => p.Contains("Scaling out based on votes"));
}
}
return scaledOut;
}, pollingInterval: 2000, timeout: 180000, throwWhenDebugging: true);
}
}
}
Loading

0 comments on commit 2ca4105

Please sign in to comment.