From 58cad688e869f3e962a64419a271e6a92319eb33 Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 16 Oct 2024 15:55:48 -0700 Subject: [PATCH 1/9] build java smoke test on build pipeline (#2941) --- eng/templates/build.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/eng/templates/build.yml b/eng/templates/build.yml index 3e3e41040..c68e0d44d 100644 --- a/eng/templates/build.yml +++ b/eng/templates/build.yml @@ -124,3 +124,14 @@ jobs: SourceFolder: '$(System.DefaultWorkingDirectory)/test/PerfTests/DFPerfTests/Output/' Contents: '**' TargetFolder: '$(System.DefaultWorkingDirectory)/azure-functions-durable-extension/' + + # We also need to build the Java smoke test, for CodeQL compliance + # We don't need to build the other smoke tests, because they can be analyzed without being compiled, + # as they're interpreted languages. + # This could be a separate pipeline, but the task is so small that it's paired with the .NET code build + # for convenience. + - pwsh: | + cd ./test/SmokeTests/OOProcSmokeTests/durableJava/ + gradle build + ls + displayName: 'Build Java OOProc test (for CodeQL compliance)' \ No newline at end of file From 6a63e24065c99504bc98f28d2c82cd43d4374c77 Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 16 Oct 2024 15:56:20 -0700 Subject: [PATCH 2/9] Add automated release pipeline (#2932) --- eng/ci/publish.yml | 99 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 eng/ci/publish.yml diff --git a/eng/ci/publish.yml b/eng/ci/publish.yml new file mode 100644 index 000000000..ca6760e39 --- /dev/null +++ b/eng/ci/publish.yml @@ -0,0 +1,99 @@ +# This is our package-publishing pipeline. +# When executed, it automatically publishes the output of the 'official pipeline' (the nupkgs) to our internal ADO feed. +# It may optionally also publish the packages to NuGet, but that is gated behind a manual approval. + +trigger: none # only trigger is manual +pr: none # only trigger is manual + +# We include to this variable group to be able to access the NuGet API key +variables: +- group: durabletask_config + +resources: + repositories: + - repository: 1es + type: git + name: 1ESPipelineTemplates/1ESPipelineTemplates + ref: refs/tags/release + - repository: eng + type: git + name: engineering + ref: refs/tags/release + + pipelines: + - pipeline: officialPipeline # Reference to the pipeline to be used as an artifact source + source: 'durabletask-extension.official' + +extends: + template: v1/1ES.Official.PipelineTemplate.yml@1es + parameters: + pool: + name: 1es-pool-azfunc + image: 1es-windows-2022 + os: windows + + stages: + - stage: release + jobs: + + # ADO release + - job: adoRelease + displayName: ADO Release + templateContext: + inputs: + - input: pipelineArtifact + pipeline: officialPipeline # Pipeline reference, as defined in the resources section + artifactName: drop + targetPath: $(System.DefaultWorkingDirectory)/drop + + # The preferred method of release on 1ES is by populating the 'output' section of a 1ES template. + # We use this method to release to ADO, but not to release to NuGet; this is explained in the 'nugetRelease' job. + # To read more about the 'output syntax', see: + # - https://eng.ms/docs/cloud-ai-platform/devdiv/one-engineering-system-1es/1es-docs/1es-pipeline-templates/features/outputs + # - https://eng.ms/docs/cloud-ai-platform/devdiv/one-engineering-system-1es/1es-docs/1es-pipeline-templates/features/outputs/nuget-packages + outputs: + - output: nuget # 'nuget' is an output "type" for pushing to NuGet + displayName: 'Push to durabletask ADO feed' + packageParentPath: $(System.DefaultWorkingDirectory) # This needs to be set to some prefix of the `packagesToPush` parameter. Apparently it helps with SDL tooling + packagesToPush: '$(System.DefaultWorkingDirectory)/**/*.nupkg;!$(System.DefaultWorkingDirectory)/**/*.symbols.nupkg' + publishVstsFeed: '3f99e810-c336-441f-8892-84983093ad7f/c895696b-ce37-4fe7-b7ce-74333a04f8bf' + allowPackageConflicts: true + + # NuGet approval gate + - job: nugetApproval + displayName: NuGetApproval + pool: server # This task only works when executed on serverl pools, so this needs to be specified + steps: + # Wait for manual approval. + - task: ManualValidation@1 + inputs: + instructions: Confirm you want to push to NuGet + onTimeout: 'reject' + + # NuGet release + - job: nugetRelease + displayName: NuGet Release + dependsOn: + - nugetApproval + - adoRelease + condition: succeeded('nugetApproval', 'adoRelease') + templateContext: + inputs: + - input: pipelineArtifact + pipeline: officialPipeline # Pipeline reference as defined in the resources section + artifactName: drop + targetPath: $(System.DefaultWorkingDirectory)/drop + # Ideally, we would push to NuGet using the 1ES "template output" syntax, like we do for ADO. + # Unfortunately, that syntax does not allow for skipping duplicates when pushing to NuGet feeds + # (i.e; not failing the job when trying to push a package version that already exists on NuGet). + # This is a problem for us because our pipelines often produce multiple packages, and we want to be able to + # perform a 'nuget push *.nupkg' that skips packages already on NuGet while pushing the rest. + # Therefore, we use a regular .NET Core ADO Task to publish the packages until that usability gap is addressed. + steps: + - task: DotNetCoreCLI@2 + displayName: 'Push to nuget.org' + inputs: + command: custom + custom: nuget + arguments: 'push "*.nupkg" --api-key $(nuget_api_key) --skip-duplicate --source https://api.nuget.org/v3/index.json' + workingDirectory: '$(System.DefaultWorkingDirectory)/drop' \ No newline at end of file From 820e9dde2f4abb44af056dcb3cf7a1edb450fe9b Mon Sep 17 00:00:00 2001 From: hctan Date: Fri, 18 Oct 2024 02:12:31 +0800 Subject: [PATCH 3/9] Fix custom connection name not working when using IDurableClientFactory -> CreateClient(). (#2923) Co-authored-by: Tan Han Chong --- release_notes.md | 2 + .../AzureStorageDurabilityProviderFactory.cs | 2 +- .../CustomTestStorageAccountProvider.cs | 33 +++++++++++++++ ...reStorageDurabilityProviderFactoryTests.cs | 41 +++++++++++++++++++ 4 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 test/Common/CustomTestStorageAccountProvider.cs diff --git a/release_notes.md b/release_notes.md index 4d172b565..aa232c219 100644 --- a/release_notes.md +++ b/release_notes.md @@ -6,6 +6,8 @@ ### Bug Fixes +- Fix custom connection name not working when using IDurableClientFactory.CreateClient() - contributed by [@hctan](https://github.com/hctan) + ### Breaking Changes ### Dependency Updates diff --git a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs index 242bef777..51015f79e 100644 --- a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs +++ b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProviderFactory.cs @@ -159,7 +159,7 @@ private AzureStorageDurabilityProvider GetAzureStorageStorageProvider(DurableCli // Need to check this.defaultStorageProvider != null for external clients that call GetDurabilityProvider(attribute) // which never initializes the defaultStorageProvider. if (string.Equals(this.defaultSettings?.TaskHubName, settings.TaskHubName, StringComparison.OrdinalIgnoreCase) && - string.Equals(this.defaultSettings?.StorageConnectionString, settings.StorageConnectionString, StringComparison.OrdinalIgnoreCase) && + string.Equals(this.defaultSettings?.StorageAccountDetails?.ConnectionString, settings.StorageAccountDetails?.ConnectionString, StringComparison.OrdinalIgnoreCase) && this.defaultStorageProvider != null) { // It's important that clients use the same AzureStorageOrchestrationService instance diff --git a/test/Common/CustomTestStorageAccountProvider.cs b/test/Common/CustomTestStorageAccountProvider.cs new file mode 100644 index 000000000..41337555a --- /dev/null +++ b/test/Common/CustomTestStorageAccountProvider.cs @@ -0,0 +1,33 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +using System; +using DurableTask.AzureStorage; +using Microsoft.WindowsAzure.Storage; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests +{ + internal class CustomTestStorageAccountProvider : IStorageAccountProvider + { + private readonly string customConnectionString; + private readonly string customConnectionName; + + public CustomTestStorageAccountProvider(string connectionName) + { + this.customConnectionName = connectionName; + this.customConnectionString = $"DefaultEndpointsProtocol=https;AccountName=test;AccountKey={GenerateRandomKey()};EndpointSuffix=core.windows.net"; + } + + public CloudStorageAccount GetCloudStorageAccount(string name) => + CloudStorageAccount.Parse(name != this.customConnectionName ? TestHelpers.GetStorageConnectionString() : this.customConnectionString); + + public StorageAccountDetails GetStorageAccountDetails(string name) => + new StorageAccountDetails { ConnectionString = name != this.customConnectionName ? TestHelpers.GetStorageConnectionString() : this.customConnectionString }; + + private static string GenerateRandomKey() + { + string key = Guid.NewGuid().ToString(); + return Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes(key)); + } + } +} diff --git a/test/FunctionsV2/AzureStorageDurabilityProviderFactoryTests.cs b/test/FunctionsV2/AzureStorageDurabilityProviderFactoryTests.cs index 30363c325..a55933384 100644 --- a/test/FunctionsV2/AzureStorageDurabilityProviderFactoryTests.cs +++ b/test/FunctionsV2/AzureStorageDurabilityProviderFactoryTests.cs @@ -154,5 +154,46 @@ public void EnvironmentIsVMSS_WorkerIdFromEnvironmentVariables() Assert.Equal("waws-prod-euapbn1-003:dw0SmallDedicatedWebWorkerRole_hr0HostRole-3-VM-13", settings.WorkerId); } + + [Fact] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + public void CustomConnectionNameIsResolved() + { + var storageAccountProvider = new CustomTestStorageAccountProvider("CustomConnection"); + var mockOptions = new OptionsWrapper(new DurableTaskOptions()); + var nameResolver = new Mock().Object; + + var factory = new AzureStorageDurabilityProviderFactory( + mockOptions, + storageAccountProvider, + nameResolver, + NullLoggerFactory.Instance, + TestHelpers.GetMockPlatformInformationService()); + + factory.GetDurabilityProvider(); // This will initialize the default connection string + var provider = factory.GetDurabilityProvider(new DurableClientAttribute() { ConnectionName = "CustomConnection", TaskHub = "TestHubName" }); + + Assert.Equal("CustomConnection", provider.ConnectionName); + } + + [Fact] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + public void DefaultConnectionNameIsResolved() + { + var storageAccountProvider = new CustomTestStorageAccountProvider("CustomConnection"); + var mockOptions = new OptionsWrapper(new DurableTaskOptions()); + var nameResolver = new Mock().Object; + + var factory = new AzureStorageDurabilityProviderFactory( + mockOptions, + storageAccountProvider, + nameResolver, + NullLoggerFactory.Instance, + TestHelpers.GetMockPlatformInformationService()); + + var provider = factory.GetDurabilityProvider(); + + Assert.Equal("Storage", provider.ConnectionName); + } } } From 7dcbab4b4bf9675d96ef50bc95216695e186c28e Mon Sep 17 00:00:00 2001 From: schlechtums Date: Thu, 17 Oct 2024 14:13:41 -0400 Subject: [PATCH 4/9] Correct typo in analyzer description from definied -> defined (#2901) --- .../Resources.Designer.cs | 4 ++-- src/WebJobs.Extensions.DurableTask.Analyzers/Resources.resx | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/WebJobs.Extensions.DurableTask.Analyzers/Resources.Designer.cs b/src/WebJobs.Extensions.DurableTask.Analyzers/Resources.Designer.cs index d72cd9b48..b3d180153 100644 --- a/src/WebJobs.Extensions.DurableTask.Analyzers/Resources.Designer.cs +++ b/src/WebJobs.Extensions.DurableTask.Analyzers/Resources.Designer.cs @@ -621,7 +621,7 @@ public static string IOTypesAnalyzerTitle { } /// - /// Looks up a localized string similar to Method call '{0}' violates the orchestrator deterministic code constraint. Methods definied in source code that are used in an orchestrator must be deterministic.. + /// Looks up a localized string similar to Method call '{0}' violates the orchestrator deterministic code constraint. Methods defined in source code that are used in an orchestrator must be deterministic.. /// public static string MethodAnalyzerMessageFormat { get { @@ -630,7 +630,7 @@ public static string MethodAnalyzerMessageFormat { } /// - /// Looks up a localized string similar to Methods definied in source code that are used in an orchestrator must be deterministic.. + /// Looks up a localized string similar to Methods defined in source code that are used in an orchestrator must be deterministic.. /// public static string MethodAnalyzerTitle { get { diff --git a/src/WebJobs.Extensions.DurableTask.Analyzers/Resources.resx b/src/WebJobs.Extensions.DurableTask.Analyzers/Resources.resx index d40722bfb..1eb6cd2a4 100644 --- a/src/WebJobs.Extensions.DurableTask.Analyzers/Resources.resx +++ b/src/WebJobs.Extensions.DurableTask.Analyzers/Resources.resx @@ -306,10 +306,10 @@ https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-check I/O operations are not allowed inside an orchestrator function. - Method call '{0}' violates the orchestrator deterministic code constraint. Methods definied in source code that are used in an orchestrator must be deterministic. + Method call '{0}' violates the orchestrator deterministic code constraint. Methods defined in source code that are used in an orchestrator must be deterministic. - Methods definied in source code that are used in an orchestrator must be deterministic. + Methods defined in source code that are used in an orchestrator must be deterministic. SignalEntityAsync must use an Entity Interface. From 4cc6ec2167bcc3b027dbf382a4540c0524955ce3 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 21 Oct 2024 14:53:26 -0700 Subject: [PATCH 5/9] Change 'durabletask-extension.official' to `durable-extension.official' in publish pipeline (#2947) --- eng/ci/publish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eng/ci/publish.yml b/eng/ci/publish.yml index ca6760e39..8b65519fa 100644 --- a/eng/ci/publish.yml +++ b/eng/ci/publish.yml @@ -22,7 +22,7 @@ resources: pipelines: - pipeline: officialPipeline # Reference to the pipeline to be used as an artifact source - source: 'durabletask-extension.official' + source: 'durable-extension.official' extends: template: v1/1ES.Official.PipelineTemplate.yml@1es From a7c6d6944041b48267ed8409bb8ec8144ab9bab9 Mon Sep 17 00:00:00 2001 From: Naiyuan Tian <110135109+nytian@users.noreply.github.com> Date: Wed, 23 Oct 2024 09:58:46 -0700 Subject: [PATCH 6/9] Implement CreateHttpManagementPayload API in Durable Worker Extension (#2929) * initial commit * add comment * add test * update by comment * add httpmanagementpayload class * re-arrange if section to make code more readable * remove unnecessary exception catch * add nullable check at HttpManagementPayload * add nullable check * Add comment as suggested * Update FunctionsDurableTaskClientTests.cs * update a typo as I found this at my e2e test --- .../Bindings/BindingHelper.cs | 9 ++ ...t.Azure.WebJobs.Extensions.DurableTask.xml | 7 ++ .../DurableTaskClientConverter.cs | 4 +- .../DurableTaskClientExtensions.cs | 79 ++++++++++++--- .../FunctionsDurableTaskClient.cs | 5 +- .../HttpManagementPayload.cs | 97 +++++++++++++++++++ .../FunctionsDurableTaskClientTests.cs | 54 ++++++++++- 7 files changed, 233 insertions(+), 22 deletions(-) create mode 100644 src/Worker.Extensions.DurableTask/HttpManagementPayload.cs diff --git a/src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs b/src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs index a0c2fddde..499628929 100644 --- a/src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs +++ b/src/WebJobs.Extensions.DurableTask/Bindings/BindingHelper.cs @@ -43,6 +43,7 @@ public string DurableOrchestrationClientToString(IDurableOrchestrationClient cli ConnectionName = attr.ConnectionName, RpcBaseUrl = localRpcAddress, RequiredQueryStringParameters = this.config.HttpApiHandler.GetUniversalQueryStrings(), + HttpBaseUrl = this.config.HttpApiHandler.GetBaseUrl(), }); } @@ -130,6 +131,14 @@ private class OrchestrationClientInputData /// [JsonProperty("rpcBaseUrl")] public string? RpcBaseUrl { get; set; } + + /// + /// The base URL of the Azure Functions host, used in the out-of-proc model. + /// This URL is sent by the client binding object to the Durable Worker extension, + /// allowing the extension to know the host's base URL for constructing management URLs. + /// + [JsonProperty("httpBaseUrl")] + public string? HttpBaseUrl { get; set; } } } } diff --git a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml index 8faf3dfff..cdf5d8700 100644 --- a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml +++ b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml @@ -93,6 +93,13 @@ HTTP endpoint. For out-of-proc "v2" (middelware passthrough), this is a gRPC endpoint. + + + The base URL of the Azure Functions host, used in the out-of-proc model. + This URL is sent by the client binding object to the Durable Worker extension, + allowing the extension to know the host's base URL for constructing management URLs. + + The result of a clean entity storage operation. diff --git a/src/Worker.Extensions.DurableTask/DurableTaskClientConverter.cs b/src/Worker.Extensions.DurableTask/DurableTaskClientConverter.cs index 1d3da9003..2cfc2706e 100644 --- a/src/Worker.Extensions.DurableTask/DurableTaskClientConverter.cs +++ b/src/Worker.Extensions.DurableTask/DurableTaskClientConverter.cs @@ -49,7 +49,7 @@ public ValueTask ConvertAsync(ConverterContext context) } DurableTaskClient client = this.clientProvider.GetClient(endpoint, inputData?.taskHubName, inputData?.connectionName); - client = new FunctionsDurableTaskClient(client, inputData!.requiredQueryStringParameters); + client = new FunctionsDurableTaskClient(client, inputData!.requiredQueryStringParameters, inputData!.httpBaseUrl); return new ValueTask(ConversionResult.Success(client)); } catch (Exception innerException) @@ -62,5 +62,5 @@ public ValueTask ConvertAsync(ConverterContext context) } // Serializer is case-sensitive and incoming JSON properties are camel-cased. - private record DurableClientInputData(string rpcBaseUrl, string taskHubName, string connectionName, string requiredQueryStringParameters); + private record DurableClientInputData(string rpcBaseUrl, string taskHubName, string connectionName, string requiredQueryStringParameters, string httpBaseUrl); } diff --git a/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs b/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs index 286c206fe..bbd6222a8 100644 --- a/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs +++ b/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs @@ -120,8 +120,30 @@ public static HttpResponseData CreateCheckStatusResponse( return response; } - private static object SetHeadersAndGetPayload( - DurableTaskClient client, HttpRequestData request, HttpResponseData response, string instanceId) + /// + /// Creates an HTTP management payload for the specified orchestration instance. + /// + /// The . + /// The ID of the orchestration instance. + /// Optional HTTP request data to use for creating the base URL. + /// An object containing instance control URLs. + /// Thrown when instanceId is null or empty. + /// Thrown when a valid base URL cannot be determined. + public static HttpManagementPayload CreateHttpManagementPayload( + this DurableTaskClient client, + string instanceId, + HttpRequestData? request = null) + { + if (string.IsNullOrEmpty(instanceId)) + { + throw new ArgumentException("InstanceId cannot be null or empty.", nameof(instanceId)); + } + + return SetHeadersAndGetPayload(client, request, null, instanceId); + } + + private static HttpManagementPayload SetHeadersAndGetPayload( + DurableTaskClient client, HttpRequestData? request, HttpResponseData? response, string instanceId) { static string BuildUrl(string url, params string?[] queryValues) { @@ -143,22 +165,46 @@ static string BuildUrl(string url, params string?[] queryValues) // request headers into consideration and generate the base URL accordingly. // More info: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Forwarded. // One potential workaround is to set ASPNETCORE_FORWARDEDHEADERS_ENABLED to true. - string baseUrl = request.Url.GetLeftPart(UriPartial.Authority); + + // If HttpRequestData is provided, use its URL; otherwise, get the baseUrl from the DurableTaskClient. + // The base URL could be null if: + // 1. The DurableTaskClient isn't a FunctionsDurableTaskClient (which would have the baseUrl from bindings) + // 2. There's no valid HttpRequestData provided + string? baseUrl = ((request != null) ? request.Url.GetLeftPart(UriPartial.Authority) : GetBaseUrl(client)); + + if (baseUrl == null) + { + throw new InvalidOperationException("Failed to create HTTP management payload as base URL is null. Either use Functions bindings or provide an HTTP request to create the HttpPayload."); + } + + bool isFromRequest = request != null; + string formattedInstanceId = Uri.EscapeDataString(instanceId); - string instanceUrl = $"{baseUrl}/runtime/webhooks/durabletask/instances/{formattedInstanceId}"; + + // The baseUrl differs depending on the source. Eg: + // - From request: http://localhost:7071/ + // - From durable client: http://localhost:7071/runtime/webhooks/durabletask + // We adjust the instanceUrl construction accordingly. + string instanceUrl = isFromRequest + ? $"{baseUrl}/runtime/webhooks/durabletask/instances/{formattedInstanceId}" + : $"{baseUrl}/instances/{formattedInstanceId}"; string? commonQueryParameters = GetQueryParams(client); - response.Headers.Add("Location", BuildUrl(instanceUrl, commonQueryParameters)); - response.Headers.Add("Content-Type", "application/json"); + + if (response != null) + { + response.Headers.Add("Location", BuildUrl(instanceUrl, commonQueryParameters)); + response.Headers.Add("Content-Type", "application/json"); + } - return new + return new HttpManagementPayload { - id = instanceId, - purgeHistoryDeleteUri = BuildUrl(instanceUrl, commonQueryParameters), - sendEventPostUri = BuildUrl($"{instanceUrl}/raiseEvent/{{eventName}}", commonQueryParameters), - statusQueryGetUri = BuildUrl(instanceUrl, commonQueryParameters), - terminatePostUri = BuildUrl($"{instanceUrl}/terminate", "reason={{text}}", commonQueryParameters), - suspendPostUri = BuildUrl($"{instanceUrl}/suspend", "reason={{text}}", commonQueryParameters), - resumePostUri = BuildUrl($"{instanceUrl}/resume", "reason={{text}}", commonQueryParameters) + Id = instanceId, + PurgeHistoryDeleteUri = BuildUrl(instanceUrl, commonQueryParameters), + SendEventPostUri = BuildUrl($"{instanceUrl}/raiseEvent/{{eventName}}", commonQueryParameters), + StatusQueryGetUri = BuildUrl(instanceUrl, commonQueryParameters), + TerminatePostUri = BuildUrl($"{instanceUrl}/terminate", "reason={{text}}", commonQueryParameters), + SuspendPostUri = BuildUrl($"{instanceUrl}/suspend", "reason={{text}}", commonQueryParameters), + ResumePostUri = BuildUrl($"{instanceUrl}/resume", "reason={{text}}", commonQueryParameters) }; } @@ -172,4 +218,9 @@ private static ObjectSerializer GetObjectSerializer(HttpResponseData response) { return client is FunctionsDurableTaskClient functions ? functions.QueryString : null; } + + private static string? GetBaseUrl(DurableTaskClient client) + { + return client is FunctionsDurableTaskClient functions ? functions.HttpBaseUrl : null; + } } diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs index 0f9231375..3c919362d 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs @@ -17,15 +17,16 @@ internal sealed class FunctionsDurableTaskClient : DurableTaskClient { private readonly DurableTaskClient inner; - public FunctionsDurableTaskClient(DurableTaskClient inner, string? queryString) + public FunctionsDurableTaskClient(DurableTaskClient inner, string? queryString, string? httpBaseUrl) : base(inner.Name) { this.inner = inner; this.QueryString = queryString; + this.HttpBaseUrl = httpBaseUrl; } public string? QueryString { get; } - + public string? HttpBaseUrl { get; } public override DurableEntityClient Entities => this.inner.Entities; public override ValueTask DisposeAsync() diff --git a/src/Worker.Extensions.DurableTask/HttpManagementPayload.cs b/src/Worker.Extensions.DurableTask/HttpManagementPayload.cs new file mode 100644 index 000000000..4e7228f87 --- /dev/null +++ b/src/Worker.Extensions.DurableTask/HttpManagementPayload.cs @@ -0,0 +1,97 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +// This is a copy of: https://github.com/Azure/azure-functions-durable-extension/blob/dev/src/WebJobs.Extensions.DurableTask/HttpManagementPayload.cs + +using System; +using System.Collections.Generic; +using System.Text; +using Newtonsoft.Json; + +namespace Microsoft.Azure.Functions.Worker; + +/// +/// Data structure containing status, terminate and send external event HTTP endpoints. +/// +public class HttpManagementPayload +{ + /// + /// Gets the ID of the orchestration instance. + /// + /// + /// The ID of the orchestration instance. + /// + [JsonProperty("id")] + public string? Id { get; internal set; } + + /// + /// Gets the HTTP GET status query endpoint URL. + /// + /// + /// The HTTP URL for fetching the instance status. + /// + [JsonProperty("statusQueryGetUri")] + public string? StatusQueryGetUri { get; internal set; } + + /// + /// Gets the HTTP POST external event sending endpoint URL. + /// + /// + /// The HTTP URL for posting external event notifications. + /// + [JsonProperty("sendEventPostUri")] + public string? SendEventPostUri { get; internal set; } + + /// + /// Gets the HTTP POST instance termination endpoint. + /// + /// + /// The HTTP URL for posting instance termination commands. + /// + [JsonProperty("terminatePostUri")] + public string? TerminatePostUri { get; internal set; } + + /// + /// Gets the HTTP POST instance rewind endpoint. + /// + /// + /// The HTTP URL for rewinding orchestration instances. + /// + [JsonProperty("rewindPostUri")] + public string? RewindPostUri { get; internal set; } + + /// + /// Gets the HTTP DELETE purge instance history by instance ID endpoint. + /// + /// + /// The HTTP URL for purging instance history by instance ID. + /// + [JsonProperty("purgeHistoryDeleteUri")] + public string? PurgeHistoryDeleteUri { get; internal set; } + + /// + /// Gets the HTTP POST instance restart endpoint. + /// + /// + /// The HTTP URL for restarting an orchestration instance. + /// + [JsonProperty("restartPostUri")] + public string? RestartPostUri { get; internal set; } + + /// + /// Gets the HTTP POST instance suspend endpoint. + /// + /// + /// The HTTP URL for suspending an orchestration instance. + /// + [JsonProperty("suspendPostUri")] + public string? SuspendPostUri { get; internal set; } + + /// + /// Gets the HTTP POST instance resume endpoint. + /// + /// + /// The HTTP URL for resuming an orchestration instance. + /// + [JsonProperty("resumePostUri")] + public string? ResumePostUri { get; internal set; } +} diff --git a/test/Worker.Extensions.DurableTask.Tests/FunctionsDurableTaskClientTests.cs b/test/Worker.Extensions.DurableTask.Tests/FunctionsDurableTaskClientTests.cs index 5a335aefa..6f975d2c5 100644 --- a/test/Worker.Extensions.DurableTask.Tests/FunctionsDurableTaskClientTests.cs +++ b/test/Worker.Extensions.DurableTask.Tests/FunctionsDurableTaskClientTests.cs @@ -1,5 +1,5 @@ +using Microsoft.Azure.Functions.Worker.Http; using Microsoft.DurableTask.Client; -using Microsoft.DurableTask.Client.Grpc; using Moq; namespace Microsoft.Azure.Functions.Worker.Tests @@ -9,7 +9,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests /// public class FunctionsDurableTaskClientTests { - private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient() + private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient(string? baseUrl = null) { // construct mock client @@ -22,7 +22,7 @@ private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient() It.IsAny(), It.IsAny(), It.IsAny())).Returns(completedTask); DurableTaskClient durableClient = durableClientMock.Object; - FunctionsDurableTaskClient client = new FunctionsDurableTaskClient(durableClient, queryString: null); + FunctionsDurableTaskClient client = new FunctionsDurableTaskClient(durableClient, queryString: null, httpBaseUrl: baseUrl); return client; } @@ -53,5 +53,51 @@ public async void TerminateDoesNotThrow() await client.TerminateInstanceAsync(instanceId, options); await client.TerminateInstanceAsync(instanceId, options, token); } + + /// + /// Test that the `CreateHttpManagementPayload` method returns the expected payload structure without HttpRequestData. + /// + [Fact] + public void CreateHttpManagementPayload_WithBaseUrl() + { + const string BaseUrl = "http://localhost:7071/runtime/webhooks/durabletask"; + FunctionsDurableTaskClient client = this.GetTestFunctionsDurableTaskClient(BaseUrl); + string instanceId = "testInstanceIdWithHostBaseUrl"; + + HttpManagementPayload payload = client.CreateHttpManagementPayload(instanceId); + + AssertHttpManagementPayload(payload, BaseUrl, instanceId); + } + + /// + /// Test that the `CreateHttpManagementPayload` method returns the expected payload structure with HttpRequestData. + /// + [Fact] + public void CreateHttpManagementPayload_WithHttpRequestData() + { + const string requestUrl = "http://localhost:7075/orchestrators/E1_HelloSequence"; + FunctionsDurableTaskClient client = this.GetTestFunctionsDurableTaskClient(); + string instanceId = "testInstanceIdWithRequest"; + + // Create mock HttpRequestData object. + var mockFunctionContext = new Mock(); + var mockHttpRequestData = new Mock(mockFunctionContext.Object); + mockHttpRequestData.SetupGet(r => r.Url).Returns(new Uri(requestUrl)); + + HttpManagementPayload payload = client.CreateHttpManagementPayload(instanceId, mockHttpRequestData.Object); + + AssertHttpManagementPayload(payload, "http://localhost:7075/runtime/webhooks/durabletask", instanceId); + } + + private static void AssertHttpManagementPayload(HttpManagementPayload payload, string BaseUrl, string instanceId) + { + Assert.Equal(instanceId, payload.Id); + Assert.Equal($"{BaseUrl}/instances/{instanceId}", payload.PurgeHistoryDeleteUri); + Assert.Equal($"{BaseUrl}/instances/{instanceId}/raiseEvent/{{eventName}}", payload.SendEventPostUri); + Assert.Equal($"{BaseUrl}/instances/{instanceId}", payload.StatusQueryGetUri); + Assert.Equal($"{BaseUrl}/instances/{instanceId}/terminate?reason={{{{text}}}}", payload.TerminatePostUri); + Assert.Equal($"{BaseUrl}/instances/{instanceId}/suspend?reason={{{{text}}}}", payload.SuspendPostUri); + Assert.Equal($"{BaseUrl}/instances/{instanceId}/resume?reason={{{{text}}}}", payload.ResumePostUri); + } } -} \ No newline at end of file +} From 8470f3d2c1e165db0cf9cb082dc20adb46df79dc Mon Sep 17 00:00:00 2001 From: Konstantin Gukov Date: Tue, 29 Oct 2024 00:42:48 +0100 Subject: [PATCH 7/9] Fail fast if `ExtendedSessionsEnabled` is requested for a non-.NET worker. (#2732) While the code does emit the warning and overwrites the option value, this is too late. The frameworks reads the "old" option value before the options are validated and actually runs as if the extended sessions are on. This leads to the hard-to-troubleshoot problems, e.g. the python worker never re-triggers the orchestration and never completes them. PR also includes: * details why mssql test fails * `docker build --pull` to ensure the dockerfile is consistently built from the latest base images, no matter where it's built. --- release_notes.md | 2 ++ .../DurableTaskExtension.cs | 2 +- .../Options/DurableTaskOptions.cs | 11 +++----- test/Common/DurableTaskEndToEndTests.cs | 26 ++++++++++++------- test/SmokeTests/e2e-test.ps1 | 12 ++++++++- 5 files changed, 35 insertions(+), 18 deletions(-) diff --git a/release_notes.md b/release_notes.md index aa232c219..7333dc06a 100644 --- a/release_notes.md +++ b/release_notes.md @@ -4,6 +4,8 @@ ### New Features +- Fail fast if extendedSessionsEnabled set to 'true' for the worker type that doesn't support extended sessions (https://github.com/Azure/azure-functions-durable-extension/pull/2732). + ### Bug Fixes - Fix custom connection name not working when using IDurableClientFactory.CreateClient() - contributed by [@hctan](https://github.com/hctan) diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index 937b236ea..4e59f460c 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -359,7 +359,7 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context) } // Throw if any of the configured options are invalid - this.Options.Validate(this.nameResolver, this.TraceHelper); + this.Options.Validate(this.nameResolver); #pragma warning disable CS0618 // Type or member is obsolete diff --git a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs index bc555386f..c7322044e 100644 --- a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs +++ b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs @@ -302,7 +302,7 @@ internal void TraceConfiguration(EndToEndTraceHelper traceHelper, JObject storag traceHelper.TraceConfiguration(this.HubName, configurationJson.ToString(Formatting.None)); } - internal void Validate(INameResolver environmentVariableResolver, EndToEndTraceHelper traceHelper) + internal void Validate(INameResolver environmentVariableResolver) { if (string.IsNullOrEmpty(this.HubName)) { @@ -320,12 +320,9 @@ internal void Validate(INameResolver environmentVariableResolver, EndToEndTraceH runtimeLanguage != null && // If we don't know from the environment variable, don't assume customer isn't .NET !string.Equals(runtimeLanguage, "dotnet", StringComparison.OrdinalIgnoreCase)) { - traceHelper.ExtensionWarningEvent( - hubName: this.HubName, - functionName: string.Empty, - instanceId: string.Empty, - message: "Durable Functions does not work with extendedSessions = true for non-.NET languages. This value is being set to false instead. See https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-perf-and-scale#extended-sessions for more details."); - this.ExtendedSessionsEnabled = false; + throw new InvalidOperationException( + "Durable Functions with extendedSessionsEnabled set to 'true' is only supported when using the in-process .NET worker. Please remove the setting or change it to 'false'." + + "See https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-perf-and-scale#extended-sessions for more details."); } this.Notifications.Validate(); diff --git a/test/Common/DurableTaskEndToEndTests.cs b/test/Common/DurableTaskEndToEndTests.cs index 489379cdd..0ba61d7f3 100644 --- a/test/Common/DurableTaskEndToEndTests.cs +++ b/test/Common/DurableTaskEndToEndTests.cs @@ -5592,16 +5592,24 @@ public async Task ExtendedSessions_OutOfProc_SetToFalse() { "FUNCTIONS_WORKER_RUNTIME", "node" }, }); - using (var host = TestHelpers.GetJobHostWithOptions( - this.loggerProvider, - durableTaskOptions, - nameResolver: nameResolver)) - { - await host.StartAsync(); - await host.StopAsync(); - } + InvalidOperationException exception = + await Assert.ThrowsAsync(async () => + { + using (var host = TestHelpers.GetJobHostWithOptions( + this.loggerProvider, + durableTaskOptions, + nameResolver: nameResolver)) + { + await host.StartAsync(); + await host.StopAsync(); + } + }); - Assert.False(durableTaskOptions.ExtendedSessionsEnabled); + Assert.NotNull(exception); + Assert.StartsWith( + "Durable Functions with extendedSessionsEnabled set to 'true' is only supported when using", + exception.Message, + StringComparison.OrdinalIgnoreCase); } [Fact] diff --git a/test/SmokeTests/e2e-test.ps1 b/test/SmokeTests/e2e-test.ps1 index 5eee1e0fd..ab918da8e 100644 --- a/test/SmokeTests/e2e-test.ps1 +++ b/test/SmokeTests/e2e-test.ps1 @@ -31,7 +31,7 @@ $AzuriteVersion = "3.32.0" if ($NoSetup -eq $false) { # Build the docker image first, since that's the most critical step Write-Host "Building sample app Docker container from '$DockerfilePath'..." -ForegroundColor Yellow - docker build -f $DockerfilePath -t $ImageName --progress plain $PSScriptRoot/../../ + docker build --pull -f $DockerfilePath -t $ImageName --progress plain $PSScriptRoot/../../ Exit-OnError # Next, download and start the Azurite emulator Docker image @@ -58,6 +58,16 @@ if ($NoSetup -eq $false) { Start-Sleep -Seconds 30 # Adjust the sleep duration based on your SQL Server container startup time Exit-OnError + Write-Host "Checking if SQL Server is still running..." -ForegroundColor Yellow + $sqlServerStatus = docker inspect -f '{{.State.Status}}' mssql-server + Exit-OnError + + if ($sqlServerStatus -ne "running") { + Write-Host "Unexpected SQL Server status: $sqlServerStatus" -ForegroundColor Yellow + docker logs mssql-server + exit 1; + } + # Get SQL Server IP Address - used to create SQLDB_Connection Write-Host "Getting IP Address..." -ForegroundColor Yellow $serverIpAddress = docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' mssql-server From 79e229542e3846b13456f0d9c8db6bdac860aebb Mon Sep 17 00:00:00 2001 From: Dixon T E Date: Wed, 6 Nov 2024 04:57:08 +1100 Subject: [PATCH 8/9] Add WaitForCompletionOrCreateCheckStatusResponseAsync to Microsoft.Azure.Functions.Worker.DurableTaskClientExtensions (#2875) * Initial implementation of WaitForCompletionOrCreateCheckStatusResponseAsync * Support X-Forwarded-Host et al * Removed output of request headers used in my debugging * Set location header to include returnInternalServerErrorOnFailure=true if requested * update api and add unit test * update sortings * Remove unnecessary spaces * add back forword request handling and update test accordingly * update by comment * add summary * update test * remove x-original-forwarded as we shouldn't use this * default getinputsandoutputs to false * update test by comment --------- Co-authored-by: naiyuantian@microsoft.com --- .../DurableTaskClientExtensions.cs | 114 ++++++++- .../FunctionsDurableTaskClientTests.cs | 235 +++++++++++++++++- 2 files changed, 346 insertions(+), 3 deletions(-) diff --git a/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs b/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs index bbd6222a8..251ebb2d7 100644 --- a/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs +++ b/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -18,6 +19,70 @@ namespace Microsoft.Azure.Functions.Worker; /// public static class DurableTaskClientExtensions { + /// + /// Waits for the completion of the specified orchestration instance with a retry interval, controlled by the cancellation token. + /// If the orchestration does not complete within the required time, returns an HTTP response containing the class to manage instances. + /// + /// The . + /// The HTTP request that this response is for. + /// The ID of the orchestration instance to check. + /// The timeout between checks for output from the durable function. The default value is 1 second. + /// Optional parameter that configures the http response code returned. Defaults to false. + /// Optional parameter that configures whether to get the inputs and outputs of the orchestration. Defaults to false. + /// A token that signals if the wait should be canceled. If canceled, call CreateCheckStatusResponseAsync to return a reponse contains a HttpManagementPayload. + /// + public static async Task WaitForCompletionOrCreateCheckStatusResponseAsync( + this DurableTaskClient client, + HttpRequestData request, + string instanceId, + TimeSpan? retryInterval = null, + bool returnInternalServerErrorOnFailure = false, + bool getInputsAndOutputs = false, + CancellationToken cancellation = default + ) + { + TimeSpan retryIntervalLocal = retryInterval ?? TimeSpan.FromSeconds(1); + try + { + while (true) + { + var status = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: getInputsAndOutputs); + if (status != null) + { + if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed || +#pragma warning disable CS0618 // Type or member is obsolete + status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled || +#pragma warning restore CS0618 // Type or member is obsolete + status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated || + status.RuntimeStatus == OrchestrationRuntimeStatus.Failed) + { + var response = request.CreateResponse( + (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed && returnInternalServerErrorOnFailure) ? HttpStatusCode.InternalServerError : HttpStatusCode.OK); + await response.WriteAsJsonAsync(new + { + Name = status.Name, + InstanceId = status.InstanceId, + CreatedAt = status.CreatedAt, + LastUpdatedAt = status.LastUpdatedAt, + RuntimeStatus = status.RuntimeStatus.ToString(), // Convert enum to string + SerializedInput = status.SerializedInput, + SerializedOutput = status.SerializedOutput, + SerializedCustomStatus = status.SerializedCustomStatus + }, statusCode: response.StatusCode); + + return response; + } + } + await Task.Delay(retryIntervalLocal, cancellation); + } + } + // If the task is canceled, call CreateCheckStatusResponseAsync to return a response containing instance management URLs. + catch (OperationCanceledException) + { + return await CreateCheckStatusResponseAsync(client, request, instanceId); + } + } + /// /// Creates an HTTP response that is useful for checking the status of the specified instance. /// @@ -170,13 +235,13 @@ static string BuildUrl(string url, params string?[] queryValues) // The base URL could be null if: // 1. The DurableTaskClient isn't a FunctionsDurableTaskClient (which would have the baseUrl from bindings) // 2. There's no valid HttpRequestData provided - string? baseUrl = ((request != null) ? request.Url.GetLeftPart(UriPartial.Authority) : GetBaseUrl(client)); + string? baseUrl = ((request != null) ? GetBaseUrlFromRequest(request) : GetBaseUrl(client)); if (baseUrl == null) { throw new InvalidOperationException("Failed to create HTTP management payload as base URL is null. Either use Functions bindings or provide an HTTP request to create the HttpPayload."); } - + bool isFromRequest = request != null; string formattedInstanceId = Uri.EscapeDataString(instanceId); @@ -214,6 +279,51 @@ private static ObjectSerializer GetObjectSerializer(HttpResponseData response) ?? throw new InvalidOperationException("A serializer is not configured for the worker."); } + private static string? GetBaseUrlFromRequest(HttpRequestData request) + { + // Default to the scheme from the request URL + string proto = request.Url.Scheme; + string host = request.Url.Authority; + + // Check for "Forwarded" header + if (request.Headers.TryGetValues("Forwarded", out var forwardedHeaders)) + { + var forwardedDict = forwardedHeaders.FirstOrDefault()?.Split(';') + .Select(pair => pair.Split('=')) + .Where(pair => pair.Length == 2) + .ToDictionary(pair => pair[0].Trim(), pair => pair[1].Trim()); + + if (forwardedDict != null) + { + if (forwardedDict.TryGetValue("proto", out var forwardedProto)) + { + proto = forwardedProto; + } + if (forwardedDict.TryGetValue("host", out var forwardedHost)) + { + host = forwardedHost; + // Return if either proto or host (or both) were found in "Forwarded" header + return $"{proto}://{forwardedHost}"; + } + } + } + // Check for "X-Forwarded-Proto" and "X-Forwarded-Host" headers if "Forwarded" is not present + if (request.Headers.TryGetValues("X-Forwarded-Proto", out var protos)) + { + proto = protos.FirstOrDefault() ?? proto; + } + if (request.Headers.TryGetValues("X-Forwarded-Host", out var hosts)) + { + // Return base URL if either "X-Forwarded-Proto" or "X-Forwarded-Host" (or both) are found + host = hosts.FirstOrDefault() ?? host; + return $"{proto}://{host}"; + } + + // Construct and return the base URL from default fallback values + return $"{proto}://{host}"; + } + + private static string? GetQueryParams(DurableTaskClient client) { return client is FunctionsDurableTaskClient functions ? functions.QueryString : null; diff --git a/test/Worker.Extensions.DurableTask.Tests/FunctionsDurableTaskClientTests.cs b/test/Worker.Extensions.DurableTask.Tests/FunctionsDurableTaskClientTests.cs index 6f975d2c5..1623f4559 100644 --- a/test/Worker.Extensions.DurableTask.Tests/FunctionsDurableTaskClientTests.cs +++ b/test/Worker.Extensions.DurableTask.Tests/FunctionsDurableTaskClientTests.cs @@ -1,6 +1,10 @@ +using System.Net; +using Azure.Core.Serialization; using Microsoft.Azure.Functions.Worker.Http; using Microsoft.DurableTask.Client; +using Microsoft.Extensions.Options; using Moq; +using Newtonsoft.Json; namespace Microsoft.Azure.Functions.Worker.Tests { @@ -9,7 +13,7 @@ namespace Microsoft.Azure.Functions.Worker.Tests /// public class FunctionsDurableTaskClientTests { - private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient(string? baseUrl = null) + private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient(string? baseUrl = null, OrchestrationMetadata? orchestrationMetadata = null) { // construct mock client @@ -21,6 +25,12 @@ private FunctionsDurableTaskClient GetTestFunctionsDurableTaskClient(string? bas durableClientMock.Setup(x => x.TerminateInstanceAsync( It.IsAny(), It.IsAny(), It.IsAny())).Returns(completedTask); + if (orchestrationMetadata != null) + { + durableClientMock.Setup(x => x.GetInstancesAsync(orchestrationMetadata.InstanceId, It.IsAny(), It.IsAny())) + .ReturnsAsync(orchestrationMetadata); + } + DurableTaskClient durableClient = durableClientMock.Object; FunctionsDurableTaskClient client = new FunctionsDurableTaskClient(durableClient, queryString: null, httpBaseUrl: baseUrl); return client; @@ -82,6 +92,8 @@ public void CreateHttpManagementPayload_WithHttpRequestData() // Create mock HttpRequestData object. var mockFunctionContext = new Mock(); var mockHttpRequestData = new Mock(mockFunctionContext.Object); + var headers = new HttpHeadersCollection(); + mockHttpRequestData.SetupGet(r => r.Headers).Returns(headers); mockHttpRequestData.SetupGet(r => r.Url).Returns(new Uri(requestUrl)); HttpManagementPayload payload = client.CreateHttpManagementPayload(instanceId, mockHttpRequestData.Object); @@ -89,6 +101,153 @@ public void CreateHttpManagementPayload_WithHttpRequestData() AssertHttpManagementPayload(payload, "http://localhost:7075/runtime/webhooks/durabletask", instanceId); } + /// + /// Test that the `WaitForCompletionOrCreateCheckStatusResponseAsync` method returns the expected response when the orchestration is completed. + /// The expected response should include OrchestrationMetadata in the body with an HttpStatusCode.OK. + /// + [Fact] + public async Task TestWaitForCompletionOrCreateCheckStatusResponseAsync_WhenCompleted() + { + string instanceId = "test-instance-id-completed"; + var expectedResult = new OrchestrationMetadata("TestCompleted", instanceId) + { + CreatedAt = DateTime.UtcNow, + LastUpdatedAt = DateTime.UtcNow, + RuntimeStatus = OrchestrationRuntimeStatus.Completed, + SerializedCustomStatus = "TestCustomStatus", + SerializedInput = "TestInput", + SerializedOutput = "TestOutput" + }; + + var client = this.GetTestFunctionsDurableTaskClient( orchestrationMetadata: expectedResult); + + HttpRequestData request = this.MockHttpRequestAndResponseData(); + + HttpResponseData response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, instanceId); + + Assert.NotNull(response); + Assert.Equal(HttpStatusCode.OK, response.StatusCode); + + // Reset stream position for reading + response.Body.Position = 0; + var orchestratorMetadata = await System.Text.Json.JsonSerializer.DeserializeAsync(response.Body); + + // Assert the response content is not null and check the content is correct. + Assert.NotNull(orchestratorMetadata); + AssertOrhcestrationMetadata(expectedResult, orchestratorMetadata); + } + + /// + /// Test that the `WaitForCompletionOrCreateCheckStatusResponseAsync` method returns expected response when the orchestrator didn't finish within + /// the timeout period. The response body should contain a HttpManagementPayload with HttpStatusCode.Accepted. + /// + [Fact] + public async Task TestWaitForCompletionOrCreateCheckStatusResponseAsync_WhenRunning() + { + string instanceId = "test-instance-id-running"; + var expectedResult = new OrchestrationMetadata("TestRunning", instanceId) + { + CreatedAt = DateTime.UtcNow, + LastUpdatedAt = DateTime.UtcNow, + RuntimeStatus = OrchestrationRuntimeStatus.Running, + }; + + var client = this.GetTestFunctionsDurableTaskClient(orchestrationMetadata: expectedResult); + + HttpRequestData request = this.MockHttpRequestAndResponseData(); + HttpResponseData response; + using (CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(10))) + { + response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, instanceId, cancellation: cts.Token); + }; + + Assert.NotNull(response); + Assert.Equal(HttpStatusCode.Accepted, response.StatusCode); + + // Reset stream position for reading + response.Body.Position = 0; + HttpManagementPayload? payload; + using (var reader = new StreamReader(response.Body)) + { + payload = JsonConvert.DeserializeObject(await reader.ReadToEndAsync()); + } + + // Assert the response content is not null and check the content is correct. + Assert.NotNull(payload); + AssertHttpManagementPayload(payload, "https://localhost:7075/runtime/webhooks/durabletask", instanceId); + } + + /// + /// Tests the `WaitForCompletionOrCreateCheckStatusResponseAsync` method to ensure it returns the correct HTTP status code + /// based on the `returnInternalServerErrorOnFailure` parameter when the orchestration has failed. + /// + [Theory] + [InlineData(true, HttpStatusCode.InternalServerError)] + [InlineData(false, HttpStatusCode.OK)] + public async Task TestWaitForCompletionOrCreateCheckStatusResponseAsync_WhenFailed(bool returnInternalServerErrorOnFailure, HttpStatusCode expected) + { + string instanceId = "test-instance-id-failed"; + var expectedResult = new OrchestrationMetadata("TestFailed", instanceId) + { + CreatedAt = DateTime.UtcNow, + LastUpdatedAt = DateTime.UtcNow, + RuntimeStatus = OrchestrationRuntimeStatus.Failed, + SerializedOutput = "Microsoft.DurableTask.TaskFailedException: Task 'SayHello' (#0) failed with an unhandled exception: Exception while executing function: Functions.SayHello", + SerializedInput = null + }; + + var client = this.GetTestFunctionsDurableTaskClient(orchestrationMetadata: expectedResult); + + HttpRequestData request = this.MockHttpRequestAndResponseData(); + + HttpResponseData response = await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, instanceId, returnInternalServerErrorOnFailure: returnInternalServerErrorOnFailure); + + Assert.NotNull(response); + Assert.Equal(expected, response.StatusCode); + + // Reset stream position for reading + response.Body.Position = 0; + var orchestratorMetadata = await System.Text.Json.JsonSerializer.DeserializeAsync(response.Body); + + // Assert the response content is not null and check the content is correct. + Assert.NotNull(orchestratorMetadata); + AssertOrhcestrationMetadata(expectedResult, orchestratorMetadata); + } + + /// + /// Tests the `GetBaseUrlFromRequest` can return the right base URL from the HttpRequestData with different forwarding or proxies. + /// This test covers the following scenarios: + /// - Using the "Forwarded" header + /// - Using "X-Forwarded-Proto" and "X-Forwarded-Host" headers + /// - Using only "X-Forwarded-Host" with default protocol + /// - no headers + /// + [Theory] + [InlineData("Forwarded", "proto=https;host=forwarded.example.com","","", "https://forwarded.example.com/runtime/webhooks/durabletask")] + [InlineData("X-Forwarded-Proto", "https", "X-Forwarded-Host", "xforwarded.example.com", "https://xforwarded.example.com/runtime/webhooks/durabletask")] + [InlineData("", "", "X-Forwarded-Host", "test.net", "https://test.net/runtime/webhooks/durabletask")] + [InlineData("", "", "", "", "https://localhost:7075/runtime/webhooks/durabletask")] // Default base URL for empty headers + public void TestHttpRequestDataForwardingHandling(string header1, string? value1, string header2, string value2, string expectedBaseUrl) + { + var headers = new HttpHeadersCollection(); + if (!string.IsNullOrEmpty(header1)) + { + headers.Add(header1, value1); + } + if (!string.IsNullOrEmpty(header2)) + { + headers.Add(header2, value2); + } + + var request = this.MockHttpRequestAndResponseData(headers); + var client = this.GetTestFunctionsDurableTaskClient(); + + var payload = client.CreateHttpManagementPayload("testInstanceId", request); + AssertHttpManagementPayload(payload, expectedBaseUrl, "testInstanceId"); + } + + + private static void AssertHttpManagementPayload(HttpManagementPayload payload, string BaseUrl, string instanceId) { Assert.Equal(instanceId, payload.Id); @@ -99,5 +258,79 @@ private static void AssertHttpManagementPayload(HttpManagementPayload payload, s Assert.Equal($"{BaseUrl}/instances/{instanceId}/suspend?reason={{{{text}}}}", payload.SuspendPostUri); Assert.Equal($"{BaseUrl}/instances/{instanceId}/resume?reason={{{{text}}}}", payload.ResumePostUri); } + + private static void AssertOrhcestrationMetadata(OrchestrationMetadata expectedResult, dynamic actualResult) + { + Assert.Equal(expectedResult.Name, actualResult.GetProperty("Name").GetString()); + Assert.Equal(expectedResult.InstanceId, actualResult.GetProperty("InstanceId").GetString()); + Assert.Equal(expectedResult.CreatedAt, actualResult.GetProperty("CreatedAt").GetDateTime()); + Assert.Equal(expectedResult.LastUpdatedAt, actualResult.GetProperty("LastUpdatedAt").GetDateTime()); + Assert.Equal(expectedResult.RuntimeStatus.ToString(), actualResult.GetProperty("RuntimeStatus").GetString()); + Assert.Equal(expectedResult.SerializedInput, actualResult.GetProperty("SerializedInput").GetString()); + Assert.Equal(expectedResult.SerializedOutput, actualResult.GetProperty("SerializedOutput").GetString()); + Assert.Equal(expectedResult.SerializedCustomStatus, actualResult.GetProperty("SerializedCustomStatus").GetString()); + } + + // Mocks the required HttpRequestData and HttpResponseData for testing purposes. + // This method sets up a mock HttpRequestData with a predefined URL and a mock HttpResponseDatav with a default status code and body. + // The headers of HttpRequestData can be provided as an optional parameter, otherwise an empty HttpHeadersCollection is used. + private HttpRequestData MockHttpRequestAndResponseData(HttpHeadersCollection? headers = null) + { + var mockObjectSerializer = new Mock(); + + // Setup the SerializeAsync method + mockObjectSerializer.Setup(s => s.SerializeAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (stream, value, type, token) => + { + await System.Text.Json.JsonSerializer.SerializeAsync(stream, value, type, cancellationToken: token); + }); + + var workerOptions = new WorkerOptions + { + Serializer = mockObjectSerializer.Object + }; + var mockOptions = new Mock>(); + mockOptions.Setup(o => o.Value).Returns(workerOptions); + + // Mock the service provider + var mockServiceProvider = new Mock(); + + // Set up the service provider to return the mock IOptions + mockServiceProvider.Setup(sp => sp.GetService(typeof(IOptions))) + .Returns(mockOptions.Object); + + // Set up the service provider to return the mock ObjectSerializer + mockServiceProvider.Setup(sp => sp.GetService(typeof(ObjectSerializer))) + .Returns(mockObjectSerializer.Object); + + // Create a mock FunctionContext and assign the service provider + var mockFunctionContext = new Mock(); + mockFunctionContext.SetupGet(c => c.InstanceServices).Returns(mockServiceProvider.Object); + var mockHttpRequestData = new Mock(mockFunctionContext.Object); + + // Set up the URL property. + mockHttpRequestData.SetupGet(r => r.Url).Returns(new Uri("https://localhost:7075/orchestrators/E1_HelloSequence")); + + // If headers are provided, use them, otherwise create a new empty HttpHeadersCollection + headers ??= new HttpHeadersCollection(); + + // Setup the Headers property to return the empty headers + mockHttpRequestData.SetupGet(r => r.Headers).Returns(headers); + + var mockHttpResponseData = new Mock(mockFunctionContext.Object) + { + DefaultValue = DefaultValue.Mock + }; + + // Enable setting StatusCode and Body as mutable properties + mockHttpResponseData.SetupProperty(r => r.StatusCode, HttpStatusCode.OK); + mockHttpResponseData.SetupProperty(r => r.Body, new MemoryStream()); + + // Setup CreateResponse to return the configured HttpResponseData mock + mockHttpRequestData.Setup(r => r.CreateResponse()) + .Returns(mockHttpResponseData.Object); + + return mockHttpRequestData.Object; + } } } From aa30752b488205f80e22e805e54c6280750a5ec1 Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Fri, 8 Nov 2024 11:51:10 -0800 Subject: [PATCH 9/9] Make durable client registration idempotent. (#2950) --- release_notes.md | 2 + .../DurableTaskExtensionStartup.cs | 57 +------- ...tionsWorkerApplicationBuilderExtensions.cs | 125 ++++++++++++++++++ 3 files changed, 128 insertions(+), 56 deletions(-) create mode 100644 src/Worker.Extensions.DurableTask/FunctionsWorkerApplicationBuilderExtensions.cs diff --git a/release_notes.md b/release_notes.md index 7333dc06a..3babbb06b 100644 --- a/release_notes.md +++ b/release_notes.md @@ -5,10 +5,12 @@ ### New Features - Fail fast if extendedSessionsEnabled set to 'true' for the worker type that doesn't support extended sessions (https://github.com/Azure/azure-functions-durable-extension/pull/2732). +- Added an `IFunctionsWorkerApplicationBuilder.ConfigureDurableExtension()` extension method for cases where auto-registration does not work (no source gen running). (#2950) ### Bug Fixes - Fix custom connection name not working when using IDurableClientFactory.CreateClient() - contributed by [@hctan](https://github.com/hctan) +- Made durable extension for isolated worker configuration idempotent, allowing multiple calls safely. (#2950) ### Breaking Changes diff --git a/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs b/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs index 626acd6bf..af7bab017 100644 --- a/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs +++ b/src/Worker.Extensions.DurableTask/DurableTaskExtensionStartup.cs @@ -1,20 +1,8 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using System; -using Azure.Core.Serialization; using Microsoft.Azure.Functions.Worker.Core; using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; -using Microsoft.DurableTask; -using Microsoft.DurableTask.Client; -using Microsoft.DurableTask.Converters; -using Microsoft.DurableTask.Worker; -using Microsoft.DurableTask.Worker.Shims; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; [assembly: WorkerExtensionStartup(typeof(DurableTaskExtensionStartup))] @@ -28,49 +16,6 @@ public sealed class DurableTaskExtensionStartup : WorkerExtensionStartup /// public override void Configure(IFunctionsWorkerApplicationBuilder applicationBuilder) { - applicationBuilder.Services.AddSingleton(); - applicationBuilder.Services.AddOptions() - .Configure(options => options.EnableEntitySupport = true) - .PostConfigure((opt, sp) => - { - if (GetConverter(sp) is DataConverter converter) - { - opt.DataConverter = converter; - } - }); - - applicationBuilder.Services.AddOptions() - .Configure(options => options.EnableEntitySupport = true) - .PostConfigure((opt, sp) => - { - if (GetConverter(sp) is DataConverter converter) - { - opt.DataConverter = converter; - } - }); - - applicationBuilder.Services.TryAddSingleton(sp => - { - DurableTaskWorkerOptions options = sp.GetRequiredService>().Value; - ILoggerFactory factory = sp.GetRequiredService(); - return new DurableTaskShimFactory(options, factory); // For GrpcOrchestrationRunner - }); - - applicationBuilder.Services.Configure(o => - { - o.InputConverters.Register(); - }); - - applicationBuilder.UseMiddleware(); - } - - private static DataConverter? GetConverter(IServiceProvider services) - { - // We intentionally do not consider a DataConverter in the DI provider, or if one was already set. This is to - // ensure serialization is consistent with the rest of Azure Functions. This is particularly important because - // TaskActivity bindings use ObjectSerializer directly for the time being. Due to this, allowing DataConverter - // to be set separately from ObjectSerializer would give an inconsistent serialization solution. - WorkerOptions? worker = services.GetRequiredService>()?.Value; - return worker?.Serializer is not null ? new ObjectConverterShim(worker.Serializer) : null; + applicationBuilder.ConfigureDurableExtension(); } } diff --git a/src/Worker.Extensions.DurableTask/FunctionsWorkerApplicationBuilderExtensions.cs b/src/Worker.Extensions.DurableTask/FunctionsWorkerApplicationBuilderExtensions.cs new file mode 100644 index 000000000..642446dd4 --- /dev/null +++ b/src/Worker.Extensions.DurableTask/FunctionsWorkerApplicationBuilderExtensions.cs @@ -0,0 +1,125 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Linq; +using Azure.Core.Serialization; +using Microsoft.Azure.Functions.Worker.Core; +using Microsoft.Azure.Functions.Worker.Extensions.DurableTask; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Converters; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.Shims; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Microsoft.Azure.Functions.Worker; + +/// +/// Extensions for . +/// +public static class FunctionsWorkerApplicationBuilderExtensions +{ + /// + /// Configures the Durable Functions extension for the worker. + /// + /// The builder to configure. + /// The for call chaining. + public static IFunctionsWorkerApplicationBuilder ConfigureDurableExtension(this IFunctionsWorkerApplicationBuilder builder) + { + if (builder is null) + { + throw new ArgumentNullException(nameof(builder)); + } + + builder.Services.TryAddSingleton(); + builder.Services.TryAddEnumerable( + ServiceDescriptor.Singleton, ConfigureClientOptions>()); + builder.Services.TryAddEnumerable( + ServiceDescriptor.Singleton, PostConfigureClientOptions>()); + builder.Services.TryAddEnumerable( + ServiceDescriptor.Singleton, ConfigureWorkerOptions>()); + builder.Services.TryAddEnumerable( + ServiceDescriptor.Singleton, PostConfigureWorkerOptions>()); + + builder.Services.TryAddSingleton(sp => + { + DurableTaskWorkerOptions options = sp.GetRequiredService>().Value; + ILoggerFactory factory = sp.GetRequiredService(); + return new DurableTaskShimFactory(options, factory); // For GrpcOrchestrationRunner + }); + + builder.Services.TryAddEnumerable( + ServiceDescriptor.Singleton, ConfigureInputConverter>()); + if (!builder.Services.Any(d => d.ServiceType == typeof(DurableTaskFunctionsMiddleware))) + { + builder.UseMiddleware(); + } + + return builder; + } + + private class ConfigureInputConverter : IConfigureOptions + { + public void Configure(WorkerOptions options) + { + options.InputConverters.Register(); + } + } + + private class ConfigureClientOptions : IConfigureOptions + { + public void Configure(DurableTaskClientOptions options) + { + options.EnableEntitySupport = true; + } + } + + private class PostConfigureClientOptions : IPostConfigureOptions + { + readonly IOptionsMonitor workerOptions; + + public PostConfigureClientOptions(IOptionsMonitor workerOptions) + { + this.workerOptions = workerOptions; + } + + public void PostConfigure(string name, DurableTaskClientOptions options) + { + if (this.workerOptions.Get(name).Serializer is { } serializer) + { + options.DataConverter = new ObjectConverterShim(serializer); + } + } + } + + private class ConfigureWorkerOptions : IConfigureOptions + { + public void Configure(DurableTaskWorkerOptions options) + { + options.EnableEntitySupport = true; + } + } + + private class PostConfigureWorkerOptions : IPostConfigureOptions + { + readonly IOptionsMonitor workerOptions; + + public PostConfigureWorkerOptions(IOptionsMonitor workerOptions) + { + this.workerOptions = workerOptions; + } + + public void PostConfigure(string name, DurableTaskWorkerOptions options) + { + if (this.workerOptions.Get(name).Serializer is { } serializer) + { + options.DataConverter = new ObjectConverterShim(serializer); + } + } + } +}