From b543249de3e6a88c04d466ffd68610aefcd7a530 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 13 Dec 2019 16:15:24 +0900 Subject: [PATCH 1/5] fix: Group no longer supports non-void operations. --- .../Server/Hubs/Group.ConcurrentDictionary.cs | 208 +----------------- .../Server/Hubs/Group.ImmutableArray.cs | 143 +----------- 2 files changed, 13 insertions(+), 338 deletions(-) diff --git a/src/MagicOnion/Server/Hubs/Group.ConcurrentDictionary.cs b/src/MagicOnion/Server/Hubs/Group.ConcurrentDictionary.cs index 6b9ae3580..9d9bb7f4f 100644 --- a/src/MagicOnion/Server/Hubs/Group.ConcurrentDictionary.cs +++ b/src/MagicOnion/Server/Hubs/Group.ConcurrentDictionary.cs @@ -150,31 +150,7 @@ public Task WriteAllAsync(int methodId, T value, bool fireAndForget) } else { - var rent = ArrayPool.Shared.Rent(approximatelyLength); - var writeCount = 0; - ValueTask promise; - try - { - var buffer = rent; - var index = 0; - foreach (var item in members) - { - if (buffer.Length < index) - { - Array.Resize(ref buffer, buffer.Length * 2); - } - buffer[index++] = WriteInAsyncLock(item.Value, message); - writeCount++; - } - - promise = ToPromise(buffer, index); - } - finally - { - ArrayPool.Shared.Return(rent, true); - } - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -197,38 +173,7 @@ public Task WriteExceptAsync(int methodId, T value, Guid connectionId, bool f } else { - var rent = ArrayPool.Shared.Rent(approximatelyLength); - var writeCount = 0; - ValueTask promise; - try - { - var buffer = rent; - var index = 0; - foreach (var item in members) - { - if (buffer.Length < index) - { - Array.Resize(ref buffer, buffer.Length * 2); - } - if (item.Value.ContextId == connectionId) - { - buffer[index++] = default(ValueTask); - } - else - { - buffer[index++] = WriteInAsyncLock(item.Value, message); - writeCount++; - } - } - - promise = ToPromise(buffer, index); - } - finally - { - ArrayPool.Shared.Return(rent, true); - } - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -257,43 +202,7 @@ public Task WriteExceptAsync(int methodId, T value, Guid[] connectionIds, boo } else { - var rent = ArrayPool.Shared.Rent(approximatelyLength); - ValueTask promise; - var writeCount = 0; - try - { - var buffer = rent; - var index = 0; - foreach (var item in members) - { - if (buffer.Length < index) - { - Array.Resize(ref buffer, buffer.Length * 2); - } - - foreach (var item2 in connectionIds) - { - if (item.Value.ContextId == item2) - { - buffer[index++] = default(ValueTask); - goto NEXT; - } - } - buffer[index++] = WriteInAsyncLock(item.Value, message); - writeCount++; - - NEXT: - continue; - } - - promise = ToPromise(buffer, index); - } - finally - { - ArrayPool.Shared.Return(rent, true); - } - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -321,34 +230,7 @@ public Task WriteToAsync(int methodId, T value, Guid[] connectionIds, bool fi } else { - var rent = ArrayPool.Shared.Rent(connectionIds.Length); - ValueTask promise; - var writeCount = 0; - try - { - var buffer = rent; - var index = 0; - foreach (var item in connectionIds) - { - if (members.TryGetValue(item, out var context)) - { - buffer[index++] = WriteInAsyncLock(context, message); - writeCount++; - } - else - { - buffer[index++] = default(ValueTask); - } - } - - promise = ToPromise(buffer, index); - } - finally - { - ArrayPool.Shared.Return(rent, true); - } - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -393,58 +275,7 @@ public Task WriteExceptRawAsync(ArraySegment msg, Guid[] exceptConnectionI } else { - var rent = ArrayPool.Shared.Rent(approximatelyLength); - var writeCount = 0; - ValueTask promise; - try - { - var buffer = rent; - var index = 0; - if (exceptConnectionIds == null) - { - foreach (var item in members) - { - if (buffer.Length < index) - { - Array.Resize(ref buffer, buffer.Length * 2); - } - - buffer[index++] = WriteInAsyncLock(item.Value, message); - writeCount++; - } - } - else - { - foreach (var item in members) - { - if (buffer.Length < index) - { - Array.Resize(ref buffer, buffer.Length * 2); - } - - foreach (var item2 in exceptConnectionIds) - { - if (item.Value.ContextId == item2) - { - buffer[index++] = default(ValueTask); - goto NEXT; - } - } - buffer[index++] = WriteInAsyncLock(item.Value, message); - writeCount++; - - NEXT: - continue; - } - } - promise = ToPromise(buffer, index); - } - finally - { - ArrayPool.Shared.Return(rent, true); - } - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -476,34 +307,7 @@ public Task WriteToRawAsync(ArraySegment msg, Guid[] connectionIds, bool f } else { - var rent = ArrayPool.Shared.Rent(connectionIds.Length); - ValueTask promise; - var writeCount = 0; - try - { - var buffer = rent; - var index = 0; - foreach (var item in connectionIds) - { - if (members.TryGetValue(item, out var context)) - { - buffer[index++] = WriteInAsyncLock(context, message); - writeCount++; - } - else - { - buffer[index++] = default(ValueTask); - } - } - - promise = ToPromise(buffer, index); - } - finally - { - ArrayPool.Shared.Return(rent, true); - } - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } diff --git a/src/MagicOnion/Server/Hubs/Group.ImmutableArray.cs b/src/MagicOnion/Server/Hubs/Group.ImmutableArray.cs index 975e13f0a..fcb828963 100644 --- a/src/MagicOnion/Server/Hubs/Group.ImmutableArray.cs +++ b/src/MagicOnion/Server/Hubs/Group.ImmutableArray.cs @@ -145,13 +145,7 @@ public Task WriteAllAsync(int methodId, T value, bool fireAndForget) } else { - var promise = new ReservedWhenAllPromise(source.Length); - for (int i = 0; i < source.Length; i++) - { - promise.Add(WriteInAsyncLock(source[i], message)); - } - logger.InvokeHubBroadcast(GroupName, message.Length, source.Length); - return promise.AsValueTask().AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -176,23 +170,7 @@ public Task WriteExceptAsync(int methodId, T value, Guid connectionId, bool f } else { - var promise = new ReservedWhenAllPromise(source.Length); - var writeCount = 0; - for (int i = 0; i < source.Length; i++) - { - if (source[i].ContextId == connectionId) - { - promise.Add(default(ValueTask)); - } - else - { - promise.Add(WriteInAsyncLock(source[i], message)); - writeCount++; - } - } - - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsValueTask().AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -224,27 +202,7 @@ public Task WriteExceptAsync(int methodId, T value, Guid[] connectionIds, boo } else { - var promise = new ReservedWhenAllPromise(source.Length); - var writeCount = 0; - for (int i = 0; i < source.Length; i++) - { - foreach (var item in connectionIds) - { - if (source[i].ContextId == item) - { - promise.Add(default(ValueTask)); - goto NEXT; - } - } - - promise.Add(WriteInAsyncLock(source[i], message)); - writeCount++; - NEXT: - continue; - } - - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsValueTask().AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -271,19 +229,7 @@ public Task WriteToAsync(int methodId, T value, Guid connectionId, bool fireA } else { - var promise = new ReservedWhenAllPromise(source.Length); - var writeCount = 0; - for (int i = 0; i < source.Length; i++) - { - if (source[i].ContextId == connectionId) - { - promise.Add(WriteInAsyncLock(source[i], message)); - writeCount++; - break; - } - } - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsValueTask().AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -315,27 +261,7 @@ public Task WriteToAsync(int methodId, T value, Guid[] connectionIds, bool fi } else { - var promise = new ReservedWhenAllPromise(source.Length); - var writeCount = 0; - for (int i = 0; i < source.Length; i++) - { - foreach (var item in connectionIds) - { - if (source[i].ContextId != item) - { - promise.Add(default(ValueTask)); - goto NEXT; - } - } - - promise.Add(WriteInAsyncLock(source[i], message)); - writeCount++; - NEXT: - continue; - } - - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsValueTask().AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -379,38 +305,7 @@ public Task WriteExceptRawAsync(ArraySegment msg, Guid[] exceptConnectionI } else { - var promise = new ReservedWhenAllPromise(source.Length); - var writeCount = 0; - if (exceptConnectionIds == null) - { - for (int i = 0; i < source.Length; i++) - { - promise.Add(WriteInAsyncLock(source[i], message)); - writeCount++; - } - } - else - { - for (int i = 0; i < source.Length; i++) - { - foreach (var item in exceptConnectionIds) - { - if (source[i].ContextId == item) - { - promise.Add(default(ValueTask)); - goto NEXT; - } - } - - promise.Add(WriteInAsyncLock(source[i], message)); - writeCount++; - NEXT: - continue; - } - } - - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - return promise.AsValueTask().AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } @@ -447,31 +342,7 @@ public Task WriteToRawAsync(ArraySegment msg, Guid[] connectionIds, bool f } else { - var promise = new ReservedWhenAllPromise(source.Length); - var writeCount = 0; - if (connectionIds != null) - { - for (int i = 0; i < source.Length; i++) - { - foreach (var item in connectionIds) - { - if (source[i].ContextId != item) - { - promise.Add(default(ValueTask)); - goto NEXT; - } - } - - promise.Add(WriteInAsyncLock(source[i], message)); - writeCount++; - NEXT: - continue; - } - - logger.InvokeHubBroadcast(GroupName, message.Length, writeCount); - } - - return promise.AsValueTask().AsTask(); + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); } } From 5d347853baaf9cfa29cab77481e3e83b8ac5174a Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 13 Dec 2019 16:16:58 +0900 Subject: [PATCH 2/5] fix: The process cannot shutdown properly. --- tests/MagicOnion.NetCoreTests/_ServerFixture.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/MagicOnion.NetCoreTests/_ServerFixture.cs b/tests/MagicOnion.NetCoreTests/_ServerFixture.cs index e2ec34071..c52c96781 100644 --- a/tests/MagicOnion.NetCoreTests/_ServerFixture.cs +++ b/tests/MagicOnion.NetCoreTests/_ServerFixture.cs @@ -87,8 +87,9 @@ public TStreamingHub CreateStreamingHubClient(TReceive public void Dispose() { - DefaultChannel.ShutdownAsync().Wait(); - server.ShutdownAsync().Wait(); + try { DefaultChannel.ShutdownAsync().Wait(1000); } catch { } + + try { server.ShutdownAsync().Wait(1000); } catch { } } } From 7e61fc70f143f46d399a36e6a0f7910b95a6cd60 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 13 Dec 2019 16:18:36 +0900 Subject: [PATCH 3/5] feat: Add unit tests for Group/Broadcaster. --- .../BroadcastGroupTestHub.cs | 76 ++++ .../ConcurrentDictionaryGroupTest.cs | 33 ++ .../GroupTestBase .cs | 392 ++++++++++++++++++ .../ImmutableArrayGroupTest.cs | 33 ++ 4 files changed, 534 insertions(+) create mode 100644 tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/BroadcastGroupTestHub.cs create mode 100644 tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/ConcurrentDictionaryGroupTest.cs create mode 100644 tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/GroupTestBase .cs create mode 100644 tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/ImmutableArrayGroupTest.cs diff --git a/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/BroadcastGroupTestHub.cs b/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/BroadcastGroupTestHub.cs new file mode 100644 index 000000000..be9ac6cf7 --- /dev/null +++ b/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/BroadcastGroupTestHub.cs @@ -0,0 +1,76 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using MagicOnion.Server.Hubs; + +namespace MagicOnion.NetCoreTests.Tests.StreamingHubBroadcastTest +{ + public class StreamingHubBroadcastTestHubReceiverMock : IStreamingHubBroadcastTestHubReceiver + { + public bool HasCalled { get; private set; } + + public void Call() + { + HasCalled = true; + } + } + + public interface IStreamingHubBroadcastTestHubReceiver + { + void Call(); + } + + public interface IStreamingHubBroadcastTestHub : IStreamingHub + { + Task RegisterConnectionToGroup(); + + Task CallBroadcastToSelfAsync(); + Task CallBroadcastExceptSelfAsync(); + Task CallBroadcastExceptAsync(Guid connectionId); + Task CallBroadcastExceptManyAsync(Guid[] connectionIds); + Task CallBroadcastToAsync(Guid connectionId); + Task CallBroadcastToManyAsync(Guid[] connectionIds); + } + + public class StreamingHubBroadcastTestHub : StreamingHubBase, IStreamingHubBroadcastTestHub + { + IGroup group; + + public async Task RegisterConnectionToGroup() + { + this.group = await this.Group.AddAsync("Nantoka"); + return ConnectionId; + } + + public async Task CallBroadcastToSelfAsync() + { + BroadcastToSelf(group).Call(); + } + + public async Task CallBroadcastExceptSelfAsync() + { + BroadcastExceptSelf(group).Call(); + } + + public async Task CallBroadcastExceptAsync(Guid connectionId) + { + BroadcastExcept(group, connectionId).Call(); + } + + public async Task CallBroadcastExceptManyAsync(Guid[] connectionIds) + { + BroadcastExcept(group, connectionIds).Call(); + } + + public async Task CallBroadcastToAsync(Guid connectionId) + { + BroadcastTo(group, connectionId).Call(); + } + + public async Task CallBroadcastToManyAsync(Guid[] connectionIds) + { + BroadcastTo(group, connectionIds).Call(); + } + } +} diff --git a/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/ConcurrentDictionaryGroupTest.cs b/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/ConcurrentDictionaryGroupTest.cs new file mode 100644 index 000000000..8600f41f2 --- /dev/null +++ b/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/ConcurrentDictionaryGroupTest.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using MagicOnion.Server; +using MagicOnion.Server.Hubs; +using MagicOnion.Tests; +using Xunit; + +namespace MagicOnion.NetCoreTests.Tests.StreamingHubBroadcastTest +{ + [CollectionDefinition(nameof(StreamingHubBroadcastConcurrentDictionaryGroupTestGrpcServerFixture))] + public class StreamingHubBroadcastConcurrentDictionaryGroupTestGrpcServerFixture : ICollectionFixture + { + public class CustomServerFixture : ServerFixture + { + protected override MagicOnionServiceDefinition BuildServerServiceDefinition(MagicOnionOptions options) + { + options.DefaultGroupRepositoryFactory = new ConcurrentDictionaryGroupRepositoryFactory(); + return MagicOnionEngine.BuildServerServiceDefinition(new[] {typeof(StreamingHubBroadcastTestHub)}, options); + } + } + } + + [Collection(nameof(StreamingHubBroadcastConcurrentDictionaryGroupTestGrpcServerFixture))] + public class ConcurrentDictionaryGroupTest : GroupTestBase + { + public ConcurrentDictionaryGroupTest(StreamingHubBroadcastConcurrentDictionaryGroupTestGrpcServerFixture.CustomServerFixture server) + : base(server) + { + } + } +} diff --git a/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/GroupTestBase .cs b/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/GroupTestBase .cs new file mode 100644 index 000000000..d801de734 --- /dev/null +++ b/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/GroupTestBase .cs @@ -0,0 +1,392 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using MagicOnion.Server; +using MagicOnion.Server.Hubs; +using MagicOnion.Tests; +using Xunit; + +namespace MagicOnion.NetCoreTests.Tests.StreamingHubBroadcastTest +{ + public abstract class GroupTestBase + { + readonly ServerFixture server; + + public GroupTestBase(ServerFixture server) + { + this.server = server; + } + + [Fact] + public async Task BroadcastToSelf() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastToSelfAsync(); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Self + Assert.True(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastToSelf_2() + { + // NOTE: Register `Non-self` target client at first. + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + await hubOther.RegisterConnectionToGroup(); + + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + await hub.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastToSelfAsync(); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Self + Assert.True(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastToExceptSelf() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastExceptSelfAsync(); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Other + Assert.False(mockReceiver.HasCalled); + Assert.True(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastToExceptSelf_2() + { + // NOTE: Register `Non-self` target client at first. + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + await hubOther.RegisterConnectionToGroup(); + + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + await hub.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastExceptSelfAsync(); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Other + Assert.False(mockReceiver.HasCalled); + Assert.True(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastToExcept_One_1() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastExceptAsync(Guid.NewGuid()); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Self, Other + Assert.True(mockReceiver.HasCalled); + Assert.True(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastToExcept_One_2() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastExceptAsync(connectionIdOther); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Self + Assert.True(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastToExcept_One_3() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastExceptAsync(connectionId); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Other + Assert.False(mockReceiver.HasCalled); + Assert.True(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastToExcept_Many_1() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastExceptManyAsync(new[] { Guid.NewGuid(), Guid.NewGuid() }); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Self, Other + Assert.True(mockReceiver.HasCalled); + Assert.True(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastToExcept_Many_2() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastExceptManyAsync(new[] { Guid.NewGuid(), connectionIdOther, Guid.NewGuid() }); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Self + Assert.True(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastToExcept_Many_3() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastExceptManyAsync(new[] { Guid.NewGuid(), connectionIdOther, Guid.NewGuid(), connectionId, Guid.NewGuid() }); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: None + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + } + + + [Fact] + public async Task BroadcastTo_One_1() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastToAsync(Guid.NewGuid()); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: None + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastTo_One_2() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastToAsync(connectionId); + + await Task.Delay(100); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Other + Assert.True(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastTo_One_3() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastToAsync(connectionIdOther); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Other + Assert.False(mockReceiver.HasCalled); + Assert.True(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastTo_Many_1() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastToManyAsync(new[] { Guid.NewGuid(), Guid.NewGuid() }); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: None + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastTo_Many_2() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastToManyAsync(new[] { Guid.NewGuid(), connectionId, Guid.NewGuid() }); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Self + Assert.True(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + } + + [Fact] + public async Task BroadcastTo_Many_3() + { + var mockReceiver = new StreamingHubBroadcastTestHubReceiverMock(); + var hub = server.CreateStreamingHubClient(mockReceiver); + var connectionId = await hub.RegisterConnectionToGroup(); + + var mockReceiverOther = new StreamingHubBroadcastTestHubReceiverMock(); + var hubOther = server.CreateStreamingHubClient(mockReceiverOther); + var connectionIdOther = await hubOther.RegisterConnectionToGroup(); + + Assert.False(mockReceiver.HasCalled); + Assert.False(mockReceiverOther.HasCalled); + + await hub.CallBroadcastToManyAsync(new[] { Guid.NewGuid(), connectionId, Guid.NewGuid(), connectionIdOther }); + + await Task.Delay(10); // NOTE: The receivers may not receive broadcast yet at this point. + + // Target: Self, Other + Assert.True(mockReceiver.HasCalled); + Assert.True(mockReceiverOther.HasCalled); + } + } +} diff --git a/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/ImmutableArrayGroupTest.cs b/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/ImmutableArrayGroupTest.cs new file mode 100644 index 000000000..43839152e --- /dev/null +++ b/tests/MagicOnion.NetCoreTests/Tests/StreamingHubBroadcastTest/ImmutableArrayGroupTest.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using MagicOnion.Server; +using MagicOnion.Server.Hubs; +using MagicOnion.Tests; +using Xunit; + +namespace MagicOnion.NetCoreTests.Tests.StreamingHubBroadcastTest +{ + [CollectionDefinition(nameof(StreamingHubBroadcastImmutableArrayGroupTestGrpcServerFixture))] + public class StreamingHubBroadcastImmutableArrayGroupTestGrpcServerFixture : ICollectionFixture + { + public class CustomServerFixture : ServerFixture + { + protected override MagicOnionServiceDefinition BuildServerServiceDefinition(MagicOnionOptions options) + { + options.DefaultGroupRepositoryFactory = new ImmutableArrayGroupRepositoryFactory(); + return MagicOnionEngine.BuildServerServiceDefinition(new[] {typeof(StreamingHubBroadcastTestHub)}, options); + } + } + } + + [Collection(nameof(StreamingHubBroadcastImmutableArrayGroupTestGrpcServerFixture))] + public class ImmutableArrayGroupTest : GroupTestBase + { + public ImmutableArrayGroupTest(StreamingHubBroadcastImmutableArrayGroupTestGrpcServerFixture.CustomServerFixture server) + : base(server) + { + } + } +} From fe76ca0c163c82f1c4c14cf0ba8f1a81a7623399 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 13 Dec 2019 16:25:41 +0900 Subject: [PATCH 4/5] fix: ImmutableArrayGroup doesn't properly broadcast to multiple targets. --- src/MagicOnion/Server/Hubs/Group.ImmutableArray.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/MagicOnion/Server/Hubs/Group.ImmutableArray.cs b/src/MagicOnion/Server/Hubs/Group.ImmutableArray.cs index fcb828963..75d7c85cd 100644 --- a/src/MagicOnion/Server/Hubs/Group.ImmutableArray.cs +++ b/src/MagicOnion/Server/Hubs/Group.ImmutableArray.cs @@ -245,14 +245,14 @@ public Task WriteToAsync(int methodId, T value, Guid[] connectionIds, bool fi { foreach (var item in connectionIds) { - if (source[i].ContextId != item) + if (source[i].ContextId == item) { + WriteInAsyncLockVoid(source[i], message); + writeCount++; goto NEXT; } } - WriteInAsyncLockVoid(source[i], message); - writeCount++; NEXT: continue; } From dc3d855dac5a113fd7644409673b984c681f0e6e Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 13 Dec 2019 16:27:14 +0900 Subject: [PATCH 5/5] fix: ConcurrentDictionaryGroup has no WriteToAsync (single target) implementation. --- .../Server/Hubs/Group.ConcurrentDictionary.cs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/MagicOnion/Server/Hubs/Group.ConcurrentDictionary.cs b/src/MagicOnion/Server/Hubs/Group.ConcurrentDictionary.cs index 9d9bb7f4f..b1b8439ba 100644 --- a/src/MagicOnion/Server/Hubs/Group.ConcurrentDictionary.cs +++ b/src/MagicOnion/Server/Hubs/Group.ConcurrentDictionary.cs @@ -208,7 +208,20 @@ public Task WriteExceptAsync(int methodId, T value, Guid[] connectionIds, boo public Task WriteToAsync(int methodId, T value, Guid connectionId, bool fireAndForget) { - throw new NotImplementedException(); + var message = BuildMessage(methodId, value); + if (fireAndForget) + { + if (members.TryGetValue(connectionId, out var context)) + { + WriteInAsyncLockVoid(context, message); + logger.InvokeHubBroadcast(GroupName, message.Length, 1); + } + return TaskEx.CompletedTask; + } + else + { + throw new NotSupportedException("The write operation must be called with Fire and Forget option"); + } } public Task WriteToAsync(int methodId, T value, Guid[] connectionIds, bool fireAndForget)