Skip to content

Commit

Permalink
Initial suggestion for Global locking for Timed Services for v9
Browse files Browse the repository at this point in the history
  • Loading branch information
preardon committed May 16, 2024
1 parent 14d8c7d commit 6b71011
Show file tree
Hide file tree
Showing 13 changed files with 410 additions and 30 deletions.
26 changes: 26 additions & 0 deletions Brighter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.ServiceAc
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.ServiceActivator.Control.Api", "src\Paramore.Brighter.ServiceActivator.Control.Api\Paramore.Brighter.ServiceActivator.Control.Api.csproj", "{397F8496-6916-43EF-AEB2-5D84048DE357}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Locking.Azure", "Paramore.Brighter.Locking.Azure\Paramore.Brighter.Locking.Azure.csproj", "{021F3B51-A640-4C0D-9B47-FB4E32DF6715}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1987,6 +1989,30 @@ Global
{397F8496-6916-43EF-AEB2-5D84048DE357}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{397F8496-6916-43EF-AEB2-5D84048DE357}.Release|x86.ActiveCfg = Release|Any CPU
{397F8496-6916-43EF-AEB2-5D84048DE357}.Release|x86.Build.0 = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Debug|x86.ActiveCfg = Debug|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Debug|x86.Build.0 = Debug|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|Any CPU.Build.0 = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|x86.ActiveCfg = Release|Any CPU
{3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|x86.Build.0 = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Any CPU.Build.0 = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|x86.ActiveCfg = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|x86.Build.0 = Debug|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Any CPU.ActiveCfg = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Any CPU.Build.0 = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|x86.ActiveCfg = Release|Any CPU
{021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
106 changes: 106 additions & 0 deletions Paramore.Brighter.Locking.Azure/AzureBlobLockingProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;

namespace Paramore.Brighter.Locking.Azure;

public class AzureBlobLockingProvider(AzureBlobLockingProviderOptions options) : IDistributedLock
{
private readonly BlobContainerClient _containerClient = new BlobContainerClient(options.BlobContainerUri, options.TokenCredential);
private readonly ILogger _logger = ApplicationLogging.CreateLogger<AzureBlobLockingProviderOptions>();

private readonly Dictionary<string, string> _leases = new Dictionary<string, string>();

public async Task<bool> ObtainLockAsync(string resource, CancellationToken cancellationToken)
{
var client = GetBlobClient(resource);

// Write if does not exist
if (!await client.ExistsAsync(cancellationToken))
{
await using var emptyStream = new MemoryStream();
await using var writer = new StreamWriter(emptyStream);
await writer.WriteAsync(string.Empty);
await writer.FlushAsync(cancellationToken);
emptyStream.Position = 0;
await client.UploadAsync(emptyStream, cancellationToken: cancellationToken);
}

try
{
var response = await client.GetBlobLeaseClient().AcquireAsync(options.LeaseValidity, cancellationToken: cancellationToken);
_leases.Add(NormaliseResourceName(resource), response.Value.LeaseId);
return true;
}
catch (RequestFailedException e)
{
_logger.LogInformation("Could not Acquire Lease on Blob {LockResourceName}", resource);
return false;
}
}

public bool ObtainLock(string resource)
{
var client = GetBlobClient(resource);

// Write if does not exist
if (!client.Exists())
{
using var emptyStream = new MemoryStream();
using var writer = new StreamWriter(emptyStream);
writer.Write(string.Empty);
writer.Flush();
emptyStream.Position = 0;
client.Upload(emptyStream);
}

try
{
var response = client.GetBlobLeaseClient().Acquire(options.LeaseValidity);
_leases.Add(NormaliseResourceName(resource), response.Value.LeaseId);
return true;
}
catch (RequestFailedException e)
{
_logger.LogInformation("Could not Acquire Lease on Blob {LockResourceName}", resource);
return false;
}
}

public async Task ReleaseLockAsync(string resource, CancellationToken cancellationToken)
{
var client = GetBlobLeaseClientForResource(resource);
if(client == null)
return;
await client.ReleaseAsync(cancellationToken: cancellationToken);
_leases.Remove(NormaliseResourceName(resource));
}

public void ReleaseLock(string resource)
{
var client = GetBlobLeaseClientForResource(resource);
if(client == null)
return;
client.Release();
_leases.Remove(NormaliseResourceName(resource));
}

private BlobLeaseClient? GetBlobLeaseClientForResource(string resource)
{
if (_leases.ContainsKey(NormaliseResourceName(resource)))
return GetBlobClient(resource).GetBlobLeaseClient(_leases[NormaliseResourceName(resource)]);

_logger.LogInformation("No lock found for {LockResourceName}", resource);
return null;
}

private BlobClient GetBlobClient(string resource)
{
var storageLocation = options.StorageLocationFunc.Invoke(NormaliseResourceName(resource));
return _containerClient.GetBlobClient(storageLocation);
}

private static string NormaliseResourceName(string resourceName) => resourceName.ToLower();
}
29 changes: 29 additions & 0 deletions Paramore.Brighter.Locking.Azure/AzureBlobLockingProviderOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using Azure.Core;

namespace Paramore.Brighter.Locking.Azure;

public class AzureBlobLockingProviderOptions(
Uri blobContainerUri,
TokenCredential tokenCredential
)
{
/// <summary>
/// The URI of the blob container
/// </summary>
public Uri BlobContainerUri { get; init; } = blobContainerUri;

/// <summary>
/// The Credential to use when writing blobs
/// </summary>
public TokenCredential TokenCredential { get; init; } = tokenCredential;

/// <summary>
/// The amount of time before the lease automatically expires
/// </summary>
public TimeSpan LeaseValidity { get; init; } = TimeSpan.FromMinutes(1);

/// <summary>
/// The function to provide the location to store the locks inside of the Blob container
/// </summary>
public Func<string, string> StorageLocationFunc = (resource) => $"lock-{resource}";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\src\Paramore.Brighter\Paramore.Brighter.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,19 @@ public static IBrighterBuilder UseExternalBus(this IBrighterBuilder brighterBuil

return brighterBuilder;
}

/// <summary>
/// Use a distributed locking mechanism for processes that can not run in parallel
/// </summary>
/// <param name="brighterBuilder">The Brighter builder to add this option to</param>
/// <param name="distributedLock">The Distributed Lock Provider</param>
/// <returns>The Brighter builder to allow chaining of requests</returns>
public static IBrighterBuilder UseDistributedLock(this IBrighterBuilder brighterBuilder, IDistributedLock distributedLock)
{
brighterBuilder.Services.AddSingleton<IDistributedLock>(distributedLock);

return brighterBuilder;
}

/// <summary>
/// Configure a Feature Switch registry to control handlers to be feature switched at runtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public static IBrighterBuilder UseOutboxSweeper(this IBrighterBuilder brighterBu

brighterBuilder.Services.TryAddSingleton<TimedOutboxSweeperOptions>(options);
brighterBuilder.Services.AddHostedService<TimedOutboxSweeper>();

// If no distributed locking service is added, then add the in memory variant
brighterBuilder.Services.TryAddSingleton<IDistributedLock>(new InMemoryLock());

return brighterBuilder;
}

Expand All @@ -34,6 +38,9 @@ public static IBrighterBuilder UseOutboxArchiver(this IBrighterBuilder brighterB
brighterBuilder.Services.AddSingleton<IAmAnArchiveProvider>(archiveProvider);

brighterBuilder.Services.AddHostedService<TimedOutboxArchiver>();

// If no distributed locking service is added, then add the in memory variant
brighterBuilder.Services.TryAddSingleton<IDistributedLock>(new InMemoryLock());

return brighterBuilder;
}
Expand Down
15 changes: 9 additions & 6 deletions src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,26 @@ public class TimedOutboxArchiver : IHostedService, IDisposable
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<TimedOutboxSweeper>();
private IAmAnOutbox<Message> _outbox;
private IAmAnArchiveProvider _archiveProvider;
private readonly IDistributedLock _distributedLock;
private Timer _timer;

private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

private const string LockingResourceName = "Archiver";
public TimedOutboxArchiver(IAmAnOutbox<Message> outbox, IAmAnArchiveProvider archiveProvider,
TimedOutboxArchiverOptions options)
IDistributedLock distributedLock, TimedOutboxArchiverOptions options)
{
_outbox = outbox;
_archiveProvider = archiveProvider;
_distributedLock = distributedLock;
_options = options;
}

public Task StartAsync(CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Archiver Service is starting.");

_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval));
_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero,
TimeSpan.FromSeconds(_options.TimerInterval));

