Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service Bus settlement support #1945

Merged
merged 18 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.12.0")]
[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.13.0-alpha.20231004.1")]
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.Worker.Extensions.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
66 changes: 66 additions & 0 deletions extensions/Worker.Extensions.ServiceBus/src/Proto/settlement.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";

// The settlement service definition.
service Settlement {
// Completes a message
rpc Complete (CompleteRequest) returns (google.protobuf.Empty) {}

// Abandons a message
rpc Abandon (AbandonRequest) returns (google.protobuf.Empty) {}

// Deadletters a message
rpc Deadletter (DeadletterRequest) returns (google.protobuf.Empty) {}

// Defers a message
rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}
}

// The complete message request containing the locktoken.
message CompleteRequest {
string locktoken = 1;
}

// The abandon message request containing the locktoken and properties to modify.
message AbandonRequest {
string locktoken = 1;
map<string, SettlementProperties> propertiesToModify = 2;
}

// The deadletter message request containing the locktoken and properties to modify along with the reason/description.
message DeadletterRequest {
string locktoken = 1;
map<string, SettlementProperties> propertiesToModify = 2;
optional string deadletterReason = 3;
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
optional string deadletterErrorDescription = 4;
}

// The defer message request containing the locktoken and properties to modify.
message DeferRequest {
string locktoken = 1;
map<string, SettlementProperties> propertiesToModify = 2;
}

// The settlement property can be of any type listed below, which
// corresponds to the types specified in
// https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet#remarks
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
// Note: this list doesn't match 1:1 with the supported Service Bus types, so compatible types are used in some cases - e.g.
// short uses int, TimeSpan uses string, etc. The full list of transforms can be found in the isolated worker extension source code.
message SettlementProperties {
oneof values {
string stringValue = 1;
int32 intValue = 2;
uint32 uintValue = 3;
int64 longValue = 4;
uint64 ulongValue = 5;
bool boolValue = 6;
float floatValue = 7;
double doubleValue = 8;
google.protobuf.Timestamp timestampValue = 9;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.ServiceBus.Grpc;

namespace Microsoft.Azure.Functions.Worker
{
[InputConverter(typeof(ServiceBusMessageActionsConverter))]
public class ServiceBusMessageActions
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly Settlement.SettlementClient _settlement;

internal ServiceBusMessageActions(Settlement.SettlementClient settlement)
{
_settlement = settlement;
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task CompleteMessageAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
await _settlement.CompleteAsync(new() { Locktoken = message.LockToken}, cancellationToken: cancellationToken);
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task AbandonMessageAsync(
ServiceBusReceivedMessage message,
IDictionary<string, object>? properties,
CancellationToken cancellationToken = default)
{
var request = new AbandonRequest()
{
Locktoken = message.LockToken,
PropertiesToModify = { TransformProperties(properties) }
};
await _settlement.AbandonAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.DeadLetterMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, string, string, CancellationToken)"/>
public virtual async Task DeadLetterMessageAsync(
ServiceBusReceivedMessage message,
Dictionary<string, object>? propertiesToModify = default,
string? deadLetterReason = default,
string? deadLetterErrorDescription = default,
CancellationToken cancellationToken = default)
{
var request = new DeadletterRequest()
{
Locktoken = message.LockToken,
PropertiesToModify = { TransformProperties(propertiesToModify) }
};
if (deadLetterReason != null)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
request.DeadletterReason = deadLetterReason;
}

if (deadLetterErrorDescription != null)
{
request.DeadletterErrorDescription = deadLetterErrorDescription;
}
await _settlement.DeadletterAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.DeferMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
public virtual async Task DeferMessageAsync(
ServiceBusReceivedMessage message,
IDictionary<string, object>? propertiesToModify = default,
CancellationToken cancellationToken = default)
{
var request = new DeferRequest()
{
Locktoken = message.LockToken,
PropertiesToModify = { TransformProperties(propertiesToModify) }
};
await _settlement.DeferAsync(request, cancellationToken: cancellationToken);
}

private static Dictionary<string, SettlementProperties> TransformProperties(IDictionary<string, object>? properties)
{
var converted = new Dictionary<string, SettlementProperties>();
if (properties == null)
{
return converted;
}
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
// support all types listed here - https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet
foreach (var kvp in properties)
{
switch (kvp.Value)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
case string stringValue:
converted.Add(kvp.Key, new SettlementProperties() { StringValue = stringValue });
break;
case bool boolValue:
converted.Add(kvp.Key, new SettlementProperties() { BoolValue = boolValue });
break;
case byte byteValue:
// proto does not support single byte, so use int
converted.Add(kvp.Key, new SettlementProperties() { IntValue = byteValue });
break;
case sbyte sbyteValue:
// proto does not support single byte, so use int
converted.Add(kvp.Key, new SettlementProperties() { IntValue = sbyteValue });
break;
case short shortValue:
// proto does not support short, so use int
converted.Add(kvp.Key, new SettlementProperties() { IntValue = shortValue });
break;
case ushort ushortValue:
// proto does not support short, so use int
converted.Add(kvp.Key, new SettlementProperties() { IntValue = ushortValue });
break;
case int intValue:
converted.Add(kvp.Key, new SettlementProperties() { IntValue = intValue });
break;
case uint uintValue:
converted.Add(kvp.Key, new SettlementProperties() { UintValue = uintValue });
break;
case long longValue:
converted.Add(kvp.Key, new SettlementProperties() { LongValue = longValue });
break;
case ulong ulongValue:
// proto does not support ulong, so use double
converted.Add(kvp.Key, new SettlementProperties() { DoubleValue = ulongValue });
break;
case double doubleValue:
converted.Add(kvp.Key, new SettlementProperties() { DoubleValue = doubleValue });
break;
case decimal decimalValue:
// proto does not support decimal, so use double
converted.Add(kvp.Key, new SettlementProperties() { DoubleValue = Decimal.ToDouble(decimalValue) });
break;
case float floatValue:
converted.Add(kvp.Key, new SettlementProperties() { FloatValue = floatValue });
break;
case char charValue:
converted.Add(kvp.Key, new SettlementProperties() { StringValue = charValue.ToString() });
break;
case Guid guidValue:
converted.Add(kvp.Key, new SettlementProperties() { StringValue = guidValue.ToString() });
break;
case DateTimeOffset dateTimeOffsetValue:
// proto does not support DateTimeOffset, so use Timestamp from google.protobuf
converted.Add(kvp.Key, new SettlementProperties() { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeOffsetValue) });
break;
case DateTime dateTimeValue:
// proto does not support DateTime, so use Timestamp from google.protobuf
converted.Add(kvp.Key, new SettlementProperties() { TimestampValue = Timestamp.FromDateTimeOffset(dateTimeValue) });
break;
case Uri uriValue:
// proto does not support Uri, so use string
converted.Add(kvp.Key, new SettlementProperties() { StringValue = uriValue.ToString() });
break;
case TimeSpan timeSpanValue:
// proto does not support TimeSpan, so use string
converted.Add(kvp.Key, new SettlementProperties() { StringValue = timeSpanValue.ToString() });
break;
}
}

return converted;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.ServiceBus.Grpc;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind to <see cref="ServiceBusMessageActions" /> type parameter.
/// </summary>
internal class ServiceBusMessageActionsConverter : IInputConverter
liliankasem marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly Settlement.SettlementClient _settlement;

public ServiceBusMessageActionsConverter(Settlement.SettlementClient settlement)
{
_settlement = settlement;
}

public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
{
try
{
return new ValueTask<ConversionResult>(ConversionResult.Success(new ServiceBusMessageActions(_settlement)));
}
catch (Exception exception)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
return new ValueTask<ConversionResult>(ConversionResult.Failed(exception));
}
}
}
}
26 changes: 26 additions & 0 deletions extensions/Worker.Extensions.ServiceBus/src/Startup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Jacob Viau. All rights reserved.
// Licensed under the MIT. See LICENSE file in the project root for full license information.

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Azure.Functions.Worker.Extensions.Rpc;
using Microsoft.Azure.ServiceBus.Grpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Samples.Extensions.Rpc.Worker;

