Skip to content

Commit

Permalink
fix(protocols): consume server queues on start-up
Browse files Browse the repository at this point in the history
  • Loading branch information
thomashilzendegen authored and gingters committed Jun 13, 2024
1 parent f9fb100 commit bf26e84
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Thinktecture.Relay.Acknowledgement;
Expand Down Expand Up @@ -44,14 +45,18 @@ public static class RelayServerBuilderExtensions

if (useServerRouting)
{
builder.Services
.TryAddSingleton<IServerTransport<TResponse, TAcknowledge>, ServerTransport<TResponse, TAcknowledge>>();
builder.Services.AddSingleton<ServerTransport<TResponse, TAcknowledge>>();
builder.Services.AddHostedService(provider
=> provider.GetRequiredService<ServerTransport<TResponse, TAcknowledge>>());

builder.Services.AddSingleton<IServerTransport<TResponse, TAcknowledge>>(provider
=> provider.GetRequiredService<ServerTransport<TResponse, TAcknowledge>>());
}

if (useTenantRouting)
{
builder.Services.TryAddSingleton<ITenantTransport<TRequest>, TenantTransport<TRequest, TAcknowledge>>();
builder.Services.TryAddSingleton<ITenantHandlerFactory, TenantHandlerFactory<TRequest, TAcknowledge>>();
builder.Services.AddSingleton<ITenantTransport<TRequest>, TenantTransport<TRequest, TAcknowledge>>();
builder.Services.AddSingleton<ITenantHandlerFactory, TenantHandlerFactory<TRequest, TAcknowledge>>();
}

builder.Services.TryAddSingleton<IConnection>(provider =>
Expand Down
51 changes: 33 additions & 18 deletions src/Thinktecture.Relay.Server.Protocols.RabbitMq/ServerTransport.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
Expand All @@ -12,19 +14,22 @@
namespace Thinktecture.Relay.Server.Protocols.RabbitMq;

/// <inheritdoc cref="IServerTransport{TResponse,TAcknowledge}"/>
public partial class ServerTransport<TResponse, TAcknowledge> : IServerTransport<TResponse, TAcknowledge>, IDisposable
public partial class ServerTransport<TResponse, TAcknowledge> : IServerTransport<TResponse, TAcknowledge>,
IHostedService, IDisposable
where TResponse : ITargetResponse
where TAcknowledge : IAcknowledgeRequest
{
private readonly IModel _acknowledgeConsumeModel;
private readonly DisposableConsumer _acknowledgeConsumer;
private readonly IAcknowledgeCoordinator<TAcknowledge> _acknowledgeCoordinator;
private readonly IModel _acknowledgeDispatchModel;
private readonly ILogger _logger;
private readonly IModel _responseConsumeModel;
private readonly DisposableConsumer _responseConsumer;
private readonly IResponseCoordinator<TResponse> _responseCoordinator;
private readonly IModel _responseDispatchModel;
private readonly Guid _originId;

private DisposableConsumer? _responseConsumer;
private DisposableConsumer? _acknowledgeConsumer;

/// <inheritdoc />
public int? BinarySizeThreshold { get; }
Expand Down Expand Up @@ -55,16 +60,10 @@ public ServerTransport(ILogger<ServerTransport<TResponse, TAcknowledge>> logger,

_responseDispatchModel = modelFactory.Create("response dispatcher");
_acknowledgeDispatchModel = modelFactory.Create("acknowledge dispatcher");

_responseConsumeModel = modelFactory.Create("response handler");
_responseConsumer = new DisposableConsumer(_logger, _responseConsumeModel,
$"{Constants.ResponseQueuePrefix} {relayServerContext.OriginId}");
_responseConsumer.Consume(ResponseConsumerReceivedAsync);

_acknowledgeConsumeModel = modelFactory.Create("acknowledge handler");
_acknowledgeConsumer = new DisposableConsumer(_logger, _acknowledgeConsumeModel,
$"{Constants.AcknowledgeQueuePrefix} {relayServerContext.OriginId}");
_acknowledgeConsumer.Consume(AcknowledgeConsumerReceivedAsync);

_originId = relayServerContext.OriginId;
}

/// <inheritdoc />
Expand All @@ -73,8 +72,8 @@ public void Dispose()
_responseDispatchModel.Dispose();
_acknowledgeDispatchModel.Dispose();

_responseConsumer.Dispose();
_acknowledgeConsumer.Dispose();
_responseConsumer?.Dispose();
_acknowledgeConsumer?.Dispose();

_responseConsumeModel.Dispose();
_acknowledgeConsumeModel.Dispose();
Expand All @@ -84,9 +83,7 @@ public void Dispose()
public async Task DispatchResponseAsync(TResponse response)
{
await _responseDispatchModel.PublishJsonAsync($"{Constants.ResponseQueuePrefix} {response.RequestOriginId}",
response,
durable: false,
persistent: false);
response, durable: false, persistent: false);
Log.DispatchedResponse(_logger, response.RequestId, response.RequestOriginId);
}

Expand All @@ -95,8 +92,8 @@ public async Task DispatchAcknowledgeAsync(TAcknowledge request)
{
Log.DispatchingAcknowledge(_logger, request);

await _acknowledgeDispatchModel.PublishJsonAsync($"{Constants.AcknowledgeQueuePrefix} {request.OriginId}", request,
durable: false, persistent: false);
await _acknowledgeDispatchModel.PublishJsonAsync($"{Constants.AcknowledgeQueuePrefix} {request.OriginId}",
request, durable: false, persistent: false);
Log.DispatchedAcknowledge(_logger, request.RequestId, request.OriginId);
}

Expand All @@ -116,4 +113,22 @@ private async Task AcknowledgeConsumerReceivedAsync(BasicDeliverEventArgs @event
Log.AcknowledgeConsumed(_logger, request.RequestId, @event.RoutingKey, @event.ConsumerTag);
await _acknowledgeCoordinator.ProcessAcknowledgeAsync(request);
}

/// <inheritdoc />
public Task StartAsync(CancellationToken cancellationToken)
{
_responseConsumer = new DisposableConsumer(_logger, _responseConsumeModel,
$"{Constants.ResponseQueuePrefix} {_originId}");
_responseConsumer.Consume(ResponseConsumerReceivedAsync);

_acknowledgeConsumer = new DisposableConsumer(_logger, _acknowledgeConsumeModel,
$"{Constants.AcknowledgeQueuePrefix} {_originId}");
_acknowledgeConsumer.Consume(AcknowledgeConsumerReceivedAsync);

return Task.CompletedTask;
}

/// <inheritdoc />
public Task StopAsync(CancellationToken cancellationToken)
=> Task.CompletedTask;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.2" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
</ItemGroup>
Expand Down

0 comments on commit bf26e84

Please sign in to comment.