Skip to content

Commit

Permalink
after several attempts, subject, arraypool, ToBase64... no obverious …
Browse files Browse the repository at this point in the history
…improvement for fixing memiry spikes, looks this is a known issue for bytestring. bytesring is not designed for disposeble, refer to: protocolbuffers/protobuf#4206
  • Loading branch information
aaronwuus committed Feb 28, 2021
1 parent 9bc10cf commit 15bf90c
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 2 deletions.
221 changes: 220 additions & 1 deletion StreamServer/Hubs/StreamingHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Streamer;
using StreamServer.Models;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand Down Expand Up @@ -52,9 +53,10 @@ public async Task Subscribe(int index, int x, int y, int w, int h, bool stop)
var subscription = StreamerSubject.Subscribe(
async x =>
{
if(x.Index == index)
if (x.Index == index)
{
var base64Data = Convert.ToBase64String(x.Image.ToByteArray());
//var base64Data = x.Image.ToBase64();
var imgData = "data:image/gif;base64," + base64Data;
await Clients.Caller.SendAsync("ReceiveImage", imgData);
}
Expand Down Expand Up @@ -83,6 +85,221 @@ public async Task Subscribe(int index, int x, int y, int w, int h, bool stop)
}
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
while (!RoomHasSubscriber.ContainsKey(index))
{
await Task.Delay(1000);
}
await Task.Delay(-1);
}

