diff --git a/src/WebJobs.Extensions.CosmosDB/Config/CosmosDBWebJobsBuilderExtensions.cs b/src/WebJobs.Extensions.CosmosDB/Config/CosmosDBWebJobsBuilderExtensions.cs index 700f18057..975d96b3f 100644 --- a/src/WebJobs.Extensions.CosmosDB/Config/CosmosDBWebJobsBuilderExtensions.cs +++ b/src/WebJobs.Extensions.CosmosDB/Config/CosmosDBWebJobsBuilderExtensions.cs @@ -4,6 +4,8 @@ using System; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.CosmosDB; +using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -63,5 +65,31 @@ public static IWebJobsBuilder AddCosmosDB(this IWebJobsBuilder builder, Action + /// Adds the Storage Queues extension to the provided . + /// + /// + /// Trigger metadata. + /// + public static IWebJobsBuilder AddCosmosDbScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata) + { + IServiceProvider serviceProvider = null; + Lazy scalerProvider = new Lazy(() => new CosmosDbScalerProvider(serviceProvider, triggerMetadata)); + + builder.Services.AddSingleton(resolvedServiceProvider => + { + serviceProvider = serviceProvider ?? resolvedServiceProvider; + return scalerProvider.Value; + }); + + builder.Services.AddSingleton(resolvedServiceProvider => + { + serviceProvider = serviceProvider ?? resolvedServiceProvider; + return scalerProvider.Value; + }); + + return builder; + } } } \ No newline at end of file diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index d93cd86f1..6b3f56cce 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -16,17 +16,17 @@ internal class CosmosDBTargetScaler : ITargetScaler private readonly TargetScalerDescriptor _targetScalerDescriptor; private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider; private readonly ILogger _logger; - private readonly CosmosDBTriggerAttribute _cosmosDBTriggerAttribute; + private readonly int _maxItemsPerInvocation; private readonly Container _monitoredContainer; - public CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) + public CosmosDBTargetScaler(string functionId, int maxItemsPerInvocation, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) { _functionId = functionId; _targetScalerDescriptor = new TargetScalerDescriptor(functionId); _monitoredContainer = monitoredContainer; _cosmosDBMetricsProvider = new CosmosDBMetricsProvider(logger, _monitoredContainer, leaseContainer, processorName); _logger = logger; - _cosmosDBTriggerAttribute = cosmosDBTriggerAttribute; + _maxItemsPerInvocation = maxItemsPerInvocation; } public TargetScalerDescriptor TargetScalerDescriptor => _targetScalerDescriptor; @@ -44,7 +44,7 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, if (!context.InstanceConcurrency.HasValue) { - concurrency = _cosmosDBTriggerAttribute.MaxItemsPerInvocation > 0 ? _cosmosDBTriggerAttribute.MaxItemsPerInvocation : DefaultMaxItemsPerInvocation; + concurrency = _maxItemsPerInvocation > 0 ? _maxItemsPerInvocation : DefaultMaxItemsPerInvocation; } else { diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index 48ca72b22..8bd3627d0 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -61,7 +61,7 @@ public CosmosDBTriggerListener( $"leaseContainer='{this._leaseContainer.Id}', leaseDatabase='{this._leaseContainer.Database.Id}', functionId='{this._functionId}'"; this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, logger, _monitoredContainer, _leaseContainer, _processorName); - this._cosmosDBTargetScaler = new CosmosDBTargetScaler(_functionId, _cosmosDBAttribute, _monitoredContainer, _leaseContainer, _processorName, _logger); + this._cosmosDBTargetScaler = new CosmosDBTargetScaler(_functionId, _cosmosDBAttribute.MaxItemsPerInvocation, _monitoredContainer, _leaseContainer, _processorName, _logger); } public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDbScalerProvider.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDbScalerProvider.cs new file mode 100644 index 000000000..b05bd599f --- /dev/null +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDbScalerProvider.cs @@ -0,0 +1,92 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using static Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger.CosmosDbScalerProvider; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger +{ + internal class CosmosDbScalerProvider : IScaleMonitorProvider, ITargetScalerProvider + { + private readonly CosmosDBScaleMonitor _scaleMonitor; + private readonly CosmosDBTargetScaler _targetScaler; + + public CosmosDbScalerProvider(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata) + { + AzureComponentFactory azureComponentFactory = null; + if (triggerMetadata.Properties != null && triggerMetadata.Properties.TryGetValue(nameof(AzureComponentFactory), out object value)) + { + azureComponentFactory = value as AzureComponentFactory; + } + else + { + azureComponentFactory = serviceProvider.GetService(); + } + + ILoggerFactory loggerFactory = serviceProvider.GetService(); + CosmosDbMetadata cosmosDbMetadata = JsonConvert.DeserializeObject(triggerMetadata.Metadata.ToString()); + cosmosDbMetadata.ResolveProperties(serviceProvider.GetService()); + IOptions options = serviceProvider.GetService>(); + ICosmosDBServiceFactory serviceFactory = serviceProvider.GetService(); + CosmosClient cosmosClient = serviceFactory.CreateService(cosmosDbMetadata.Connection, options.Value); + var monitoredContainer = cosmosClient.GetContainer(cosmosDbMetadata.DatabaseName, cosmosDbMetadata.ContainerName); + var leaseContainer = cosmosClient.GetContainer(string.IsNullOrEmpty(cosmosDbMetadata.LeaseDatabaseName) ? cosmosDbMetadata.DatabaseName : cosmosDbMetadata.LeaseDatabaseName, string.IsNullOrEmpty(cosmosDbMetadata.LeaseContainerName) ? CosmosDBTriggerConstants.DefaultLeaseCollectionName : cosmosDbMetadata.LeaseContainerName); + _scaleMonitor = new CosmosDBScaleMonitor(triggerMetadata.FunctionName, loggerFactory.CreateLogger(), monitoredContainer, leaseContainer, cosmosDbMetadata.LeaseContainerPrefix); + _targetScaler = new CosmosDBTargetScaler(triggerMetadata.FunctionName, cosmosDbMetadata.MaxItemsPerInvocation, monitoredContainer, leaseContainer, cosmosDbMetadata.LeaseContainerPrefix, loggerFactory.CreateLogger()); + } + + public IScaleMonitor GetMonitor() + { + return _scaleMonitor; + } + + public ITargetScaler GetTargetScaler() + { + return _targetScaler; + } + + internal class CosmosDbMetadata + { + [JsonProperty] + public string Connection { get; set; } + + [JsonProperty] + public string DatabaseName { get; set; } + + [JsonProperty] + public string ContainerName { get; set; } + + [JsonProperty] + public string LeaseContainerName { get; set; } + + [JsonProperty] + public string LeaseContainerPrefix { get; set; } + + [JsonProperty] + public string LeaseDatabaseName { get; set; } + + [JsonProperty] + public int MaxItemsPerInvocation { get; set; } + + public void ResolveProperties(INameResolver resolver) + { + if (resolver != null) + { + DatabaseName = resolver.ResolveWholeString(DatabaseName); + ContainerName = resolver.ResolveWholeString(ContainerName); + LeaseContainerName = resolver.ResolveWholeString(LeaseContainerName); + LeaseContainerPrefix = resolver.ResolveWholeString(LeaseContainerPrefix) ?? string.Empty; + LeaseDatabaseName = resolver.ResolveWholeString(LeaseDatabaseName); + } + } + } + } +} diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index 0a6999c53..f9c90d267 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -20,7 +20,7 @@ - + diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBEndToEndTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBEndToEndTests.cs index b39e9dbf9..1fbd6ef07 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBEndToEndTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBEndToEndTests.cs @@ -32,7 +32,7 @@ public async Task CosmosDBEndToEnd() { using (var host = await StartHostAsync(typeof(EndToEndTestClass))) { - var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService()); + var client = await InitializeDocumentClientAsync(host.Services.GetRequiredService(), DatabaseName, CollectionName); // Call the outputs function directly, which will write out 3 documents // using with the 'input' property set to the value we provide. @@ -75,15 +75,15 @@ await TestHelpers.Await(() => } } - private async Task InitializeDocumentClientAsync(IConfiguration configuration) + public static async Task InitializeDocumentClientAsync(IConfiguration configuration, string databaseName, string collectionName) { var client = new CosmosClient(configuration.GetConnectionStringOrSetting(Constants.DefaultConnectionStringName).Value); - Database database = await client.CreateDatabaseIfNotExistsAsync(DatabaseName); + Database database = await client.CreateDatabaseIfNotExistsAsync(databaseName); try { - await database.GetContainer(CollectionName).ReadContainerAsync(); + await database.GetContainer(collectionName).ReadContainerAsync(); } catch (CosmosException cosmosException) when (cosmosException.StatusCode == System.Net.HttpStatusCode.NotFound) { diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/ScaleHostEndToEndTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/ScaleHostEndToEndTests.cs new file mode 100644 index 000000000..a06742375 --- /dev/null +++ b/test/WebJobs.Extensions.CosmosDB.Tests/ScaleHostEndToEndTests.cs @@ -0,0 +1,184 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Configuration; +using System.Data.Common; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Azure.Core; +using Castle.Core.Configuration; +using Castle.Core.Logging; +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Extensions.Tests.Common; +using Microsoft.Azure.WebJobs.Extensions.Tests.Extensions.CosmosDB.Models; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using Xunit; +using IConfiguration = Microsoft.Extensions.Configuration.IConfiguration; +using TestLoggerProvider = Microsoft.Azure.WebJobs.Host.TestCommon.TestLoggerProvider; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests +{ + [Trait("Category", "E2E")] + public class ScaleHostEndToEndTests + { + private const string FunctionName = "Function1"; + private const string DatabaseName = "E2EDb"; + private const string CollectionName = "E2ECollection"; + private const string Connection = "CosmosDbConnection"; + private readonly TestLoggerProvider _loggerProvider = new TestLoggerProvider(); + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task ScaleHostEndToEndTest(bool tbsEnabled) + { + IHost webJobsHost = new HostBuilder() + .ConfigureWebJobs() + .Build(); + var configuration = webJobsHost.Services.GetRequiredService(); + var connectionString = configuration.GetConnectionStringOrSetting(Constants.DefaultConnectionStringName).Value; + DbConnectionStringBuilder builder = new DbConnectionStringBuilder() + { + ConnectionString = connectionString + }; + + string triggers = $@"{{ + ""triggers"": [ + {{ + ""name"": ""myQueueItem"", + ""type"": ""queueTrigger"", + ""direction"": ""in"", + ""connection"": ""{Connection}"", + ""databaseName"": ""{DatabaseName}"", + ""containerName"": ""{CollectionName}"", + ""MaxItemsPerInvocation"": 1, + ""functionName"": ""{FunctionName}"" + }} + ]}}"; + + IHost host = new HostBuilder().ConfigureServices(services => services.AddAzureClientsCore()).Build(); + AzureComponentFactory defaultAzureComponentFactory = host.Services.GetService(); + + string hostId = "test-host"; + var loggerProvider = new TestLoggerProvider(); + + IHostBuilder hostBuilder = new HostBuilder(); + hostBuilder.ConfigureLogging(configure => + { + configure.SetMinimumLevel(LogLevel.Debug); + configure.AddProvider(loggerProvider); + }); + hostBuilder.ConfigureAppConfiguration((hostBuilderContext, config) => + { + var settings = new Dictionary() + { + { $"{Connection}", connectionString } + }; + + // Adding app setting + config.AddInMemoryCollection(settings); + }) + .ConfigureServices(services => + { + services.AddAzureStorageScaleServices(); + services.AddSingleton(); + }) + .ConfigureWebJobsScale((context, builder) => + { + builder.AddCosmosDB(); + builder.UseHostId(hostId); + + foreach (var jtoken in JObject.Parse(triggers)["triggers"]) + { + TriggerMetadata metadata = new TriggerMetadata(jtoken as JObject); + builder.AddCosmosDbScaleForTrigger(metadata); + } + }, + scaleOptions => + { + scaleOptions.IsTargetScalingEnabled = tbsEnabled; + scaleOptions.MetricsPurgeEnabled = false; + scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4); + scaleOptions.IsRuntimeScalingEnabled = true; + scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1); + }); + + using (var client = await CosmosDBEndToEndTests.InitializeDocumentClientAsync(configuration, DatabaseName, CollectionName)) + { + var container = client.GetDatabase(DatabaseName).GetContainer(CollectionName); + + // Delete existing items + var array = container.GetItemLinqQueryable(allowSynchronousQueryExecution: true).ToArray(); + foreach (var itemToDelete in array) + { + await container.DeleteItemAsync(itemToDelete.Id, new PartitionKey(itemToDelete.Id)); + } + + // Add new items to trigger scale + for (int i = 0; i < 2; i++) + { + var item = new Item() { Id = Guid.NewGuid().ToString(), Text = "Scale" }; + PartitionKey pk = new PartitionKey(item.Id); + await container.UpsertItemAsync(item, pk); + } + } + + IHost scaleHost = hostBuilder.Build(); + await scaleHost.StartAsync(); + + await Host.TestCommon.TestHelpers.Await(async () => + { + IScaleStatusProvider scaleManager = scaleHost.Services.GetService(); + + var scaleStatus = await scaleManager.GetScaleStatusAsync(new ScaleStatusContext()); + + bool scaledOut = false; + if (!tbsEnabled) + { + scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == null && scaleStatus.FunctionTargetScalerResults.Count == 0 + && scaleStatus.FunctionScaleStatuses[FunctionName].Vote == ScaleVote.ScaleOut; + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains(logMessages, p => p.Contains("1 scale monitors to sample")); + } + } + else + { + scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == 1 && scaleStatus.FunctionScaleStatuses.Count == 0 + && scaleStatus.FunctionTargetScalerResults[FunctionName].TargetWorkerCount == 1; + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains(logMessages, p => p.Contains("1 target scalers to sample")); + } + } + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains(logMessages, p => p.Contains("Runtime scale monitoring is enabled.")); + if (!tbsEnabled) + { + Assert.Contains(logMessages, p => p.Contains("Scaling out based on votes")); + } + } + + return scaledOut; + }, pollingInterval: 2000, timeout: 180000, throwWhenDebugging: true); + } + } +} diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs index 7f969f595..8ca69b2e5 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs @@ -1,5 +1,6 @@ -using System; -using System.Collections; +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -60,7 +61,7 @@ public CosmosDBTargetScalerTests() .Returns(estimator.Object); _attribute = new CosmosDBTriggerAttribute(DatabaseName, ContainerName); - _targetScaler = new CosmosDBTargetScaler(_functionId, _attribute, _monitoredContainer.Object, _leasesContainer.Object, ProcessorName, _loggerFactory.CreateLogger>()); + _targetScaler = new CosmosDBTargetScaler(_functionId, _attribute.MaxItemsPerInvocation, _monitoredContainer.Object, _leasesContainer.Object, ProcessorName, _loggerFactory.CreateLogger>()); } [Theory] diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj index 88a040fb2..79a779db6 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj +++ b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj @@ -8,7 +8,11 @@ - + + + + +