diff --git a/src/HouseofCat.RabbitMQ/Services/Extensions/RabbitServiceExtensions.cs b/src/HouseofCat.RabbitMQ/Services/Extensions/RabbitServiceExtensions.cs index 9db1f34..a3ee1ea 100644 --- a/src/HouseofCat.RabbitMQ/Services/Extensions/RabbitServiceExtensions.cs +++ b/src/HouseofCat.RabbitMQ/Services/Extensions/RabbitServiceExtensions.cs @@ -103,7 +103,7 @@ public static async Task BuildRabbitServiceAsync( compressionProvider, loggerFactory); - await rabbitService.Publisher.StartAutoPublishAsync(); + await rabbitService.StartAsync(); return rabbitService; } diff --git a/src/HouseofCat.RabbitMQ/Services/RabbitService.cs b/src/HouseofCat.RabbitMQ/Services/RabbitService.cs index 37391a5..cbf59c4 100644 --- a/src/HouseofCat.RabbitMQ/Services/RabbitService.cs +++ b/src/HouseofCat.RabbitMQ/Services/RabbitService.cs @@ -77,38 +77,18 @@ public RabbitService( CompressionProvider = compressionProvider; } - public RabbitService( - string fileNamePath, - ISerializationProvider serializationProvider, - IEncryptionProvider encryptionProvider = null, - ICompressionProvider compressionProvider = null, - ILoggerFactory loggerFactory = null, - Func processReceiptAsync = null) - : this( - JsonFileReader - .ReadFileAsync(fileNamePath) - .GetAwaiter() - .GetResult(), - serializationProvider, - encryptionProvider, - compressionProvider, - loggerFactory, - processReceiptAsync) - { } - public RabbitService( RabbitOptions options, ISerializationProvider serializationProvider, IEncryptionProvider encryptionProvider = null, ICompressionProvider compressionProvider = null, - ILoggerFactory loggerFactory = null, - Func processReceiptAsync = null) : this( + ILoggerFactory loggerFactory = null) + : this( new ChannelPool(options), serializationProvider, encryptionProvider, compressionProvider, - loggerFactory, - processReceiptAsync) + loggerFactory) { } public RabbitService( @@ -116,8 +96,7 @@ public RabbitService( ISerializationProvider serializationProvider, IEncryptionProvider encryptionProvider = null, ICompressionProvider compressionProvider = null, - ILoggerFactory loggerFactory = null, - Func processReceiptAsync = null) + ILoggerFactory loggerFactory = null) { Guard.AgainstNull(chanPool, nameof(chanPool)); Guard.AgainstNull(serializationProvider, nameof(serializationProvider)); @@ -136,14 +115,20 @@ public RabbitService( }; Topologer = new Topologer(ChannelPool); + } - BuildConsumers(); - - Publisher.StartAutoPublish(processReceiptAsync); + public async Task StartAsync(Func processReceiptAsync = null) + { + if (await _serviceLock.WaitAsync(0).ConfigureAwait(false)) return; - BuildConsumerTopologyAsync() - .GetAwaiter() - .GetResult(); + try + { + BuildConsumers(); + await BuildConsumerTopologyAsync(); + Publisher.StartAutoPublish(processReceiptAsync); + } + finally + { _serviceLock.Release(); } } public async ValueTask ShutdownAsync(bool immediately)