diff --git a/src/RawRabbit/Consumer/Queueing/QueueingBaiscConsumerFactory.cs b/src/RawRabbit/Consumer/Queueing/QueueingBaiscConsumerFactory.cs deleted file mode 100644 index cddc8ffd..00000000 --- a/src/RawRabbit/Consumer/Queueing/QueueingBaiscConsumerFactory.cs +++ /dev/null @@ -1,53 +0,0 @@ -using System.Collections.Concurrent; -using System.Threading.Tasks; -using RabbitMQ.Client; -using RawRabbit.Channel; -using RawRabbit.Channel.Abstraction; -using RawRabbit.Common; -using RawRabbit.Configuration.Respond; -using RawRabbit.Consumer.Abstraction; - -namespace RawRabbit.Consumer.Queueing -{ - public class QueueingBaiscConsumerFactory : IConsumerFactory - { - private readonly ConcurrentBag _consumers; - - public QueueingBaiscConsumerFactory(IChannelFactory channelFactory) - { - _consumers = new ConcurrentBag(); - } - - public IRawConsumer CreateConsumer(IConsumerConfiguration cfg, IModel channel) - { - ConfigureQos(channel, cfg.PrefetchCount); - var consumer = new QueueingRawConsumer(channel); - _consumers.Add(consumer); - - //TODO: Add exception handling etc. - - Task - .Run(() => consumer.Queue.Dequeue()) - .ContinueWith(argsTask => consumer.OnMessageAsync(this, argsTask.Result)); - - return consumer; - } - - protected void ConfigureQos(IModel channel, ushort prefetchCount) - { - channel.BasicQos( - prefetchSize: 0, - prefetchCount: prefetchCount, - global: false - ); - } - - public void Dispose() - { - foreach (var consumer in _consumers) - { - consumer?.Disconnect(); - } - } - } -} diff --git a/src/RawRabbit/Consumer/Queueing/QueueingRawConsumer.cs b/src/RawRabbit/Consumer/Queueing/QueueingRawConsumer.cs deleted file mode 100644 index 7c678c3f..00000000 --- a/src/RawRabbit/Consumer/Queueing/QueueingRawConsumer.cs +++ /dev/null @@ -1,34 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using RabbitMQ.Client.Exceptions; -using RawRabbit.Consumer.Abstraction; - -namespace RawRabbit.Consumer.Queueing -{ - class QueueingRawConsumer : QueueingBasicConsumer, IRawConsumer - { - public QueueingRawConsumer(IModel channel) : base(channel) - { - NackedDeliveryTags = new List(); - } - - public Func OnMessageAsync { get; set; } - - public List NackedDeliveryTags { get; } - - public void Disconnect() - { - try - { - Model.BasicCancel(ConsumerTag); - } - catch (AlreadyClosedException) - { - // Perfect, someone allready closed this. - } - } - } -} \ No newline at end of file diff --git a/test/RawRabbit.IntegrationTests/SimpleUse/RpcTest.cs b/test/RawRabbit.IntegrationTests/SimpleUse/RpcTest.cs index 9324465f..66373af7 100644 --- a/test/RawRabbit.IntegrationTests/SimpleUse/RpcTest.cs +++ b/test/RawRabbit.IntegrationTests/SimpleUse/RpcTest.cs @@ -3,10 +3,6 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using RawRabbit.Configuration; -using RawRabbit.Consumer.Abstraction; -using RawRabbit.Consumer.Queueing; using RawRabbit.IntegrationTests.TestMessages; using RawRabbit.vNext; using Xunit; @@ -146,42 +142,6 @@ public async Task Should_Successfully_Perform_Nested_Requests() Assert.Equal(expected: payload, actual: response.Infered); } - [Fact] - public async Task Should_Work_With_Queueing_Consumer_Factory() - { - /* Setup */ - var response = new BasicResponse { Prop = "This is the response." }; - var requester = BusClientFactory.CreateDefault(); - - var responder = BusClientFactory.CreateDefault(service => service.AddTransient()); - responder.RespondAsync((req, i) => Task.FromResult(response)); - - /* Test */ - var recieved = await requester.RequestAsync(); - - /* Assert */ - Assert.Equal(expected: response.Prop, actual: recieved.Prop); - } - - [Fact] - public async Task Should_Work_With_Different_Request_Types_For_Same_Responder() - { - /* Setup */ - var requester = BusClientFactory.CreateDefault(); - var responder = BusClientFactory.CreateDefault(service => service.AddTransient()); - - responder.RespondAsync((req, i) => Task.FromResult(new FirstResponse())); - responder.RespondAsync((req, i) => Task.FromResult(new SecondResponse())); - - /* Test */ - var firstResponse = await requester.RequestAsync(); - var secondResponse = await requester.RequestAsync(); - - /* Assert */ - Assert.NotNull(firstResponse); - Assert.NotNull(secondResponse); - } - [Fact] public async Task Should_Work_When_Not_Awaiting_One_Response_At_A_Time() {