Skip to content

Commit

Permalink
Add support for settlement from the isolated worker extension (#38865)
Browse files Browse the repository at this point in the history
* Add support for settlement from the isolated worker extension

* Update comment

* roll back version of Grpc.Tools

* roll back further

* PR fb

* Revert eventArgs fields to private

* remove in finally block

* Fix test and move versions to package.data.props

* Update sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs

Co-authored-by: Jesse Squire <[email protected]>

* Add batch test cases

* Fix tests

* Fix

---------

Co-authored-by: Jesse Squire <[email protected]>
  • Loading branch information
JoshLove-msft and jsquire authored Sep 28, 2023
1 parent f8ac921 commit 239c373
Show file tree
Hide file tree
Showing 15 changed files with 1,031 additions and 54 deletions.
5 changes: 4 additions & 1 deletion eng/Packages.Data.props
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,17 @@
<PackageReference Update="Apache.Avro" Version="1.11.0" />
<PackageReference Update="CloudNative.CloudEvents" Version="2.0.0" />
<PackageReference Update="CloudNative.CloudEvents.SystemTextJson" Version="2.0.0" />
<PackageReference Update="Google.Protobuf" Version="3.24.3" />
<PackageReference Update="Grpc.Tools" Version="2.51.0" PrivateAssets="all" />
<PackageReference Update="MessagePack" Version="1.9.11" />
<PackageReference Update="Microsoft.AspNetCore.SignalR.Protocols.MessagePack" Version="1.1.5" />
<PackageReference Update="Microsoft.Azure.SignalR" Version="1.21.6" />
<PackageReference Update="Microsoft.Azure.SignalR.Management" Version="1.21.6" />
<PackageReference Update="Microsoft.Azure.SignalR.Protocols" Version="1.21.6" />
<PackageReference Update="Microsoft.Azure.SignalR.Serverless.Protocols" Version="1.9.0" />
<PackageReference Update="Microsoft.Azure.WebJobs" Version="3.0.37" />
<PackageReference Update="Microsoft.Azure.WebJobs.Sources" Version="3.0.37" />
<PackageReference Update="Microsoft.Azure.WebJobs.Sources" Version="3.0.37" PrivateAssets="All"/>
<PackageReference Update="Microsoft.Azure.WebJobs.Extensions.Rpc" Version="3.0.37" />
<PackageReference Update="Microsoft.Azure.WebJobs.Host.Storage" Version="5.0.0" />
<PackageReference Update="Microsoft.Spatial" Version="7.5.3" />
<PackageReference Update="Newtonsoft.Json" Version="10.0.3" />
Expand Down
10 changes: 5 additions & 5 deletions sdk/core/Azure.Core/src/Shared/MessagingClientDiagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ public DiagnosticScope CreateScope(
/// <param name="traceparent">The trace parent of the message.</param>
/// <param name="tracestate">The trace state of the message.</param>
/// <returns><c>true</c> if the message properties contained the diagnostic id; otherwise, <c>false</c>.</returns>
public static bool TryExtractTraceContext(IReadOnlyDictionary<string, object> properties, out string? traceparent, out string? tracestate)
public static bool TryExtractTraceContext(IReadOnlyDictionary<string, object?> properties, out string? traceparent, out string? tracestate)
{
traceparent = null;
tracestate = null;

if (ActivityExtensions.SupportsActivitySource && properties.TryGetValue(TraceParent, out var traceParent) && traceParent is string traceParentString)
{
traceparent = traceParentString;
if (properties.TryGetValue(TraceState, out object state) && state is string stateString)
if (properties.TryGetValue(TraceState, out object? state) && state is string stateString)
{
tracestate = stateString;
}
Expand All @@ -126,15 +126,15 @@ public static bool TryExtractTraceContext(IReadOnlyDictionary<string, object> pr
/// <param name="traceparent">The trace parent of the message.</param>
/// <param name="tracestate">The trace state of the message.</param>
/// <returns><c>true</c> if the message properties contained the diagnostic id; otherwise, <c>false</c>.</returns>
public static bool TryExtractTraceContext(IDictionary<string, object> properties, out string? traceparent, out string? tracestate)
public static bool TryExtractTraceContext(IDictionary<string, object?> properties, out string? traceparent, out string? tracestate)
{
traceparent = null;
tracestate = null;

if (ActivityExtensions.SupportsActivitySource && properties.TryGetValue(TraceParent, out var traceParent) && traceParent is string traceParentString)
{
traceparent = traceParentString;
if (properties.TryGetValue(TraceState, out object state) && state is string stateString)
if (properties.TryGetValue(TraceState, out object? state) && state is string stateString)
{
tracestate = stateString;
}
Expand All @@ -158,7 +158,7 @@ public static bool TryExtractTraceContext(IDictionary<string, object> properties
/// <param name="activityName">The activity name to use for the diagnostic scope.</param>
/// <param name="traceparent">The traceparent that was either added, or that already existed in the message properties.</param>
/// <param name="tracestate">The tracestate that was either added, or that already existed in the message properties.</param>
public void InstrumentMessage(IDictionary<string, object> properties, string activityName, out string? traceparent, out string? tracestate)
public void InstrumentMessage(IDictionary<string, object?> properties, string activityName, out string? traceparent, out string? tracestate)
{
traceparent = null;
tracestate = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public bool Equals(MessagingDiagnosticOperation other)
return _operation == other._operation;
}

public override bool Equals(object obj)
public override bool Equals(object? obj)
{
return obj is MessagingDiagnosticOperation other && Equals(other);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
using System.Linq;
using System.Net;
using Microsoft.Azure.WebJobs;
#if NET6_0_OR_GREATER
using Microsoft.Azure.WebJobs.Extensions.Rpc;
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc;
#endif
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config;
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners;
using Microsoft.Azure.WebJobs.Host.Scale;
Expand All @@ -27,7 +31,6 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder)
}

builder.AddServiceBus(p => { });

return builder;
}

Expand Down Expand Up @@ -99,11 +102,18 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action
}

configure(options);
});
})
#if NET6_0_OR_GREATER
.MapWorkerGrpcService<SettlementService>()
#endif
;

builder.Services.AddAzureClientsCore();
builder.Services.TryAddSingleton<MessagingProvider>();
builder.Services.AddSingleton<ServiceBusClientFactory>();
#if NET6_0_OR_GREATER
builder.Services.AddSingleton<SettlementService>();
#endif
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#if NET6_0_OR_GREATER
using Microsoft.Azure.ServiceBus.Grpc;

namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc
{
internal static class SettlementExtensions
{
internal static object GetPropertyValue(this SettlementProperties properties)
{
return properties.ValuesCase switch
{
SettlementProperties.ValuesOneofCase.LongValue => properties.LongValue,
SettlementProperties.ValuesOneofCase.UlongValue => properties.UlongValue,
SettlementProperties.ValuesOneofCase.DoubleValue => properties.DoubleValue,
SettlementProperties.ValuesOneofCase.FloatValue => properties.FloatValue,
SettlementProperties.ValuesOneofCase.IntValue => properties.IntValue,
SettlementProperties.ValuesOneofCase.UintValue => properties.UintValue,
SettlementProperties.ValuesOneofCase.BoolValue => properties.BoolValue,
SettlementProperties.ValuesOneofCase.StringValue => properties.StringValue,
_ => null
};
}
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";

// The settlement service definition.
service Settlement {
// Completes a message
rpc Complete (CompleteRequest) returns (google.protobuf.Empty) {}

// Abandons a message
rpc Abandon (AbandonRequest) returns (google.protobuf.Empty) {}

// Deadletters a message
rpc Deadletter (DeadletterRequest) returns (google.protobuf.Empty) {}

// Defers a message
rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}
}

// The complete message request containing the locktoken.
message CompleteRequest {
string locktoken = 1;
}

// The abandon message request containing the locktoken and properties to modify.
message AbandonRequest {
string locktoken = 1;
map<string, SettlementProperties> propertiesToModify = 2;
}

// The deadletter message request containing the locktoken and properties to modify along with the reason/description.
message DeadletterRequest {
string locktoken = 1;
map<string, SettlementProperties> propertiesToModify = 2;
string deadletterReason = 3;
string deadletterErrorDescription = 4;
}

// The defer message request containing the locktoken and properties to modify.
message DeferRequest {
string locktoken = 1;
map<string, SettlementProperties> propertiesToModify = 2;
}

// The settlement property can be of any type listed below, which
// corresponds to the types specified in
// https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet#remarks
// Note: this list doesn't match 1:1 with the supported Service Bus types, so compatible types are used in some cases - e.g.
// short uses int, TimeSpan uses string, etc. The full list of transforms can be found in the isolated worker extension source code.
message SettlementProperties {
oneof values {
string stringValue = 1;
int32 intValue = 2;
uint32 uintValue = 3;
int64 longValue = 4;
uint64 ulongValue = 5;
bool boolValue = 6;
float floatValue = 7;
double doubleValue = 8;
google.protobuf.Timestamp timestampValue = 9;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#if NET6_0_OR_GREATER
using System;
using System.Linq;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Azure.ServiceBus.Grpc;
using Microsoft.Azure.WebJobs.ServiceBus;

namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc
{
internal class SettlementService : Settlement.SettlementBase
{
private readonly MessagingProvider _provider;

public SettlementService(MessagingProvider provider)
{
_provider = provider;
}

public SettlementService()
{
_provider = null;
}

public override async Task<Empty> Complete(CompleteRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.CompleteMessageAsync(
tuple.Message,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Abandon(AbandonRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.AbandonMessageAsync(
tuple.Message,
request.PropertiesToModify.ToDictionary(
pair => pair.Key,
pair => pair.Value.GetPropertyValue()),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Defer(DeferRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.DeferMessageAsync(
tuple.Message,
request.PropertiesToModify.ToDictionary(
pair => pair.Key,
pair => pair.Value.GetPropertyValue()),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Deadletter(DeadletterRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.DeadLetterMessageAsync(
tuple.Message,
request.PropertiesToModify.ToDictionary(
pair => pair.Key,
pair => pair.Value.GetPropertyValue()),
request.DeadletterReason,
request.DeadletterErrorDescription,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}
}
}
#endif
Loading

0 comments on commit 239c373

Please sign in to comment.