From eb1852ff349f22c53066e93fe3dc5198be7ebf1a Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 24 Aug 2023 13:25:59 -0700
Subject: [PATCH 01/15] wip
---
.../src/ISettlement.cs | 6 ++++++
.../src/Proto/settlement.proto | 19 +++++++++++++++++++
.../src/ServiceBusMessageActions.cs | 6 ++++++
.../src/ServiceBusMessageActionsConverter.cs | 6 ++++++
.../src/SettlementImpl.cs | 6 ++++++
.../src/Startup.cs | 6 ++++++
6 files changed, 49 insertions(+)
create mode 100644 extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs
create mode 100644 extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
create mode 100644 extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
create mode 100644 extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs
create mode 100644 extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs
create mode 100644 extensions/Worker.Extensions.ServiceBus/src/Startup.cs
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs b/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs
new file mode 100644
index 000000000..113e5c401
--- /dev/null
+++ b/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs
@@ -0,0 +1,6 @@
+namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+
+public class ISettlement
+{
+
+}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
new file mode 100644
index 000000000..cc1dd5aeb
--- /dev/null
+++ b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
@@ -0,0 +1,19 @@
+syntax = "proto3";
+
+option csharp_namespace = "Samples.Extensions.Rpc";
+
+// The settlement service definition.
+service Settlement {
+ // Sends a greeting
+ rpc Complete (CompleteRequest) returns (CompleteReply) {}
+}
+
+// The request message containing the locktoken.
+message CompleteRequest {
+ string locktoken = 1;
+}
+
+// The response message containing any exceptions
+message CompleteReply {
+ string exception = 1;
+}
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
new file mode 100644
index 000000000..362b147da
--- /dev/null
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
@@ -0,0 +1,6 @@
+namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+
+public class ServiceBusMessageActions
+{
+
+}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs
new file mode 100644
index 000000000..c52a35b61
--- /dev/null
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs
@@ -0,0 +1,6 @@
+namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+
+public class ServiceBusMessageActionsConverter
+{
+
+}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs b/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs
new file mode 100644
index 000000000..81b3f96ee
--- /dev/null
+++ b/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs
@@ -0,0 +1,6 @@
+namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+
+public class SettlementImpl
+{
+
+}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Startup.cs b/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
new file mode 100644
index 000000000..55e816934
--- /dev/null
+++ b/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
@@ -0,0 +1,6 @@
+namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+
+public class Startup
+{
+
+}
\ No newline at end of file
From 5f692ea3ebfe322e14281a77f60fb54d90db2246 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Thu, 24 Aug 2023 13:28:32 -0700
Subject: [PATCH 02/15] WIP
---
NuGet.Config | 1 +
.../src/ISettlement.cs | 13 ++++--
.../src/Proto/settlement.proto | 2 +-
.../src/ServiceBusMessageActions.cs | 41 +++++++++++++++++--
.../src/ServiceBusMessageActionsConverter.cs | 34 +++++++++++++--
.../src/SettlementImpl.cs | 28 +++++++++++--
.../src/Startup.cs | 28 +++++++++++--
.../src/Worker.Extensions.ServiceBus.csproj | 21 +++++++++-
8 files changed, 151 insertions(+), 17 deletions(-)
diff --git a/NuGet.Config b/NuGet.Config
index 238ff8b97..65ee1715a 100644
--- a/NuGet.Config
+++ b/NuGet.Config
@@ -4,5 +4,6 @@
+
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs b/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs
index 113e5c401..1bdc1b655 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs
@@ -1,6 +1,13 @@
-namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+// Copyright (c) Jacob Viau. All rights reserved.
+// Licensed under the MIT. See LICENSE file in the project root for full license information.
-public class ISettlement
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Azure.Functions.Worker
{
-
+ internal interface ISettlement
+ {
+ Task CompleteAsync(string lockToken, CancellationToken cancellation = default);
+ }
}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
index cc1dd5aeb..e09b422f5 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
+++ b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
@@ -1,6 +1,6 @@
syntax = "proto3";
-option csharp_namespace = "Samples.Extensions.Rpc";
+option csharp_namespace = "Microsoft.Azure.Functions.Worker";
// The settlement service definition.
service Settlement {
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
index 362b147da..f5eea8804 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
@@ -1,6 +1,41 @@
-namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Messaging.ServiceBus;
+using Grpc.Core;
+using Grpc.Core.Logging;
+using Microsoft.Azure.Functions.Worker.Converters;
-public class ServiceBusMessageActions
+namespace Microsoft.Azure.Functions.Worker
{
-
+ [InputConverter(typeof(ServiceBusMessageActionsConverter))]
+ public class ServiceBusMessageActions
+ {
+ private readonly Settlement.SettlementClient _settlement;
+
+ // internal ServiceBusMessageActions(ISettlement settlement)
+ // {
+ // _settlement = settlement;
+ // }
+
+ // public ServiceBusMessageActions()
+ // {
+ // _settlement = new Settlement.SettlementClient();
+ // }
+
+ internal ServiceBusMessageActions(Settlement.SettlementClient settlement)
+ {
+ _settlement = settlement;
+ }
+
+ ///
+ public virtual async Task CompleteMessageAsync(
+ ServiceBusReceivedMessage message,
+ CancellationToken cancellationToken = default)
+ {
+ // _logger.LogInformation("Completing message with lock token {LockToken}", lockToken);
+ CompleteReply reply = await _settlement.CompleteAsync(new() { Locktoken = message.LockToken}, cancellationToken: cancellationToken);
+ // return reply;
+ // await _settlement.CompleteAsync(message.LockToken, cancellationToken);
+ }
+ }
}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs
index c52a35b61..504d34f99 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs
@@ -1,6 +1,34 @@
-namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
-public class ServiceBusMessageActionsConverter
+using System;
+using System.Threading.Tasks;
+using Microsoft.Azure.Functions.Worker.Converters;
+
+namespace Microsoft.Azure.Functions.Worker
{
-
+ ///
+ /// Converter to bind to type parameter.
+ ///
+ internal class ServiceBusMessageActionsConverter : IInputConverter
+ {
+ private readonly Settlement.SettlementClient _settlement;
+
+ public ServiceBusMessageActionsConverter(Settlement.SettlementClient settlement)
+ {
+ _settlement = settlement;
+ }
+
+ public ValueTask ConvertAsync(ConverterContext context)
+ {
+ try
+ {
+ return new ValueTask(ConversionResult.Success(new ServiceBusMessageActions(_settlement)));
+ }
+ catch (Exception exception)
+ {
+ return new ValueTask(ConversionResult.Failed(exception));
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs b/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs
index 81b3f96ee..4048f15ed 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs
@@ -1,6 +1,28 @@
-namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+// Copyright (c) Jacob Viau. All rights reserved.
+// Licensed under the MIT. See LICENSE file in the project root for full license information.
-public class SettlementImpl
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Microsoft.Azure.Functions.Worker
{
-
+ internal class SettlementImpl : ISettlement
+ {
+ private readonly ILogger _logger;
+ private readonly Settlement.SettlementClient _client;
+
+ public SettlementImpl(Settlement.SettlementClient client, ILogger logger)
+ {
+ _client = client;
+ _logger = logger;
+ }
+
+ public async Task CompleteAsync(string lockToken, CancellationToken cancellation = default)
+ {
+ _logger.LogInformation("Completing message with lock token {LockToken}", lockToken);
+ CompleteReply reply = await _client.CompleteAsync(new() { Locktoken = lockToken}, cancellationToken: cancellation);
+ return reply;
+ }
+ }
}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Startup.cs b/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
index 55e816934..0b79aec72 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
@@ -1,6 +1,28 @@
-namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;
+// Copyright (c) Jacob Viau. All rights reserved.
+// Licensed under the MIT. See LICENSE file in the project root for full license information.
-public class Startup
+using Microsoft.Azure.Functions.Worker;
+using Microsoft.Azure.Functions.Worker.Core;
+using Microsoft.Azure.Functions.Worker.Extensions.Rpc;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using Samples.Extensions.Rpc.Worker;
+
+[assembly: WorkerExtensionStartup(typeof(Startup))]
+
+namespace Samples.Extensions.Rpc.Worker;
+
+public sealed class Startup : WorkerExtensionStartup
{
-
+ public override void Configure(IFunctionsWorkerApplicationBuilder applicationBuilder)
+ {
+ applicationBuilder.Services.AddTransient(sp =>
+ {
+ IOptions options = sp.GetRequiredService>();
+ return new Settlement.SettlementClient(options.Value.CallInvoker);
+ });
+
+ // applicationBuilder.Services.AddSingleton();
+ // applicationBuilder.Services.AddTransient();
+ }
}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
index e8ac46737..0556a6dc1 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
+++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
@@ -6,7 +6,7 @@
Azure Service Bus extensions for .NET isolated functions
- 5.11.0
+ 5.13.0
false
@@ -14,8 +14,27 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
From c8488c1655180884ac2e998af37d55ca0a828965 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Wed, 20 Sep 2023 09:31:45 -0700
Subject: [PATCH 03/15] save
---
.../src/Worker.Extensions.ServiceBus.csproj | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
index 0556a6dc1..9d5faf769 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
+++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
@@ -4,6 +4,7 @@
Microsoft.Azure.Functions.Worker.Extensions.ServiceBus
Microsoft.Azure.Functions.Worker.Extensions.ServiceBus
Azure Service Bus extensions for .NET isolated functions
+ true
5.13.0
@@ -17,16 +18,14 @@
-
-
-
-
+
+
+
-
-
+
From 3ac66560750fb14606de67c1030394b7bcdd78bd Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Wed, 4 Oct 2023 13:00:01 -0700
Subject: [PATCH 04/15] Add all settlement actions
---
.../src/ISettlement.cs | 13 --
.../src/Properties/AssemblyInfo.cs | 6 +-
.../src/Proto/settlement.proto | 61 ++++++-
.../src/ServiceBusMessageActions.cs | 159 ++++++++++++++++--
.../src/ServiceBusMessageActionsConverter.cs | 1 +
.../src/SettlementImpl.cs | 28 ---
.../src/Startup.cs | 4 +-
7 files changed, 203 insertions(+), 69 deletions(-)
delete mode 100644 extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs
delete mode 100644 extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs b/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs
deleted file mode 100644
index 1bdc1b655..000000000
--- a/extensions/Worker.Extensions.ServiceBus/src/ISettlement.cs
+++ /dev/null
@@ -1,13 +0,0 @@
-// Copyright (c) Jacob Viau. All rights reserved.
-// Licensed under the MIT. See LICENSE file in the project root for full license information.
-
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace Microsoft.Azure.Functions.Worker
-{
- internal interface ISettlement
- {
- Task CompleteAsync(string lockToken, CancellationToken cancellation = default);
- }
-}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
index 859d6dafd..441901b51 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
@@ -1,6 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
-using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
+using System.Runtime.CompilerServices;
+using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
-[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.11.0")]
+[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.13.0-alpha.20231004.1")]
+[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.WorkerExtension.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
index e09b422f5..62fbc40ba 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
+++ b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
@@ -1,19 +1,66 @@
syntax = "proto3";
-option csharp_namespace = "Microsoft.Azure.Functions.Worker";
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/empty.proto";
+
+// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
+option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
// The settlement service definition.
service Settlement {
- // Sends a greeting
- rpc Complete (CompleteRequest) returns (CompleteReply) {}
+ // Completes a message
+ rpc Complete (CompleteRequest) returns (google.protobuf.Empty) {}
+
+ // Abandons a message
+ rpc Abandon (AbandonRequest) returns (google.protobuf.Empty) {}
+
+ // Deadletters a message
+ rpc Deadletter (DeadletterRequest) returns (google.protobuf.Empty) {}
+
+ // Defers a message
+ rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}
}
-// The request message containing the locktoken.
+// The complete message request containing the locktoken.
message CompleteRequest {
string locktoken = 1;
}
-// The response message containing any exceptions
-message CompleteReply {
- string exception = 1;
+// The abandon message request containing the locktoken and properties to modify.
+message AbandonRequest {
+ string locktoken = 1;
+ map propertiesToModify = 2;
+}
+
+// The deadletter message request containing the locktoken and properties to modify along with the reason/description.
+message DeadletterRequest {
+ string locktoken = 1;
+ map propertiesToModify = 2;
+ optional string deadletterReason = 3;
+ optional string deadletterErrorDescription = 4;
+}
+
+// The defer message request containing the locktoken and properties to modify.
+message DeferRequest {
+ string locktoken = 1;
+ map propertiesToModify = 2;
+}
+
+// The settlement property can be of any type listed below, which
+// corresponds to the types specified in
+// https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet#remarks
+// Note: this list doesn't match 1:1 with the supported Service Bus types, so compatible types are used in some cases - e.g.
+// short uses int, TimeSpan uses string, etc. The full list of transforms can be found in the isolated worker extension source code.
+message SettlementProperties {
+ oneof values {
+ string stringValue = 1;
+ int32 intValue = 2;
+ uint32 uintValue = 3;
+ int64 longValue = 4;
+ uint64 ulongValue = 5;
+ bool boolValue = 6;
+ float floatValue = 7;
+ double doubleValue = 8;
+ google.protobuf.Timestamp timestampValue = 9;
+ }
}
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
index f5eea8804..a477d345d 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
@@ -1,9 +1,11 @@
+using System;
+using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
-using Grpc.Core;
-using Grpc.Core.Logging;
+using Google.Protobuf.WellKnownTypes;
using Microsoft.Azure.Functions.Worker.Converters;
+using Microsoft.Azure.ServiceBus.Grpc;
namespace Microsoft.Azure.Functions.Worker
{
@@ -12,16 +14,6 @@ public class ServiceBusMessageActions
{
private readonly Settlement.SettlementClient _settlement;
- // internal ServiceBusMessageActions(ISettlement settlement)
- // {
- // _settlement = settlement;
- // }
-
- // public ServiceBusMessageActions()
- // {
- // _settlement = new Settlement.SettlementClient();
- // }
-
internal ServiceBusMessageActions(Settlement.SettlementClient settlement)
{
_settlement = settlement;
@@ -32,10 +24,145 @@ public virtual async Task CompleteMessageAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
- // _logger.LogInformation("Completing message with lock token {LockToken}", lockToken);
- CompleteReply reply = await _settlement.CompleteAsync(new() { Locktoken = message.LockToken}, cancellationToken: cancellationToken);
- // return reply;
- // await _settlement.CompleteAsync(message.LockToken, cancellationToken);
+ await _settlement.CompleteAsync(new() { Locktoken = message.LockToken}, cancellationToken: cancellationToken);
+ }
+
+ ///
+ public virtual async Task AbandonMessageAsync(
+ ServiceBusReceivedMessage message,
+ IDictionary? properties,
+ CancellationToken cancellationToken = default)
+ {
+ var request = new AbandonRequest()
+ {
+ Locktoken = message.LockToken,
+ PropertiesToModify = { TransformProperties(properties) }
+ };
+ await _settlement.AbandonAsync(request, cancellationToken: cancellationToken);
+ }
+
+ ///
+ public virtual async Task DeadLetterMessageAsync(
+ ServiceBusReceivedMessage message,
+ Dictionary? propertiesToModify = default,
+ string? deadLetterReason = default,
+ string? deadLetterErrorDescription = default,
+ CancellationToken cancellationToken = default)
+ {
+ var request = new DeadletterRequest()
+ {
+ Locktoken = message.LockToken,
+ PropertiesToModify = { TransformProperties(propertiesToModify) }
+ };
+ if (deadLetterReason != null)
+ {
+ request.DeadletterReason = deadLetterReason;
+ }
+
+ if (deadLetterErrorDescription != null)
+ {
+ request.DeadletterErrorDescription = deadLetterErrorDescription;
+ }
+ await _settlement.DeadletterAsync(request, cancellationToken: cancellationToken);
+ }
+
+ ///
+ public virtual async Task DeferMessageAsync(
+ ServiceBusReceivedMessage message,
+ IDictionary? propertiesToModify = default,
+ CancellationToken cancellationToken = default)
+ {
+ var request = new DeferRequest()
+ {
+ Locktoken = message.LockToken,
+ PropertiesToModify = { TransformProperties(propertiesToModify) }
+ };
+ await _settlement.DeferAsync(request, cancellationToken: cancellationToken);
+ }
+
+ private static Dictionary TransformProperties(IDictionary? properties)
+ {
+ var converted = new Dictionary();
+ if (properties == null)
+ {
+ return converted;
+ }
+ // support all types listed here - https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet
+ foreach (var kvp in properties)
+ {
+ switch (kvp.Value)
+ {
+ case string stringValue:
+ converted.Add(kvp.Key, new SettlementProperties() { StringValue = stringValue });
+ break;
+ case bool boolValue:
+ converted.Add(kvp.Key, new SettlementProperties() { BoolValue = boolValue });
+ break;
+ case byte byteValue:
+ // proto does not support single byte, so use int
+ converted.Add(kvp.Key, new SettlementProperties() { IntValue = byteValue });
+ break;
+ case sbyte sbyteValue:
+ // proto does not support single byte, so use int
+ converted.Add(kvp.Key, new SettlementProperties() { IntValue = sbyteValue });
+ break;
+ case short shortValue:
+ // proto does not support short, so use int
+ converted.Add(kvp.Key, new SettlementProperties() { IntValue = shortValue });
+ break;
+ case ushort ushortValue:
+ // proto does not support short, so use int
+ converted.Add(kvp.Key, new SettlementProperties() { IntValue = ushortValue });
+ break;
+ case int intValue:
+ converted.Add(kvp.Key, new SettlementProperties() { IntValue = intValue });
+ break;
+ case uint uintValue:
+ converted.Add(kvp.Key, new SettlementProperties() { UintValue = uintValue });
+ break;
+ case long longValue:
+ converted.Add(kvp.Key, new SettlementProperties() { LongValue = longValue });
+ break;
+ case ulong ulongValue:
+ // proto does not support ulong, so use double
+ converted.Add(kvp.Key, new SettlementProperties() { DoubleValue = ulongValue });
+ break;
+ case double doubleValue:
+ converted.Add(kvp.Key, new SettlementProperties() { DoubleValue = doubleValue });
+ break;
+ case decimal decimalValue:
+ // proto does not support decimal, so use double
+ converted.Add(kvp.Key, new SettlementProperties() { DoubleValue = Decimal.ToDouble(decimalValue) });
+ break;
+ case float floatValue:
+ converted.Add(kvp.Key, new SettlementProperties() { FloatValue = floatValue });
+ break;
+ case char charValue:
+ converted.Add(kvp.Key, new SettlementProperties() { StringValue = charValue.ToString() });
+ break;
+ case Guid guidValue:
+ converted.Add(kvp.Key, new SettlementProperties() { StringValue = guidValue.ToString() });
+ break;
+ case DateTimeOffset dateTimeOffsetValue:
+ // proto does not support DateTimeOffset, so use Timestamp from google.protobuf
+ converted.Add(kvp.Key, new SettlementProperties() { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeOffsetValue) });
+ break;
+ case DateTime dateTimeValue:
+ // proto does not support DateTime, so use Timestamp from google.protobuf
+ converted.Add(kvp.Key, new SettlementProperties() { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeValue) });
+ break;
+ case Uri uriValue:
+ // proto does not support Uri, so use string
+ converted.Add(kvp.Key, new SettlementProperties() { StringValue = uriValue.ToString() });
+ break;
+ case TimeSpan timeSpanValue:
+ // proto does not support TimeSpan, so use string
+ converted.Add(kvp.Key, new SettlementProperties() { StringValue = timeSpanValue.ToString() });
+ break;
+ }
+ }
+
+ return converted;
}
}
}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs
index 504d34f99..cfdf3beb1 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActionsConverter.cs
@@ -4,6 +4,7 @@
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Converters;
+using Microsoft.Azure.ServiceBus.Grpc;
namespace Microsoft.Azure.Functions.Worker
{
diff --git a/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs b/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs
deleted file mode 100644
index 4048f15ed..000000000
--- a/extensions/Worker.Extensions.ServiceBus/src/SettlementImpl.cs
+++ /dev/null
@@ -1,28 +0,0 @@
-// Copyright (c) Jacob Viau. All rights reserved.
-// Licensed under the MIT. See LICENSE file in the project root for full license information.
-
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Extensions.Logging;
-
-namespace Microsoft.Azure.Functions.Worker
-{
- internal class SettlementImpl : ISettlement
- {
- private readonly ILogger _logger;
- private readonly Settlement.SettlementClient _client;
-
- public SettlementImpl(Settlement.SettlementClient client, ILogger logger)
- {
- _client = client;
- _logger = logger;
- }
-
- public async Task CompleteAsync(string lockToken, CancellationToken cancellation = default)
- {
- _logger.LogInformation("Completing message with lock token {LockToken}", lockToken);
- CompleteReply reply = await _client.CompleteAsync(new() { Locktoken = lockToken}, cancellationToken: cancellation);
- return reply;
- }
- }
-}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Startup.cs b/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
index 0b79aec72..3074cf35c 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
@@ -4,6 +4,7 @@
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Azure.Functions.Worker.Extensions.Rpc;
+using Microsoft.Azure.ServiceBus.Grpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Samples.Extensions.Rpc.Worker;
@@ -21,8 +22,5 @@ public override void Configure(IFunctionsWorkerApplicationBuilder applicationBui
IOptions options = sp.GetRequiredService>();
return new Settlement.SettlementClient(options.Value.CallInvoker);
});
-
- // applicationBuilder.Services.AddSingleton();
- // applicationBuilder.Services.AddTransient();
}
}
\ No newline at end of file
From ad594f1bb5a0599396091f8ba96f2eff1a4307ee Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Wed, 4 Oct 2023 13:36:28 -0700
Subject: [PATCH 05/15] fix
---
NuGet.Config | 1 -
.../src/Properties/AssemblyInfo.cs | 2 --
.../src/Worker.Extensions.ServiceBus.csproj | 4 ++--
3 files changed, 2 insertions(+), 5 deletions(-)
diff --git a/NuGet.Config b/NuGet.Config
index 65ee1715a..238ff8b97 100644
--- a/NuGet.Config
+++ b/NuGet.Config
@@ -4,6 +4,5 @@
-
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
index 441901b51..f1985dbc0 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
@@ -1,8 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
-using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.13.0-alpha.20231004.1")]
-[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.WorkerExtension.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
index 9d5faf769..3b325d926 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
+++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
@@ -16,11 +16,11 @@
-
+
-
+
From 136c4732d5be45812664f8738cfa43506ff4b329 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Fri, 13 Oct 2023 10:28:23 -0700
Subject: [PATCH 06/15] PR fb
---
.../src/Proto/settlement.proto | 7 +-
.../src/ServiceBusMessageActions.cs | 124 +++++++-----------
.../src/Worker.Extensions.ServiceBus.csproj | 1 -
3 files changed, 49 insertions(+), 83 deletions(-)
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
index 62fbc40ba..4cb884485 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
+++ b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
@@ -2,6 +2,7 @@ syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
+import "google/protobuf/wrappers.proto";
// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
@@ -36,8 +37,8 @@ message AbandonRequest {
message DeadletterRequest {
string locktoken = 1;
map propertiesToModify = 2;
- optional string deadletterReason = 3;
- optional string deadletterErrorDescription = 4;
+ google.protobuf.StringValue deadletterReason= 3;
+ google.protobuf.StringValue deadletterErrorDescription = 4;
}
// The defer message request containing the locktoken and properties to modify.
@@ -48,7 +49,7 @@ message DeferRequest {
// The settlement property can be of any type listed below, which
// corresponds to the types specified in
-// https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet#remarks
+// https://learn.microsoft.com/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet#remarks
// Note: this list doesn't match 1:1 with the supported Service Bus types, so compatible types are used in some cases - e.g.
// short uses int, TimeSpan uses string, etc. The full list of transforms can be found in the isolated worker extension source code.
message SettlementProperties {
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
index a477d345d..8e3ebce0b 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
@@ -19,12 +19,23 @@ internal ServiceBusMessageActions(Settlement.SettlementClient settlement)
_settlement = settlement;
}
+ ///
+ /// Initializes a new instance of the class for mocking use in testing.
+ ///
+ ///
+ /// This constructor exists only to support mocking. When used, class state is not fully initialized, and
+ /// will not function correctly; virtual members are meant to be mocked.
+ ///
+ protected ServiceBusMessageActions()
+ {
+ }
+
///
public virtual async Task CompleteMessageAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
- await _settlement.CompleteAsync(new() { Locktoken = message.LockToken}, cancellationToken: cancellationToken);
+ await _settlement.CompleteAsync(new() { Locktoken = message.LockToken }, cancellationToken: cancellationToken);
}
///
@@ -52,17 +63,11 @@ public virtual async Task DeadLetterMessageAsync(
var request = new DeadletterRequest()
{
Locktoken = message.LockToken,
- PropertiesToModify = { TransformProperties(propertiesToModify) }
+ PropertiesToModify = { TransformProperties(propertiesToModify) },
+ DeadletterReason = deadLetterReason,
+ DeadletterErrorDescription = deadLetterErrorDescription
};
- if (deadLetterReason != null)
- {
- request.DeadletterReason = deadLetterReason;
- }
- if (deadLetterErrorDescription != null)
- {
- request.DeadletterErrorDescription = deadLetterErrorDescription;
- }
await _settlement.DeadletterAsync(request, cancellationToken: cancellationToken);
}
@@ -90,76 +95,37 @@ private static Dictionary TransformProperties(IDic
// support all types listed here - https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet
foreach (var kvp in properties)
{
- switch (kvp.Value)
+ SettlementProperties settlementProperties = kvp.Value switch
{
- case string stringValue:
- converted.Add(kvp.Key, new SettlementProperties() { StringValue = stringValue });
- break;
- case bool boolValue:
- converted.Add(kvp.Key, new SettlementProperties() { BoolValue = boolValue });
- break;
- case byte byteValue:
- // proto does not support single byte, so use int
- converted.Add(kvp.Key, new SettlementProperties() { IntValue = byteValue });
- break;
- case sbyte sbyteValue:
- // proto does not support single byte, so use int
- converted.Add(kvp.Key, new SettlementProperties() { IntValue = sbyteValue });
- break;
- case short shortValue:
- // proto does not support short, so use int
- converted.Add(kvp.Key, new SettlementProperties() { IntValue = shortValue });
- break;
- case ushort ushortValue:
- // proto does not support short, so use int
- converted.Add(kvp.Key, new SettlementProperties() { IntValue = ushortValue });
- break;
- case int intValue:
- converted.Add(kvp.Key, new SettlementProperties() { IntValue = intValue });
- break;
- case uint uintValue:
- converted.Add(kvp.Key, new SettlementProperties() { UintValue = uintValue });
- break;
- case long longValue:
- converted.Add(kvp.Key, new SettlementProperties() { LongValue = longValue });
- break;
- case ulong ulongValue:
- // proto does not support ulong, so use double
- converted.Add(kvp.Key, new SettlementProperties() { DoubleValue = ulongValue });
- break;
- case double doubleValue:
- converted.Add(kvp.Key, new SettlementProperties() { DoubleValue = doubleValue });
- break;
- case decimal decimalValue:
- // proto does not support decimal, so use double
- converted.Add(kvp.Key, new SettlementProperties() { DoubleValue = Decimal.ToDouble(decimalValue) });
- break;
- case float floatValue:
- converted.Add(kvp.Key, new SettlementProperties() { FloatValue = floatValue });
- break;
- case char charValue:
- converted.Add(kvp.Key, new SettlementProperties() { StringValue = charValue.ToString() });
- break;
- case Guid guidValue:
- converted.Add(kvp.Key, new SettlementProperties() { StringValue = guidValue.ToString() });
- break;
- case DateTimeOffset dateTimeOffsetValue:
- // proto does not support DateTimeOffset, so use Timestamp from google.protobuf
- converted.Add(kvp.Key, new SettlementProperties() { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeOffsetValue) });
- break;
- case DateTime dateTimeValue:
- // proto does not support DateTime, so use Timestamp from google.protobuf
- converted.Add(kvp.Key, new SettlementProperties() { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeValue) });
- break;
- case Uri uriValue:
- // proto does not support Uri, so use string
- converted.Add(kvp.Key, new SettlementProperties() { StringValue = uriValue.ToString() });
- break;
- case TimeSpan timeSpanValue:
- // proto does not support TimeSpan, so use string
- converted.Add(kvp.Key, new SettlementProperties() { StringValue = timeSpanValue.ToString() });
- break;
- }
+ string stringValue => new SettlementProperties() { StringValue = stringValue },
+ bool boolValue => new SettlementProperties() { BoolValue = boolValue },
+ // proto does not support single byte, so use int
+ byte byteValue => new SettlementProperties() { IntValue = byteValue },
+ sbyte sbyteValue => new SettlementProperties() { IntValue = sbyteValue },
+ // proto does not support short, so use int
+ short shortValue => new SettlementProperties() { IntValue = shortValue },
+ ushort ushortValue => new SettlementProperties() { IntValue = ushortValue },
+ int intValue => new SettlementProperties() { IntValue = intValue },
+ uint uintValue => new SettlementProperties() { UintValue = uintValue },
+ long longValue => new SettlementProperties() { LongValue = longValue },
+ // proto does not support ulong, so use double
+ ulong ulongValue => new SettlementProperties() { DoubleValue = ulongValue },
+ double doubleValue => new SettlementProperties() { DoubleValue = doubleValue },
+ decimal decimalValue => new SettlementProperties() { DoubleValue = decimal.ToDouble(decimalValue) },
+ float floatValue => new SettlementProperties() { FloatValue = floatValue },
+ char charValue => new SettlementProperties() { StringValue = charValue.ToString() },
+ Guid guidValue => new SettlementProperties() { StringValue = guidValue.ToString() },
+ DateTimeOffset dateTimeOffsetValue => new SettlementProperties()
+ { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeOffsetValue) },
+ // proto does not support DateTime, so use Timestamp from google.protobuf
+ DateTime dateTimeValue => new SettlementProperties() { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeValue) },
+ // proto does not support Uri, so use string
+ Uri uriValue => new SettlementProperties() { StringValue = uriValue.ToString() },
+ // proto does not support TimeSpan, so use string
+ TimeSpan timeSpanValue => new SettlementProperties() { StringValue = timeSpanValue.ToString() },
+ _ => throw new NotSupportedException($"Unsupported property type {kvp.Value.GetType()}"),
+ };
+ converted.Add(kvp.Key, settlementProperties);
}
return converted;
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
index 9b0f21c9c..72c9f7034 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
+++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
@@ -19,7 +19,6 @@
-
From 4b90b99845df7d12a9c45c7a1757545504a2ed83 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Fri, 13 Oct 2023 10:29:17 -0700
Subject: [PATCH 07/15] Update dependency version
---
.../Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
index e799bbf15..099475a69 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
@@ -4,5 +4,5 @@
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
-[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.13.0-alpha.20231004.1")]
+[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.13.0")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.Worker.Extensions.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
From 548b567b1ab414d3d6057dc9c4f29b6e301b5cb1 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Fri, 13 Oct 2023 10:30:02 -0700
Subject: [PATCH 08/15] privateassets
---
.../src/Worker.Extensions.ServiceBus.csproj | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
index 72c9f7034..a4dcb7b74 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
+++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
@@ -18,7 +18,7 @@
-
+
From b833e6ff85a7e551213e617cde0d1d487de54c24 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Tue, 17 Oct 2023 14:27:19 -0700
Subject: [PATCH 09/15] Use bytes for propertiesToModify
---
.../src/Properties/AssemblyInfo.cs | 2 +-
.../src/Proto/settlement.proto | 30 +-
.../src/ServiceBusMessageActions.cs | 313 +++++++++++++++---
.../src/Startup.cs | 1 +
.../src/Worker.Extensions.ServiceBus.csproj | 6 +
5 files changed, 283 insertions(+), 69 deletions(-)
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
index 099475a69..4c8cdec6e 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
@@ -4,5 +4,5 @@
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
-[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.13.0")]
+[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.13.1")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.Worker.Extensions.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
index 4cb884485..fdca72531 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
+++ b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
@@ -2,7 +2,6 @@ syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
-import "google/protobuf/wrappers.proto";
// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
@@ -30,38 +29,19 @@ message CompleteRequest {
// The abandon message request containing the locktoken and properties to modify.
message AbandonRequest {
string locktoken = 1;
- map propertiesToModify = 2;
+ bytes propertiesToModify = 2;
}
// The deadletter message request containing the locktoken and properties to modify along with the reason/description.
message DeadletterRequest {
string locktoken = 1;
- map propertiesToModify = 2;
- google.protobuf.StringValue deadletterReason= 3;
- google.protobuf.StringValue deadletterErrorDescription = 4;
+ bytes propertiesToModify = 2;
+ string deadletterReason = 3;
+ string deadletterErrorDescription = 4;
}
// The defer message request containing the locktoken and properties to modify.
message DeferRequest {
string locktoken = 1;
- map propertiesToModify = 2;
-}
-
-// The settlement property can be of any type listed below, which
-// corresponds to the types specified in
-// https://learn.microsoft.com/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet#remarks
-// Note: this list doesn't match 1:1 with the supported Service Bus types, so compatible types are used in some cases - e.g.
-// short uses int, TimeSpan uses string, etc. The full list of transforms can be found in the isolated worker extension source code.
-message SettlementProperties {
- oneof values {
- string stringValue = 1;
- int32 intValue = 2;
- uint32 uintValue = 3;
- int64 longValue = 4;
- uint64 ulongValue = 5;
- bool boolValue = 6;
- float floatValue = 7;
- double doubleValue = 8;
- google.protobuf.Timestamp timestampValue = 9;
- }
+ bytes propertiesToModify = 2;
}
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
index 8e3ebce0b..b551e9161 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
@@ -1,11 +1,18 @@
using System;
+using System.Collections;
using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
-using Google.Protobuf.WellKnownTypes;
+using Google.Protobuf;
+using Microsoft.Azure.Amqp;
+using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.ServiceBus.Grpc;
+using Type = System.Type;
namespace Microsoft.Azure.Functions.Worker
{
@@ -14,6 +21,33 @@ public class ServiceBusMessageActions
{
private readonly Settlement.SettlementClient _settlement;
+ /// The size, in bytes, to use as a buffer for stream operations.
+ private const int StreamBufferSizeInBytes = 512;
+
+ /// The set of mappings from CLR types to AMQP types for property values.
+ private static readonly IReadOnlyDictionary AmqpPropertyTypeMap = new Dictionary
+ {
+ { typeof(byte), AmqpType.Byte },
+ { typeof(sbyte), AmqpType.SByte },
+ { typeof(char), AmqpType.Char },
+ { typeof(short), AmqpType.Int16 },
+ { typeof(ushort), AmqpType.UInt16 },
+ { typeof(int), AmqpType.Int32 },
+ { typeof(uint), AmqpType.UInt32 },
+ { typeof(long), AmqpType.Int64 },
+ { typeof(ulong), AmqpType.UInt64 },
+ { typeof(float), AmqpType.Single },
+ { typeof(double), AmqpType.Double },
+ { typeof(decimal), AmqpType.Decimal },
+ { typeof(bool), AmqpType.Boolean },
+ { typeof(Guid), AmqpType.Guid },
+ { typeof(string), AmqpType.String },
+ { typeof(Uri), AmqpType.Uri },
+ { typeof(DateTime), AmqpType.DateTime },
+ { typeof(DateTimeOffset), AmqpType.DateTimeOffset },
+ { typeof(TimeSpan), AmqpType.TimeSpan },
+ };
+
internal ServiceBusMessageActions(Settlement.SettlementClient settlement)
{
_settlement = settlement;
@@ -41,14 +75,17 @@ public virtual async Task CompleteMessageAsync(
///
public virtual async Task AbandonMessageAsync(
ServiceBusReceivedMessage message,
- IDictionary? properties,
+ IDictionary? propertiesToModify,
CancellationToken cancellationToken = default)
{
var request = new AbandonRequest()
{
Locktoken = message.LockToken,
- PropertiesToModify = { TransformProperties(properties) }
};
+ if (propertiesToModify != null)
+ {
+ request.PropertiesToModify = ConvertToByteString(propertiesToModify);
+ }
await _settlement.AbandonAsync(request, cancellationToken: cancellationToken);
}
@@ -63,11 +100,13 @@ public virtual async Task DeadLetterMessageAsync(
var request = new DeadletterRequest()
{
Locktoken = message.LockToken,
- PropertiesToModify = { TransformProperties(propertiesToModify) },
DeadletterReason = deadLetterReason,
DeadletterErrorDescription = deadLetterErrorDescription
};
-
+ if (propertiesToModify != null)
+ {
+ request.PropertiesToModify = ConvertToByteString(propertiesToModify);
+ }
await _settlement.DeadletterAsync(request, cancellationToken: cancellationToken);
}
@@ -80,55 +119,243 @@ public virtual async Task DeferMessageAsync(
var request = new DeferRequest()
{
Locktoken = message.LockToken,
- PropertiesToModify = { TransformProperties(propertiesToModify) }
};
+ if (propertiesToModify != null)
+ {
+ request.PropertiesToModify = ConvertToByteString(propertiesToModify);
+ }
await _settlement.DeferAsync(request, cancellationToken: cancellationToken);
}
- private static Dictionary TransformProperties(IDictionary? properties)
+ private static ByteString ConvertToByteString(IDictionary propertiesToModify)
+ {
+ var map = new AmqpMap();
+ foreach (KeyValuePair kvp in propertiesToModify)
+ {
+ if (TryCreateAmqpPropertyValueFromNetProperty(kvp.Value, out var amqpValue))
+ {
+ map[new MapKey(kvp.Value)] = amqpValue;
+ }
+ else
+ {
+ throw new NotSupportedException(
+ string.Format(
+ CultureInfo.CurrentCulture,
+ "The key `{0}` has a value of type `{1}` which is not supported for AMQP transport." +
+ "The list of supported types can be found here: https://learn.microsoft.com/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet#remarks",
+ kvp.Key,
+ kvp.Value?.GetType().Name));
+ }
+ }
+
+ using ByteBuffer buffer = new ByteBuffer(256, true);
+ AmqpCodec.EncodeMap(map, buffer);
+ return ByteString.CopyFrom(buffer.Buffer, 0, buffer.Length);
+ }
+
+ ///
+ /// Attempts to create an AMQP property value for a given event property.
+ ///
+ ///
+ /// The value of the event property to create an AMQP property value for.
+ /// The AMQP property value that was created.
+ /// true to allow an AMQP map to be translated to additional types supported only by a message body; otherwise, false.
+ ///
+ /// true if an AMQP property value was able to be created; otherwise, false.
+ ///
+ private static bool TryCreateAmqpPropertyValueFromNetProperty(
+ object? propertyValue,
+ out object? amqpPropertyValue,
+ bool allowBodyTypes = false)
{
- var converted = new Dictionary();
- if (properties == null)
+ amqpPropertyValue = null;
+
+ if (propertyValue == null)
+ {
+ return true;
+ }
+
+ switch (GetTypeIdentifier(propertyValue))
{
- return converted;
+ case AmqpType.Byte:
+ case AmqpType.SByte:
+ case AmqpType.Int16:
+ case AmqpType.Int32:
+ case AmqpType.Int64:
+ case AmqpType.UInt16:
+ case AmqpType.UInt32:
+ case AmqpType.UInt64:
+ case AmqpType.Single:
+ case AmqpType.Double:
+ case AmqpType.Boolean:
+ case AmqpType.Decimal:
+ case AmqpType.Char:
+ case AmqpType.Guid:
+ case AmqpType.DateTime:
+ case AmqpType.String:
+ amqpPropertyValue = propertyValue;
+ break;
+
+ case AmqpType.Stream:
+ case AmqpType.Unknown when propertyValue is Stream:
+ amqpPropertyValue = ReadStreamToArraySegment((Stream)propertyValue);
+ break;
+
+ case AmqpType.Uri:
+ amqpPropertyValue = new DescribedType((AmqpSymbol)AmqpMessageConstants.Uri, ((Uri)propertyValue).AbsoluteUri);
+ break;
+
+ case AmqpType.DateTimeOffset:
+ amqpPropertyValue = new DescribedType((AmqpSymbol)AmqpMessageConstants.DateTimeOffset, ((DateTimeOffset)propertyValue).UtcTicks);
+ break;
+
+ case AmqpType.TimeSpan:
+ amqpPropertyValue = new DescribedType((AmqpSymbol)AmqpMessageConstants.TimeSpan, ((TimeSpan)propertyValue).Ticks);
+ break;
+
+ case AmqpType.Unknown when allowBodyTypes && propertyValue is byte[] byteArray:
+ amqpPropertyValue = new ArraySegment(byteArray);
+ break;
+
+ case AmqpType.Unknown when allowBodyTypes && propertyValue is IDictionary dict:
+ amqpPropertyValue = new AmqpMap(dict);
+ break;
+
+ case AmqpType.Unknown when allowBodyTypes && propertyValue is IList:
+ amqpPropertyValue = propertyValue;
+ break;
+
+ case AmqpType.Unknown:
+ var exception = new SerializationException(string.Format(CultureInfo.CurrentCulture, "Serialization failed due to an unsupported type, {0}.", propertyValue.GetType().FullName));
+ throw exception;
}
- // support all types listed here - https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet
- foreach (var kvp in properties)
+
+ return (amqpPropertyValue != null);
+ }
+
+ ///
+ /// Converts a stream to an representation.
+ ///
+ ///
+ /// The stream to read and capture in memory.
+ ///
+ /// The containing the stream data.
+ ///
+ private static ArraySegment ReadStreamToArraySegment(Stream stream)
+ {
+ switch (stream)
{
- SettlementProperties settlementProperties = kvp.Value switch
+ case { Length: < 1 }:
+ return default;
+
+ case BufferListStream bufferListStream:
+ return bufferListStream.ReadBytes((int)stream.Length);
+
+ case MemoryStream memStreamSource:
{
- string stringValue => new SettlementProperties() { StringValue = stringValue },
- bool boolValue => new SettlementProperties() { BoolValue = boolValue },
- // proto does not support single byte, so use int
- byte byteValue => new SettlementProperties() { IntValue = byteValue },
- sbyte sbyteValue => new SettlementProperties() { IntValue = sbyteValue },
- // proto does not support short, so use int
- short shortValue => new SettlementProperties() { IntValue = shortValue },
- ushort ushortValue => new SettlementProperties() { IntValue = ushortValue },
- int intValue => new SettlementProperties() { IntValue = intValue },
- uint uintValue => new SettlementProperties() { UintValue = uintValue },
- long longValue => new SettlementProperties() { LongValue = longValue },
- // proto does not support ulong, so use double
- ulong ulongValue => new SettlementProperties() { DoubleValue = ulongValue },
- double doubleValue => new SettlementProperties() { DoubleValue = doubleValue },
- decimal decimalValue => new SettlementProperties() { DoubleValue = decimal.ToDouble(decimalValue) },
- float floatValue => new SettlementProperties() { FloatValue = floatValue },
- char charValue => new SettlementProperties() { StringValue = charValue.ToString() },
- Guid guidValue => new SettlementProperties() { StringValue = guidValue.ToString() },
- DateTimeOffset dateTimeOffsetValue => new SettlementProperties()
- { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeOffsetValue) },
- // proto does not support DateTime, so use Timestamp from google.protobuf
- DateTime dateTimeValue => new SettlementProperties() { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeValue) },
- // proto does not support Uri, so use string
- Uri uriValue => new SettlementProperties() { StringValue = uriValue.ToString() },
- // proto does not support TimeSpan, so use string
- TimeSpan timeSpanValue => new SettlementProperties() { StringValue = timeSpanValue.ToString() },
- _ => throw new NotSupportedException($"Unsupported property type {kvp.Value.GetType()}"),
- };
- converted.Add(kvp.Key, settlementProperties);
+ using var memStreamCopy = new MemoryStream((int)(memStreamSource.Length - memStreamSource.Position));
+ memStreamSource.CopyTo(memStreamCopy, StreamBufferSizeInBytes);
+ if (!memStreamCopy.TryGetBuffer(out ArraySegment segment))
+ {
+ segment = new ArraySegment(memStreamCopy.ToArray());
+ }
+ return segment;
+ }
+
+ default:
+ {
+ using var memStreamCopy = new MemoryStream(StreamBufferSizeInBytes);
+ stream.CopyTo(memStreamCopy, StreamBufferSizeInBytes);
+ if (!memStreamCopy.TryGetBuffer(out ArraySegment segment))
+ {
+ segment = new ArraySegment(memStreamCopy.ToArray());
+ }
+ return segment;
+ }
+ }
+ }
+
+ ///
+ /// Represents the supported AMQP property types.
+ ///
+ ///
+ ///
+ /// WARNING:
+ /// These values are synchronized between Azure services and the client
+ /// library. You must consult with the Event Hubs/Service Bus service team before making
+ /// changes, including adding a new member.
+ ///
+ /// When adding a new member, remember to always do so before the Unknown
+ /// member.
+ ///
+ ///
+ private enum AmqpType
+ {
+ Null,
+ Byte,
+ SByte,
+ Char,
+ Int16,
+ UInt16,
+ Int32,
+ UInt32,
+ Int64,
+ UInt64,
+ Single,
+ Double,
+ Decimal,
+ Boolean,
+ Guid,
+ String,
+ Uri,
+ DateTime,
+ DateTimeOffset,
+ TimeSpan,
+ Stream,
+ Unknown
+ }
+
+ ///
+ /// Gets the AMQP property type identifier for a given
+ /// value.
+ ///
+ ///
+ /// The value to determine the type identifier for.
+ ///
+ /// The that was identified for the given .
+ ///
+ private static AmqpType GetTypeIdentifier(object? value) => ToAmqpPropertyType(value?.GetType());
+
+ ///
+ /// Translates the given to the corresponding
+ /// .
+ ///
+ ///
+ /// The type to convert to an AMQP type.
+ ///
+ /// The AMQP property type that best matches the specified .
+ ///
+ private static AmqpType ToAmqpPropertyType(Type? type)
+ {
+ if (type == null)
+ {
+ return AmqpType.Null;
+ }
+
+ if (AmqpPropertyTypeMap.TryGetValue(type, out AmqpType amqpType))
+ {
+ return amqpType;
}
- return converted;
+ return AmqpType.Unknown;
+ }
+
+ internal static class AmqpMessageConstants
+ {
+ public const string Vendor = "com.microsoft";
+ public const string TimeSpan = Vendor + ":timespan";
+ public const string Uri = Vendor + ":uri";
+ public const string DateTimeOffset = Vendor + ":datetime-offset";
}
}
}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Startup.cs b/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
index 3074cf35c..ef7a176f6 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
@@ -22,5 +22,6 @@ public override void Configure(IFunctionsWorkerApplicationBuilder applicationBui
IOptions options = sp.GetRequiredService>();
return new Settlement.SettlementClient(options.Value.CallInvoker);
});
+ applicationBuilder.Services.AddWorkerRpc();
}
}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
index a4dcb7b74..31e57052c 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
+++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
@@ -24,6 +24,7 @@
+
@@ -33,5 +34,10 @@
+
+
+
+
+
From 9364d26c0df5db023493e39ce64f110fc8f283b0 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Tue, 17 Oct 2023 14:28:34 -0700
Subject: [PATCH 10/15] Bump SB dependency version
---
.../src/Worker.Extensions.ServiceBus.csproj | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
index 31e57052c..3b4409a9f 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
+++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
@@ -15,7 +15,7 @@
-
+
From 7c4a08a45ae9afd25695dc514bcba09035b3e8e5 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Wed, 18 Oct 2023 10:43:58 -0700
Subject: [PATCH 11/15] Fix csproj
---
.../src/Worker.Extensions.ServiceBus.csproj | 6 ------
1 file changed, 6 deletions(-)
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
index 3b4409a9f..4a3ebd8ba 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
+++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
@@ -34,10 +34,4 @@
-
-
-
-
-
-
From 8f57a2be85b8fdb1aef275e7a2f6ec7ff9943696 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Wed, 18 Oct 2023 17:09:37 -0700
Subject: [PATCH 12/15] Add tests and samples
---
.../src/Properties/AssemblyInfo.cs | 2 +-
.../src/Proto/settlement.proto | 8 +-
.../src/ServiceBusMessageActions.cs | 4 +-
samples/Extensions/Extensions.csproj | 7 +-
.../ServiceBusReceivedMessageFunctions.cs | 16 +++
.../ServiceBusMessageActionsTests.cs | 123 ++++++++++++++++++
6 files changed, 152 insertions(+), 8 deletions(-)
create mode 100644 test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
index 4c8cdec6e..4889774e6 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/Properties/AssemblyInfo.cs
@@ -4,5 +4,5 @@
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
-[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.13.1")]
+[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.13.2")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.Worker.Extensions.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
index fdca72531..969e94357 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
+++ b/extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
@@ -1,7 +1,7 @@
syntax = "proto3";
-import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
+import "google/protobuf/wrappers.proto";
// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
@@ -36,12 +36,12 @@ message AbandonRequest {
message DeadletterRequest {
string locktoken = 1;
bytes propertiesToModify = 2;
- string deadletterReason = 3;
- string deadletterErrorDescription = 4;
+ google.protobuf.StringValue deadletterReason = 3;
+ google.protobuf.StringValue deadletterErrorDescription = 4;
}
// The defer message request containing the locktoken and properties to modify.
message DeferRequest {
string locktoken = 1;
bytes propertiesToModify = 2;
-}
+}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
index b551e9161..9af1dab5e 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
@@ -127,14 +127,14 @@ public virtual async Task DeferMessageAsync(
await _settlement.DeferAsync(request, cancellationToken: cancellationToken);
}
- private static ByteString ConvertToByteString(IDictionary propertiesToModify)
+ internal static ByteString ConvertToByteString(IDictionary propertiesToModify)
{
var map = new AmqpMap();
foreach (KeyValuePair kvp in propertiesToModify)
{
if (TryCreateAmqpPropertyValueFromNetProperty(kvp.Value, out var amqpValue))
{
- map[new MapKey(kvp.Value)] = amqpValue;
+ map[new MapKey(kvp.Key)] = amqpValue;
}
else
{
diff --git a/samples/Extensions/Extensions.csproj b/samples/Extensions/Extensions.csproj
index 0d66a0883..f37fda33a 100644
--- a/samples/Extensions/Extensions.csproj
+++ b/samples/Extensions/Extensions.csproj
@@ -13,7 +13,7 @@
-
+
@@ -32,4 +32,9 @@
Never
+
+
+
+
+
\ No newline at end of file
diff --git a/samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs b/samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs
index 838ee63b3..ab4473e2b 100644
--- a/samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs
+++ b/samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs
@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
+using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
@@ -74,5 +75,20 @@ public void ServiceBusReceivedMessageWithStringProperties(
_logger.LogInformation("Delivery Count: {count}", message.DeliveryCount);
_logger.LogInformation("Delivery Count: {count}", deliveryCount);
}
+ //
+ [Function(nameof(ServiceBusReceivedMessageFunction))]
+ public async Task ServiceBusMessageActionsFunction(
+ [ServiceBusTrigger("queue", Connection = "ServiceBusConnection")]
+ ServiceBusReceivedMessage message,
+ ServiceBusMessageActions messageActions)
+ {
+ _logger.LogInformation("Message ID: {id}", message.MessageId);
+ _logger.LogInformation("Message Body: {body}", message.Body);
+ _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
+
+ // Complete the message
+ await messageActions.CompleteMessageAsync(message);
+ }
+ //
}
}
diff --git a/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs b/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs
new file mode 100644
index 000000000..b3c147a85
--- /dev/null
+++ b/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs
@@ -0,0 +1,123 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Messaging.ServiceBus;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Microsoft.Azure.ServiceBus.Grpc;
+using Xunit;
+
+namespace Microsoft.Azure.Functions.Worker.Extensions.Tests
+{
+ public class ServiceBusMessageActionsTests
+ {
+ [Fact]
+ public async Task CanCompleteMessage()
+ {
+ var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid());
+ var messageActions = new ServiceBusMessageActions(new MockSettlementClient(message.LockToken));
+ await messageActions.CompleteMessageAsync(message);
+ }
+
+ [Fact]
+ public async Task CanAbandonMessage()
+ {
+ var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid());
+ var properties = new Dictionary()
+ {
+ { "int", 1 },
+ { "string", "foo"},
+ { "timespan", TimeSpan.FromSeconds(1) },
+ { "datetime", DateTime.UtcNow },
+ { "datetimeoffset", DateTimeOffset.UtcNow },
+ { "guid", Guid.NewGuid() }
+ };
+ var messageActions = new ServiceBusMessageActions(new MockSettlementClient(message.LockToken, properties));
+ await messageActions.AbandonMessageAsync(message, properties);
+ }
+
+ [Fact]
+ public async Task CanDeadLetterMessage()
+ {
+ var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid());
+ var properties = new Dictionary()
+ {
+ { "int", 1 },
+ { "string", "foo"},
+ { "timespan", TimeSpan.FromSeconds(1) },
+ { "datetime", DateTime.UtcNow },
+ { "datetimeoffset", DateTimeOffset.UtcNow },
+ { "guid", Guid.NewGuid() }
+ };
+ var messageActions = new ServiceBusMessageActions(new MockSettlementClient(message.LockToken, properties));
+ await messageActions.DeadLetterMessageAsync(message, properties);
+ }
+
+ [Fact]
+ public async Task CanDeferMessage()
+ {
+ var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid());
+ var properties = new Dictionary()
+ {
+ { "int", 1 },
+ { "string", "foo"},
+ { "timespan", TimeSpan.FromSeconds(1) },
+ { "datetime", DateTime.UtcNow },
+ { "datetimeoffset", DateTimeOffset.UtcNow },
+ { "guid", Guid.NewGuid() }
+ };
+ var messageActions = new ServiceBusMessageActions(new MockSettlementClient(message.LockToken, properties));
+ await messageActions.DeferMessageAsync(message, properties);
+ }
+
+ private class MockSettlementClient : Settlement.SettlementClient
+ {
+ private readonly string _lockToken;
+ private readonly ByteString _propertiesToModify;
+ public MockSettlementClient(string lockToken, IDictionary propertiesToModify = default) : base()
+ {
+ _lockToken = lockToken;
+ if (propertiesToModify != null)
+ {
+ _propertiesToModify = ServiceBusMessageActions.ConvertToByteString(propertiesToModify);
+ }
+ }
+
+ public override AsyncUnaryCall CompleteAsync(CompleteRequest request, Metadata headers = null, DateTime? deadline = null,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ Assert.Equal(_lockToken, request.Locktoken);
+ return new AsyncUnaryCall(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
+ }
+
+ public override AsyncUnaryCall AbandonAsync(AbandonRequest request, Metadata headers = null, DateTime? deadline = null,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ Assert.Equal(_lockToken, request.Locktoken);
+ Assert.Equal(_propertiesToModify, request.PropertiesToModify);
+ return new AsyncUnaryCall(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
+ }
+
+ public override AsyncUnaryCall DeadletterAsync(DeadletterRequest request, Metadata headers = null, DateTime? deadline = null,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ Assert.Equal(_lockToken, request.Locktoken);
+ Assert.Equal(_propertiesToModify, request.PropertiesToModify);
+ return new AsyncUnaryCall(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
+ }
+
+ public override AsyncUnaryCall DeferAsync(DeferRequest request, Metadata headers = null, DateTime? deadline = null,
+ CancellationToken cancellationToken = default(CancellationToken))
+ {
+ Assert.Equal(_lockToken, request.Locktoken);
+ Assert.Equal(_propertiesToModify, request.PropertiesToModify);
+ return new AsyncUnaryCall(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
+ }
+ }
+ }
+}
\ No newline at end of file
From f2456917f9fad4985a4583b4d3f9e2db2da16a76 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Wed, 18 Oct 2023 17:15:06 -0700
Subject: [PATCH 13/15] Null validation
---
.../src/ServiceBusMessageActions.cs | 22 ++++++++++++++++++-
.../ServiceBusMessageActionsTests.cs | 10 +++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
index 9af1dab5e..d02de18db 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusMessageActions.cs
@@ -69,15 +69,25 @@ public virtual async Task CompleteMessageAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
+ if (message == null)
+ {
+ throw new ArgumentNullException(nameof(message));
+ }
+
await _settlement.CompleteAsync(new() { Locktoken = message.LockToken }, cancellationToken: cancellationToken);
}
///
public virtual async Task AbandonMessageAsync(
ServiceBusReceivedMessage message,
- IDictionary? propertiesToModify,
+ IDictionary? propertiesToModify = default,
CancellationToken cancellationToken = default)
{
+ if (message == null)
+ {
+ throw new ArgumentNullException(nameof(message));
+ }
+
var request = new AbandonRequest()
{
Locktoken = message.LockToken,
@@ -97,6 +107,11 @@ public virtual async Task DeadLetterMessageAsync(
string? deadLetterErrorDescription = default,
CancellationToken cancellationToken = default)
{
+ if (message == null)
+ {
+ throw new ArgumentNullException(nameof(message));
+ }
+
var request = new DeadletterRequest()
{
Locktoken = message.LockToken,
@@ -116,6 +131,11 @@ public virtual async Task DeferMessageAsync(
IDictionary? propertiesToModify = default,
CancellationToken cancellationToken = default)
{
+ if (message == null)
+ {
+ throw new ArgumentNullException(nameof(message));
+ }
+
var request = new DeferRequest()
{
Locktoken = message.LockToken,
diff --git a/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs b/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs
index b3c147a85..04096f40a 100644
--- a/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs
+++ b/test/Worker.Extensions.Tests/ServiceBus/ServiceBusMessageActionsTests.cs
@@ -75,6 +75,16 @@ public async Task CanDeferMessage()
await messageActions.DeferMessageAsync(message, properties);
}
+ [Fact]
+ public async Task PassingNullMessageThrows()
+ {
+ var messageActions = new ServiceBusMessageActions(new MockSettlementClient(null));
+ await Assert.ThrowsAsync(async () => await messageActions.CompleteMessageAsync(null));
+ await Assert.ThrowsAsync(async () => await messageActions.AbandonMessageAsync(null));
+ await Assert.ThrowsAsync(async () => await messageActions.DeadLetterMessageAsync(null));
+ await Assert.ThrowsAsync(async () => await messageActions.DeferMessageAsync(null));
+ }
+
private class MockSettlementClient : Settlement.SettlementClient
{
private readonly string _lockToken;
From 4725a51619bcbd0d4bcd88b4ee2e912d4a81c9b4 Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Wed, 18 Oct 2023 17:43:16 -0700
Subject: [PATCH 14/15] Rename startup class and fix namespace
---
.../src/ServiceBusExtensionStartup.cs | 27 +++++++++++++++++++
.../src/Startup.cs | 27 -------------------
2 files changed, 27 insertions(+), 27 deletions(-)
create mode 100644 extensions/Worker.Extensions.ServiceBus/src/ServiceBusExtensionStartup.cs
delete mode 100644 extensions/Worker.Extensions.ServiceBus/src/Startup.cs
diff --git a/extensions/Worker.Extensions.ServiceBus/src/ServiceBusExtensionStartup.cs b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusExtensionStartup.cs
new file mode 100644
index 000000000..5be4b027b
--- /dev/null
+++ b/extensions/Worker.Extensions.ServiceBus/src/ServiceBusExtensionStartup.cs
@@ -0,0 +1,27 @@
+// Copyright (c) Jacob Viau. All rights reserved.
+// Licensed under the MIT. See LICENSE file in the project root for full license information.
+
+using Microsoft.Azure.Functions.Worker;
+using Microsoft.Azure.Functions.Worker.Core;
+using Microsoft.Azure.Functions.Worker.Extensions.Rpc;
+using Microsoft.Azure.ServiceBus.Grpc;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+
+[assembly: WorkerExtensionStartup(typeof(ServiceBusExtensionStartup))]
+
+namespace Microsoft.Azure.Functions.Worker
+{
+ public sealed class ServiceBusExtensionStartup : WorkerExtensionStartup
+ {
+ public override void Configure(IFunctionsWorkerApplicationBuilder applicationBuilder)
+ {
+ applicationBuilder.Services.AddTransient(sp =>
+ {
+ IOptions options = sp.GetRequiredService>();
+ return new Settlement.SettlementClient(options.Value.CallInvoker);
+ });
+ applicationBuilder.Services.AddWorkerRpc();
+ }
+ }
+}
\ No newline at end of file
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Startup.cs b/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
deleted file mode 100644
index ef7a176f6..000000000
--- a/extensions/Worker.Extensions.ServiceBus/src/Startup.cs
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (c) Jacob Viau. All rights reserved.
-// Licensed under the MIT. See LICENSE file in the project root for full license information.
-
-using Microsoft.Azure.Functions.Worker;
-using Microsoft.Azure.Functions.Worker.Core;
-using Microsoft.Azure.Functions.Worker.Extensions.Rpc;
-using Microsoft.Azure.ServiceBus.Grpc;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Options;
-using Samples.Extensions.Rpc.Worker;
-
-[assembly: WorkerExtensionStartup(typeof(Startup))]
-
-namespace Samples.Extensions.Rpc.Worker;
-
-public sealed class Startup : WorkerExtensionStartup
-{
- public override void Configure(IFunctionsWorkerApplicationBuilder applicationBuilder)
- {
- applicationBuilder.Services.AddTransient(sp =>
- {
- IOptions options = sp.GetRequiredService>();
- return new Settlement.SettlementClient(options.Value.CallInvoker);
- });
- applicationBuilder.Services.AddWorkerRpc();
- }
-}
\ No newline at end of file
From 014530f3e550d5c92b28e5415c7c9c6992dd55dc Mon Sep 17 00:00:00 2001
From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
Date: Wed, 18 Oct 2023 18:09:56 -0700
Subject: [PATCH 15/15] Add back core dependency
---
.../src/Worker.Extensions.ServiceBus.csproj | 1 +
1 file changed, 1 insertion(+)
diff --git a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
index 4a3ebd8ba..02395860a 100644
--- a/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
+++ b/extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj
@@ -24,6 +24,7 @@
+