diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 84781cbad..239b25aa8 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -8,11 +8,11 @@ Current package versions: ## Unreleased -No pending changes. +- Fix [#2400](https://github.com/StackExchange/StackExchange.Redis/issues/2400): Expose `ChannelMessageQueue` as `IAsyncEnumerable` ([#2402 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2402)) ## 2.6.96 -- Fix [#2350](https://github.com/StackExchange/StackExchange.Redis/issues/2350): Properly parse lua script paramters in all cultures ([#2351 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2351)) +- Fix [#2350](https://github.com/StackExchange/StackExchange.Redis/issues/2350): Properly parse lua script parameters in all cultures ([#2351 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2351)) - Fix [#2362](https://github.com/StackExchange/StackExchange.Redis/issues/2362): Set `RedisConnectionException.FailureType` to `AuthenticationFailure` on all authentication scenarios for better handling ([#2367 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2367)) - Fix [#2368](https://github.com/StackExchange/StackExchange.Redis/issues/2368): Support `RedisValue.Length()` for all storage types ([#2370 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2370)) - Fix [#2376](https://github.com/StackExchange/StackExchange.Redis/issues/2376): Avoid a (rare) deadlock scenario ([#2378 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2378)) diff --git a/src/StackExchange.Redis/ChannelMessageQueue.cs b/src/StackExchange.Redis/ChannelMessageQueue.cs index 14af669ef..435b067fc 100644 --- a/src/StackExchange.Redis/ChannelMessageQueue.cs +++ b/src/StackExchange.Redis/ChannelMessageQueue.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Generic; using System.Reflection; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -66,7 +68,7 @@ internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in R /// To create a ChannelMessageQueue, use /// or . /// - public sealed class ChannelMessageQueue + public sealed class ChannelMessageQueue : IAsyncEnumerable { private readonly Channel _queue; /// @@ -319,10 +321,7 @@ internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = Comm { var parent = _parent; _parent = null; - if (parent != null) - { - parent.UnsubscribeAsync(Channel, null, this, flags); - } + parent?.UnsubscribeAsync(Channel, null, this, flags); _queue.Writer.TryComplete(error); } @@ -348,5 +347,22 @@ internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags f /// /// The flags to use when unsubscribing. public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags); + + /// +#if NETCOREAPP3_0_OR_GREATER + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + => _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken); +#else + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + { + while (_queue.Reader.TryRead(out var item)) + { + yield return item; + } + } + } +#endif } } diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 72ae963c1..986eab9c6 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -79,6 +79,7 @@ StackExchange.Redis.ChannelMessage.SubscriptionChannel.get -> StackExchange.Redi StackExchange.Redis.ChannelMessageQueue StackExchange.Redis.ChannelMessageQueue.Channel.get -> StackExchange.Redis.RedisChannel StackExchange.Redis.ChannelMessageQueue.Completion.get -> System.Threading.Tasks.Task! +StackExchange.Redis.ChannelMessageQueue.GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Collections.Generic.IAsyncEnumerator! StackExchange.Redis.ChannelMessageQueue.OnMessage(System.Action! handler) -> void StackExchange.Redis.ChannelMessageQueue.OnMessage(System.Func! handler) -> void StackExchange.Redis.ChannelMessageQueue.ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask diff --git a/tests/StackExchange.Redis.Tests/PubSubTests.cs b/tests/StackExchange.Redis.Tests/PubSubTests.cs index 91df2aa99..b5287d018 100644 --- a/tests/StackExchange.Redis.Tests/PubSubTests.cs +++ b/tests/StackExchange.Redis.Tests/PubSubTests.cs @@ -308,6 +308,40 @@ private void TestMassivePublish(ISubscriber sub, string channel, string caption) Assert.True(withFAF.ElapsedMilliseconds < withAsync.ElapsedMilliseconds + 3000, caption); } + [Fact] + public async Task SubscribeAsyncEnumerable() + { + using var conn = Create(syncTimeout: 20000, shared: false, log: Writer); + + var sub = conn.GetSubscriber(); + RedisChannel channel = Me(); + + const int TO_SEND = 5; + var gotall = new TaskCompletionSource(); + + var source = await sub.SubscribeAsync(channel); + var op = Task.Run(async () => { + int count = 0; + await foreach (var item in source) + { + count++; + if (count == TO_SEND) gotall.TrySetResult(count); + } + return count; + }); + + for (int i = 0; i < TO_SEND; i++) + { + await sub.PublishAsync(channel, i); + } + await gotall.Task.WithTimeout(5000); + + // check the enumerator exits cleanly + sub.Unsubscribe(channel); + var count = await op.WithTimeout(1000); + Assert.Equal(5, count); + } + [Fact] public async Task PubSubGetAllAnyOrder() {