return Task.CompletedTask;
}
Expand All @@ -51,7 +54,7 @@ public void Dispose()

private async Task Archive(object state, CancellationToken cancellationToken)
{
if (await _semaphore.WaitAsync(TimeSpan.Zero, cancellationToken))
if (await _distributedLock.ObtainLockAsync(LockingResourceName, cancellationToken))
{
s_logger.LogInformation("Outbox Archiver looking for messages to Archive");
try
Expand All @@ -69,7 +72,7 @@ private async Task Archive(object state, CancellationToken cancellationToken)
}
finally
{
_semaphore.Release();
await _distributedLock.ReleaseLockAsync(LockingResourceName, cancellationToken);
}

s_logger.LogInformation("Outbox Sweeper sleeping");
Expand Down
60 changes: 36 additions & 24 deletions src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ namespace Paramore.Brighter.Extensions.Hosting
public class TimedOutboxSweeper : IHostedService, IDisposable
{
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IDistributedLock _distributedLock;
private readonly TimedOutboxSweeperOptions _options;
private static readonly ILogger s_logger= ApplicationLogging.CreateLogger<TimedOutboxSweeper>();
private Timer _timer;

public TimedOutboxSweeper (IServiceScopeFactory serviceScopeFactory, TimedOutboxSweeperOptions options)
private const string LockingResourceName = "OutboxSweeper";

public TimedOutboxSweeper(IServiceScopeFactory serviceScopeFactory, IDistributedLock distributedLock,
TimedOutboxSweeperOptions options)
{
_serviceScopeFactory = serviceScopeFactory;
_distributedLock = distributedLock;
_options = options;
}

Expand All @@ -33,33 +37,41 @@ public Task StartAsync(CancellationToken cancellationToken)

private void DoWork(object state)
{
s_logger.LogInformation("Outbox Sweeper looking for unsent messages");

var scope = _serviceScopeFactory.CreateScope();
try
if (_distributedLock.ObtainLock(LockingResourceName))
{
IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService<IAmACommandProcessor>();
s_logger.LogInformation("Outbox Sweeper looking for unsent messages");

var outBoxSweeper = new OutboxSweeper(
millisecondsSinceSent: _options.MinimumMessageAge,
commandProcessor: commandProcessor,
_options.BatchSize,
_options.UseBulk,
_options.Args);
var scope = _serviceScopeFactory.CreateScope();
try
{
IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService<IAmACommandProcessor>();

if (_options.UseBulk)
outBoxSweeper.SweepAsyncOutbox();
else
outBoxSweeper.Sweep();
}
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox.");
throw;
var outBoxSweeper = new OutboxSweeper(
millisecondsSinceSent: _options.MinimumMessageAge,
commandProcessor: commandProcessor,
_options.BatchSize,
_options.UseBulk,
_options.Args);

if (_options.UseBulk)
outBoxSweeper.SweepAsyncOutbox();
else
outBoxSweeper.Sweep();
}
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox.");
throw;
}
finally
{
_distributedLock.ReleaseLock(LockingResourceName);
scope.Dispose();
}
}
finally
else
{
scope.Dispose();
s_logger.LogWarning("Outbox Sweeper is still running - abandoning attempt.");
}

s_logger.LogInformation("Outbox Sweeper sleeping");
Expand Down
Loading

0 comments on commit 6b71011

Please sign in to comment.