private static readonly Dictionary<int, ArrayPool<byte>> ImagePool = new Dictionary<int, ArrayPool<byte>>();
public async Task SubscribePool(int index, int x, int y, int w, int h, bool stop)
{
var sb = new StringBuilder();
var subscription = StreamerSubject.Subscribe(
async x =>
{
if (x.Index == index)
{
//var base64Data = Convert.ToBase64String(x.Image.ToByteArray());
//var base64Data = x.Image.ToBase64();
//var length = x.Image.Length;
//var arr = ImagePool[index].Rent(length);
//x.Image.CopyTo(arr, 0);
//var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr);
//await Clients.Caller.SendAsync("ReceiveImage", imgData);
//ImagePool[index].Return(arr);
await Clients.Caller.SendAsync("ReceiveImage", x.ImageString);
}
},
() => Console.WriteLine("Sequence Completed."));
if (!RoomHasSubscriber.ContainsKey(index))
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () =>
{
using (var channel = GrpcChannel.ForAddress("https://localhost:55555", new GrpcChannelOptions { HttpHandler = httpHandler }))
{
var request = new StreamRequest();
request.X = x;
request.Y = y;
request.W = w;
request.H = h;
var client = new Greeter.GreeterClient(channel);
CancellationTokenSource cts = new CancellationTokenSource();
var streamServer = client.StreamingServer(request, cancellationToken: cts.Token);
RoomHasSubscriber[index] = true;
ImagePool[index] = ArrayPool<byte>.Create();
while (await streamServer.ResponseStream.MoveNext(cts.Token))
{
var img = streamServer.ResponseStream.Current.Image;
var length = img.Length;
var arr = ImagePool[index].Rent(length);
img.CopyTo(arr, 0);
var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr);
ImagePool[index].Return(arr);
StreamerSubject.OnNext(new Streamer { Index = index, ImageString = imgData });
}
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
while (!RoomHasSubscriber.ContainsKey(index))
{
await Task.Delay(1000);
}
await Task.Delay(-1);
}
public async Task SubscribeAttempt_v1(int index, int x, int y, int w, int h, bool stop)
{
var sb = new StringBuilder();
var subscription = StreamerSubject.Subscribe(
async x =>
{
if (x.Index == index)
{
//var base64Data = Convert.ToBase64String(x.Image.ToByteArray());
//var base64Data = x.Image.ToBase64();
//var length = x.Image.Length;
//var arr = ImagePool[index].Rent(length);
//x.Image.CopyTo(arr, 0);
//var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr);
//await Clients.Caller.SendAsync("ReceiveImage", imgData);
//ImagePool[index].Return(arr);
await Clients.Caller.SendAsync("ReceiveImage", x.ImageString);
}
},
() => Console.WriteLine("Sequence Completed."));
if (!RoomHasSubscriber.ContainsKey(index))
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () =>
{
using (var channel = GrpcChannel.ForAddress("https://localhost:55555", new GrpcChannelOptions { HttpHandler = httpHandler }))
{
var request = new StreamRequest();
request.X = x;
request.Y = y;
request.W = w;
request.H = h;
var client = new Greeter.GreeterClient(channel);
CancellationTokenSource cts = new CancellationTokenSource();
var streamServer = client.StreamingServer(request, cancellationToken: cts.Token);
RoomHasSubscriber[index] = true;
ImagePool[index] = ArrayPool<byte>.Create();
while (await streamServer.ResponseStream.MoveNext(cts.Token))
{
var img = streamServer.ResponseStream.Current.Image;
var length = img.Length;
var arr = ImagePool[index].Rent(length);
img.CopyTo(arr, 0);
var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr);
ImagePool[index].Return(arr);
StreamerSubject.OnNext(new Streamer { Index = index, ImageString = imgData });
}
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
while (!RoomHasSubscriber.ContainsKey(index))
{
await Task.Delay(1000);
}
await Task.Delay(-1);
}
public async Task SubscribeAttempt_v2(int index, int x, int y, int w, int h, bool stop)
{
var subscription = StreamerSubject.Subscribe(
async x =>
{
if (x.Index == index)
{
var base64Data = Convert.ToBase64String(x.Image.ToByteArray());
//var base64Data = x.Image.ToBase64(); // even worse than ToByteArray
var length = x.Image.Length;
var arr = ImagePool[index].Rent(length);
x.Image.CopyTo(arr, 0);
var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr);
await Clients.Caller.SendAsync("ReceiveImage", imgData);
ImagePool[index].Return(arr);
}
},
() => Console.WriteLine("Sequence Completed."));
if (!RoomHasSubscriber.ContainsKey(index))
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () =>
{
using (var channel = GrpcChannel.ForAddress("https://localhost:55555", new GrpcChannelOptions { HttpHandler = httpHandler }))
{
var request = new StreamRequest();
request.X = x;
request.Y = y;
request.W = w;
request.H = h;
var client = new Greeter.GreeterClient(channel);
CancellationTokenSource cts = new CancellationTokenSource();
var streamServer = client.StreamingServer(request, cancellationToken: cts.Token);
RoomHasSubscriber[index] = true;
ImagePool[index] = ArrayPool<byte>.Create();
while (await streamServer.ResponseStream.MoveNext(cts.Token))
{
StreamerSubject.OnNext(new Streamer { Index = index, Image = streamServer.ResponseStream.Current.Image});
}
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
while (!RoomHasSubscriber.ContainsKey(index))
{
await Task.Delay(1000);
}
await Task.Delay(-1);
}
public async Task SubscribeAttempt_v3(int index, int x, int y, int w, int h, bool stop)
{
var sb = new StringBuilder();
var subscription = StreamerSubject.Subscribe(
async x =>
{
if (x.Index == index)
{
var base64Data = Convert.ToBase64String(x.Image.ToByteArray());
//var base64Data = x.Image.ToBase64();
var length = x.Image.Length;
var arr = ImagePool[index].Rent(length);
x.Image.CopyTo(arr, 0);
var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr);
await Clients.Caller.SendAsync("ReceiveImage", imgData);
ImagePool[index].Return(arr);
}
},
() => Console.WriteLine("Sequence Completed."));
if (!RoomHasSubscriber.ContainsKey(index))
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
Task.Run(async () =>
{
using (var channel = GrpcChannel.ForAddress("https://localhost:55555", new GrpcChannelOptions { HttpHandler = httpHandler }))
{
var request = new StreamRequest();
request.X = x;
request.Y = y;
request.W = w;
request.H = h;
var client = new Greeter.GreeterClient(channel);
CancellationTokenSource cts = new CancellationTokenSource();
var streamServer = client.StreamingServer(request, cancellationToken: cts.Token);
RoomHasSubscriber[index] = true;
ImagePool[index] = ArrayPool<byte>.Create();
while (await streamServer.ResponseStream.MoveNext(cts.Token))
{
StreamerSubject.OnNext(new Streamer { Index = index, Image = streamServer.ResponseStream.Current.Image });
}
}
});
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
while (!RoomHasSubscriber.ContainsKey(index))
Expand All @@ -92,9 +309,11 @@ public async Task Subscribe(int index, int x, int y, int w, int h, bool stop)
await Task.Delay(-1);
}
}

public class Streamer
{
public int Index { get; set; }
public ByteString Image { get; set; }
public string ImageString { get; set; }
}
}
2 changes: 1 addition & 1 deletion StreamServer/wwwroot/js/site.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ document.getElementById("start").addEventListener("click", function (event) {
var y = document.getElementById("y").value;
var w = document.getElementById("w").value;
var h = document.getElementById("h").value;
connection.invoke("Subscribe", parseInt(i), parseInt(x), parseInt(y), parseInt(w),parseInt(h),false).catch(function (err) {
connection.invoke("SubscribePool", parseInt(i), parseInt(x), parseInt(y), parseInt(w),parseInt(h),false).catch(function (err) {
return console.error(err.toString());
});
event.preventDefault();
Expand Down

0 comments on commit 15bf90c

Please sign in to comment.