[assembly: WorkerExtensionStartup(typeof(Startup))]

namespace Samples.Extensions.Rpc.Worker;

public sealed class Startup : WorkerExtensionStartup
{
public override void Configure(IFunctionsWorkerApplicationBuilder applicationBuilder)
{
applicationBuilder.Services.AddTransient(sp =>
{
IOptions<FunctionsGrpcOptions> options = sp.GetRequiredService<IOptions<FunctionsGrpcOptions>>();
return new Settlement.SettlementClient(options.Value.CallInvoker);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@
<Import Project="..\..\..\build\Extensions.props" />

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.14.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.16.1" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.6.3" />
<PackageReference Include="Google.Protobuf" Version="3.23.4" />
<PackageReference Include="Grpc.Tools" Version="2.56.2" />
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Rpc" Version="1.0.0" />
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Worker.Extensions.Abstractions\src\Worker.Extensions.Abstractions.csproj" />
<ProjectReference Include="..\..\..\src\DotNetWorker.Core\DotNetWorker.Core.csproj" />
<ProjectReference Include="..\..\Worker.Extensions.Rpc\src\Worker.Extensions.Rpc.csproj" />
</ItemGroup>

<ItemGroup>
<SharedReference Include="..\..\Worker.Extensions.Shared\Worker.Extensions.Shared.csproj" />
</ItemGroup>

<ItemGroup>
<Protobuf Include="Proto/settlement.proto" GrpcServices="client" Access="internal" />
</ItemGroup>

</Project>