Skip to content

Commit

Permalink
Support infinite lock renewal (#38754)
Browse files Browse the repository at this point in the history
* Support infinite lock renewal

* woops

* lower timeout
  • Loading branch information
JoshLove-msft authored Sep 15, 2023
1 parent 0a79630 commit 99cc57c
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ internal class ReceiverManager
protected readonly ServiceBusProcessorOptions ProcessorOptions;
private readonly MessagingClientDiagnostics _clientDiagnostics;

protected bool AutoRenewLock => ProcessorOptions.MaxAutoLockRenewalDuration > TimeSpan.Zero;
protected bool AutoRenewLock => ProcessorOptions.MaxAutoLockRenewalDuration > TimeSpan.Zero ||
ProcessorOptions.MaxAutoLockRenewalDuration == Timeout.InfiniteTimeSpan;

public ReceiverManager(
ServiceBusProcessor processor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.ComponentModel;
using System.Threading;
using Azure.Core;

namespace Azure.Messaging.ServiceBus
Expand Down Expand Up @@ -62,6 +63,7 @@ public int PrefetchCount
/// <summary>
/// Gets or sets the maximum duration within which the lock will be renewed automatically. This
/// value should be greater than the longest message lock duration; for example, the LockDuration Property.
/// To specify an infinite duration, use <see cref="Timeout.InfiniteTimeSpan"/>.
/// </summary>
///
/// <value>The maximum duration during which message locks are automatically renewed. The default value is 5 minutes.</value>
Expand All @@ -77,7 +79,10 @@ public TimeSpan MaxAutoLockRenewalDuration

set
{
Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration));
if (value != Timeout.InfiniteTimeSpan)
{
Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration));
}
_maxAutoRenewDuration = value;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Threading;
using Azure.Core;

namespace Azure.Messaging.ServiceBus
Expand Down Expand Up @@ -51,7 +52,7 @@ public int PrefetchCount

/// <summary>
/// Gets or sets the maximum duration within which the session lock will be renewed automatically. This value
/// should be greater than the queue's LockDuration Property.
/// should be greater than the queue's LockDuration Property. To specify an infinite duration, use <see cref="Timeout.InfiniteTimeSpan"/>.
/// </summary>
///
/// <value>The maximum duration during which session locks are automatically renewed. The default value is 5 minutes.</value>
Expand All @@ -66,7 +67,10 @@ public TimeSpan MaxAutoLockRenewalDuration

set
{
Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration));
if (value != Timeout.InfiniteTimeSpan)
{
Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration));
}
_maxAutoRenewDuration = value;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public void ProcessorOptionsValidation()
options.PrefetchCount = 0;
options.MaxReceiveWaitTime = TimeSpan.FromSeconds(1);
options.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(0);
options.MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan;
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public void ProcessorOptionsValidation()
options.PrefetchCount = 0;
options.SessionIdleTimeout = TimeSpan.FromSeconds(1);
options.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(0);
options.MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan;
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Messaging.ServiceBus;
Expand Down Expand Up @@ -73,14 +74,20 @@ public ServiceBusRetryOptions ClientRetryOptions
/// Gets or sets the maximum duration within which the lock will be renewed automatically. This
/// value should be greater than the longest message lock duration; for example, the LockDuration Property.
/// The default value is 5 minutes. This does not apply for functions that receive a batch of messages.
/// To specify an infinite duration, use <see cref="Timeout.InfiniteTimeSpan"/> or <value>-00:00:00.0010000</value>
/// if specifying via host.json.
/// </summary>
public TimeSpan MaxAutoLockRenewalDuration
{
get => _maxAutoRenewDuration;

set
{
Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration));
if (value != Timeout.InfiniteTimeSpan)
{
Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration));
}

_maxAutoRenewDuration = value;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@
<Compile Include="$(AzureCoreSharedSources)MessagingDiagnosticOperation.cs" LinkBase="Shared" />
</ItemGroup>

<!--Temp - remove after next release-->
<ItemGroup>
<ProjectReference Include="..\..\Azure.Messaging.ServiceBus\src\Azure.Messaging.ServiceBus.csproj" />
</ItemGroup>
<!--End Temp-->
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Config;
Expand Down Expand Up @@ -99,6 +100,32 @@ public void ConfigureOptions_Format_Returns_Expected()
Assert.AreEqual(10, result.ClientRetryOptions.MaxRetries);
}

[Test]
public void ConfigureOptions_InfiniteTimeSpans_Format_Returns_Expected()
{
ServiceBusOptions options = CreateOptionsFromConfigInfiniteTimeSpans();
JObject jObject = new JObject
{
{ ExtensionPath, JObject.Parse(((IOptionsFormatter)options).Format()) }
};

ServiceBusOptions result = TestHelpers.GetConfiguredOptions<ServiceBusOptions>(
b =>
{
b.AddServiceBus();
},
jsonStream: new BinaryData(jObject.ToString()).ToStream());

Assert.AreEqual(123, result.PrefetchCount);

Assert.AreEqual(123, result.MaxConcurrentCalls);
Assert.False(result.AutoCompleteMessages);
Assert.AreEqual(Timeout.InfiniteTimeSpan, result.MaxAutoLockRenewalDuration);
Assert.AreEqual(Timeout.InfiniteTimeSpan, result.MaxBatchWaitTime);
Assert.AreEqual("http://proxyserver:8080/", ((WebProxy)result.WebProxy).Address.AbsoluteUri);
Assert.AreEqual(10, result.ClientRetryOptions.MaxRetries);
}

