Skip to content

Commit

Permalink
Merge pull request #238 from Cysharp/hotfix/GroupBroadcast
Browse files Browse the repository at this point in the history
ImmutableArrayGroup and ConcurrentDictionaryGroup don't work as expected.
  • Loading branch information
mayuki authored Dec 13, 2019
2 parents 14bdca9 + dc3d855 commit acb442a
Show file tree
Hide file tree
Showing 7 changed files with 567 additions and 344 deletions.
223 changes: 20 additions & 203 deletions src/MagicOnion/Server/Hubs/Group.ConcurrentDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,31 +150,7 @@ public Task WriteAllAsync<T>(int methodId, T value, bool fireAndForget)
}
else
{
var rent = ArrayPool<ValueTask>.Shared.Rent(approximatelyLength);
var writeCount = 0;
ValueTask promise;
try
{
var buffer = rent;
var index = 0;
foreach (var item in members)
{
if (buffer.Length < index)
{
Array.Resize(ref buffer, buffer.Length * 2);
}
buffer[index++] = WriteInAsyncLock(item.Value, message);
writeCount++;
}

promise = ToPromise(buffer, index);
}
finally
{
ArrayPool<ValueTask>.Shared.Return(rent, true);
}
logger.InvokeHubBroadcast(GroupName, message.Length, writeCount);
return promise.AsTask();
throw new NotSupportedException("The write operation must be called with Fire and Forget option");
}
}

Expand All @@ -197,38 +173,7 @@ public Task WriteExceptAsync<T>(int methodId, T value, Guid connectionId, bool f
}
else
{
var rent = ArrayPool<ValueTask>.Shared.Rent(approximatelyLength);
var writeCount = 0;
ValueTask promise;
try
{
var buffer = rent;
var index = 0;
foreach (var item in members)
{
if (buffer.Length < index)
{
Array.Resize(ref buffer, buffer.Length * 2);
}
if (item.Value.ContextId == connectionId)
{
buffer[index++] = default(ValueTask);
}
else
{
buffer[index++] = WriteInAsyncLock(item.Value, message);
writeCount++;
}
}

promise = ToPromise(buffer, index);
}
finally
{
ArrayPool<ValueTask>.Shared.Return(rent, true);
}
logger.InvokeHubBroadcast(GroupName, message.Length, writeCount);
return promise.AsTask();
throw new NotSupportedException("The write operation must be called with Fire and Forget option");
}
}

Expand Down Expand Up @@ -257,49 +202,26 @@ public Task WriteExceptAsync<T>(int methodId, T value, Guid[] connectionIds, boo
}
else
{
var rent = ArrayPool<ValueTask>.Shared.Rent(approximatelyLength);
ValueTask promise;
var writeCount = 0;
try
{
var buffer = rent;
var index = 0;
foreach (var item in members)
{
if (buffer.Length < index)
{
Array.Resize(ref buffer, buffer.Length * 2);
}

foreach (var item2 in connectionIds)
{
if (item.Value.ContextId == item2)
{
buffer[index++] = default(ValueTask);
goto NEXT;
}
}
buffer[index++] = WriteInAsyncLock(item.Value, message);
writeCount++;

NEXT:
continue;
}

promise = ToPromise(buffer, index);
}
finally
{
ArrayPool<ValueTask>.Shared.Return(rent, true);
}
logger.InvokeHubBroadcast(GroupName, message.Length, writeCount);
return promise.AsTask();
throw new NotSupportedException("The write operation must be called with Fire and Forget option");
}
}

public Task WriteToAsync<T>(int methodId, T value, Guid connectionId, bool fireAndForget)
{
throw new NotImplementedException();
var message = BuildMessage(methodId, value);
if (fireAndForget)
{
if (members.TryGetValue(connectionId, out var context))
{
WriteInAsyncLockVoid(context, message);
logger.InvokeHubBroadcast(GroupName, message.Length, 1);
}
return TaskEx.CompletedTask;
}
else
{
throw new NotSupportedException("The write operation must be called with Fire and Forget option");
}
}

public Task WriteToAsync<T>(int methodId, T value, Guid[] connectionIds, bool fireAndForget)
Expand All @@ -321,34 +243,7 @@ public Task WriteToAsync<T>(int methodId, T value, Guid[] connectionIds, bool fi
}
else
{
var rent = ArrayPool<ValueTask>.Shared.Rent(connectionIds.Length);
ValueTask promise;
var writeCount = 0;
try
{
var buffer = rent;
var index = 0;
foreach (var item in connectionIds)
{
if (members.TryGetValue(item, out var context))
{
buffer[index++] = WriteInAsyncLock(context, message);
writeCount++;
}
else
{
buffer[index++] = default(ValueTask);
}
}

promise = ToPromise(buffer, index);
}
finally
{
ArrayPool<ValueTask>.Shared.Return(rent, true);
}
logger.InvokeHubBroadcast(GroupName, message.Length, writeCount);
return promise.AsTask();
throw new NotSupportedException("The write operation must be called with Fire and Forget option");
}
}

Expand Down Expand Up @@ -393,58 +288,7 @@ public Task WriteExceptRawAsync(ArraySegment<byte> msg, Guid[] exceptConnectionI
}
else
{
var rent = ArrayPool<ValueTask>.Shared.Rent(approximatelyLength);
var writeCount = 0;
ValueTask promise;
try
{
var buffer = rent;
var index = 0;
if (exceptConnectionIds == null)
{
foreach (var item in members)
{
if (buffer.Length < index)
{
Array.Resize(ref buffer, buffer.Length * 2);
}

buffer[index++] = WriteInAsyncLock(item.Value, message);
writeCount++;
}
}
else
{
foreach (var item in members)
{
if (buffer.Length < index)
{
Array.Resize(ref buffer, buffer.Length * 2);
}

foreach (var item2 in exceptConnectionIds)
{
if (item.Value.ContextId == item2)
{
buffer[index++] = default(ValueTask);
goto NEXT;
}
}
buffer[index++] = WriteInAsyncLock(item.Value, message);
writeCount++;

NEXT:
continue;
}
}
promise = ToPromise(buffer, index);
}
finally
{
ArrayPool<ValueTask>.Shared.Return(rent, true);
}
logger.InvokeHubBroadcast(GroupName, message.Length, writeCount);
return promise.AsTask();
throw new NotSupportedException("The write operation must be called with Fire and Forget option");
}
}

Expand Down Expand Up @@ -476,34 +320,7 @@ public Task WriteToRawAsync(ArraySegment<byte> msg, Guid[] connectionIds, bool f
}
else
{
var rent = ArrayPool<ValueTask>.Shared.Rent(connectionIds.Length);
ValueTask promise;
var writeCount = 0;
try
{
var buffer = rent;
var index = 0;
foreach (var item in connectionIds)
{
if (members.TryGetValue(item, out var context))
{
buffer[index++] = WriteInAsyncLock(context, message);
writeCount++;
}
else
{
buffer[index++] = default(ValueTask);
}
}

promise = ToPromise(buffer, index);
}
finally
{
ArrayPool<ValueTask>.Shared.Return(rent, true);
}
logger.InvokeHubBroadcast(GroupName, message.Length, writeCount);
return promise.AsTask();
throw new NotSupportedException("The write operation must be called with Fire and Forget option");
}
}

Expand Down
Loading

0 comments on commit acb442a

Please sign in to comment.