private static ServiceBusOptions CreateOptionsFromConfig()
{
var values = new Dictionary<string, string>
Expand All @@ -124,6 +151,31 @@ private static ServiceBusOptions CreateOptionsFromConfig()
return options;
}

private static ServiceBusOptions CreateOptionsFromConfigInfiniteTimeSpans()
{
var values = new Dictionary<string, string>
{
{ $"{ExtensionPath}:PrefetchCount", "123" },
{ $"ConnectionStrings:ServiceBus", "TestConnectionString" },
{ $"{ExtensionPath}:MaxConcurrentCalls", "123" },
{ $"{ExtensionPath}:AutoCompleteMessages", "false" },
{ $"{ExtensionPath}:MaxAutoLockRenewalDuration", "-00:00:00.0010000" },
{ $"{ExtensionPath}:MaxConcurrentSessions", "123" },
{ $"{ExtensionPath}:TransportType", "AmqpWebSockets" },
{ $"{ExtensionPath}:MaxMessageBatchSize", "20" },
{ $"{ExtensionPath}:MinMessageBatchSize", "10" },
{ $"{ExtensionPath}:MaxBatchWaitTime", "-00:00:00.0010000" },
{ $"{ExtensionPath}:WebProxy", "http://proxyserver:8080/" },
{ $"{ExtensionPath}:ClientRetryOptions:MaxRetries", "10" },
};

ServiceBusOptions options = TestHelpers.GetConfiguredOptions<ServiceBusOptions>(b =>
{
b.AddServiceBus();
}, values);
return options;
}

private static ServiceBusOptions CreateOptionsFromConfigBackCompat()
{
var values = new Dictionary<string, string>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,25 @@ public void ToProcessorOptions_ReturnsExpectedValue()
Assert.AreEqual(sbOptions.MaxConcurrentCalls, processorOptions.MaxConcurrentCalls);
}

[Test]
public void ToProcessorOptions_InfiniteTimeSpans_ReturnsExpectedValue()
{
ServiceBusOptions sbOptions = new ServiceBusOptions
{
AutoCompleteMessages = false,
PrefetchCount = 123,
MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan,
SessionIdleTimeout = Timeout.InfiniteTimeSpan,
MaxConcurrentCalls = 123
};

ServiceBusProcessorOptions processorOptions = sbOptions.ToProcessorOptions(true, false);
Assert.AreEqual(true, processorOptions.AutoCompleteMessages);
Assert.AreEqual(sbOptions.PrefetchCount, processorOptions.PrefetchCount);
Assert.AreEqual(sbOptions.MaxAutoLockRenewalDuration, processorOptions.MaxAutoLockRenewalDuration);
Assert.AreEqual(sbOptions.MaxConcurrentCalls, processorOptions.MaxConcurrentCalls);
}

[Test]
[Category("DynamicConcurrency")]
public void ToProcessorOptions_DynamicConcurrencyEnabled_ReturnsExpectedValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using Azure.Core.Shared;
using Azure.Core.Tests;
using Azure.Messaging.ServiceBus.Diagnostics;
using Microsoft.Extensions.Configuration;
using Constants = Microsoft.Azure.WebJobs.ServiceBus.Constants;

namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
Expand Down Expand Up @@ -242,6 +243,22 @@ public async Task TestSingle_AutoCompleteEnabledOnTrigger_CompleteInFunction()
}
}

[Test]
public async Task TestSingle_InfiniteLockRenewal()
{
await WriteQueueMessage("{'Name': 'Test1', 'Value': 'Value'}");
var host = BuildHost<TestSingleInfiniteLockRenewal>(
SetInfiniteLockRenewal);
using (host)
{
bool result = _waitHandle1.WaitOne(SBTimeoutMills);
Assert.True(result);
await host.StopAsync();
var logs = host.GetTestLoggerProvider().GetAllLogMessages();
Assert.IsNotEmpty(logs.Where(message => message.FormattedMessage.Contains("RenewMessageLock")));
}
}

[Test]
public async Task TestSingle_Dispose()
{
Expand Down Expand Up @@ -829,6 +846,14 @@ private async Task TestSingleDrainMode<T>(bool sendToQueue)
sbOptions.AutoCompleteMessages = false;
}));

private static Action<IHostBuilder> SetInfiniteLockRenewal =>
builder =>
builder.ConfigureAppConfiguration(b =>
b.AddInMemoryCollection(new Dictionary<string, string>
{
{ "AzureWebJobs:Extensions:ServiceBus:MaxAutoLockRenewalDuration", "-00:00:00.0010000" },
}));

private static Action<IHostBuilder> BuildDrainHost<T>()
{
return builder =>
Expand Down Expand Up @@ -1632,6 +1657,19 @@ public static async Task RunAsync(
}
}

public class TestSingleInfiniteLockRenewal
{
public static async Task RunAsync(
[ServiceBusTrigger(FirstQueueNameKey)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions)
{
// wait long enough to trigger lock renewal
await Task.Delay(TimeSpan.FromSeconds(20));
_waitHandle1.Set();
}
}

public class TestSingleDispose
{
public static async Task RunAsync(
Expand Down

0 comments on commit 99cc57c

Please sign in to comment.