From 15bf90c0306eeb9bdce42dfe16d74cdf277dc349 Mon Sep 17 00:00:00 2001 From: aaronwuus Date: Sun, 28 Feb 2021 00:59:38 -0600 Subject: [PATCH] after several attempts, subject, arraypool, ToBase64... no obverious improvement for fixing memiry spikes, looks this is a known issue for bytestring. bytesring is not designed for disposeble, refer to: https://github.com/protocolbuffers/protobuf/issues/4206 --- StreamServer/Hubs/StreamingHub.cs | 221 +++++++++++++++++++++++++++++- StreamServer/wwwroot/js/site.js | 2 +- 2 files changed, 221 insertions(+), 2 deletions(-) diff --git a/StreamServer/Hubs/StreamingHub.cs b/StreamServer/Hubs/StreamingHub.cs index 87b28b5..be537e0 100644 --- a/StreamServer/Hubs/StreamingHub.cs +++ b/StreamServer/Hubs/StreamingHub.cs @@ -5,6 +5,7 @@ using Streamer; using StreamServer.Models; using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; @@ -52,9 +53,10 @@ public async Task Subscribe(int index, int x, int y, int w, int h, bool stop) var subscription = StreamerSubject.Subscribe( async x => { - if(x.Index == index) + if (x.Index == index) { var base64Data = Convert.ToBase64String(x.Image.ToByteArray()); + //var base64Data = x.Image.ToBase64(); var imgData = "data:image/gif;base64," + base64Data; await Clients.Caller.SendAsync("ReceiveImage", imgData); } @@ -83,6 +85,221 @@ public async Task Subscribe(int index, int x, int y, int w, int h, bool stop) } } }); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + while (!RoomHasSubscriber.ContainsKey(index)) + { + await Task.Delay(1000); + } + await Task.Delay(-1); + } + + private static readonly Dictionary> ImagePool = new Dictionary>(); + public async Task SubscribePool(int index, int x, int y, int w, int h, bool stop) + { + var sb = new StringBuilder(); + var subscription = StreamerSubject.Subscribe( + async x => + { + if (x.Index == index) + { + //var base64Data = Convert.ToBase64String(x.Image.ToByteArray()); + //var base64Data = x.Image.ToBase64(); + //var length = x.Image.Length; + //var arr = ImagePool[index].Rent(length); + //x.Image.CopyTo(arr, 0); + //var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr); + //await Clients.Caller.SendAsync("ReceiveImage", imgData); + //ImagePool[index].Return(arr); + await Clients.Caller.SendAsync("ReceiveImage", x.ImageString); + } + }, + () => Console.WriteLine("Sequence Completed.")); + if (!RoomHasSubscriber.ContainsKey(index)) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(async () => + { + using (var channel = GrpcChannel.ForAddress("https://localhost:55555", new GrpcChannelOptions { HttpHandler = httpHandler })) + { + var request = new StreamRequest(); + request.X = x; + request.Y = y; + request.W = w; + request.H = h; + var client = new Greeter.GreeterClient(channel); + CancellationTokenSource cts = new CancellationTokenSource(); + var streamServer = client.StreamingServer(request, cancellationToken: cts.Token); + RoomHasSubscriber[index] = true; + ImagePool[index] = ArrayPool.Create(); + while (await streamServer.ResponseStream.MoveNext(cts.Token)) + { + var img = streamServer.ResponseStream.Current.Image; + var length = img.Length; + var arr = ImagePool[index].Rent(length); + img.CopyTo(arr, 0); + var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr); + ImagePool[index].Return(arr); + StreamerSubject.OnNext(new Streamer { Index = index, ImageString = imgData }); + } + } + }); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + while (!RoomHasSubscriber.ContainsKey(index)) + { + await Task.Delay(1000); + } + await Task.Delay(-1); + } + public async Task SubscribeAttempt_v1(int index, int x, int y, int w, int h, bool stop) + { + var sb = new StringBuilder(); + var subscription = StreamerSubject.Subscribe( + async x => + { + if (x.Index == index) + { + //var base64Data = Convert.ToBase64String(x.Image.ToByteArray()); + //var base64Data = x.Image.ToBase64(); + //var length = x.Image.Length; + //var arr = ImagePool[index].Rent(length); + //x.Image.CopyTo(arr, 0); + //var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr); + //await Clients.Caller.SendAsync("ReceiveImage", imgData); + //ImagePool[index].Return(arr); + await Clients.Caller.SendAsync("ReceiveImage", x.ImageString); + } + }, + () => Console.WriteLine("Sequence Completed.")); + if (!RoomHasSubscriber.ContainsKey(index)) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(async () => + { + using (var channel = GrpcChannel.ForAddress("https://localhost:55555", new GrpcChannelOptions { HttpHandler = httpHandler })) + { + var request = new StreamRequest(); + request.X = x; + request.Y = y; + request.W = w; + request.H = h; + var client = new Greeter.GreeterClient(channel); + CancellationTokenSource cts = new CancellationTokenSource(); + var streamServer = client.StreamingServer(request, cancellationToken: cts.Token); + RoomHasSubscriber[index] = true; + ImagePool[index] = ArrayPool.Create(); + while (await streamServer.ResponseStream.MoveNext(cts.Token)) + { + var img = streamServer.ResponseStream.Current.Image; + var length = img.Length; + var arr = ImagePool[index].Rent(length); + img.CopyTo(arr, 0); + var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr); + ImagePool[index].Return(arr); + StreamerSubject.OnNext(new Streamer { Index = index, ImageString = imgData }); + } + } + }); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + while (!RoomHasSubscriber.ContainsKey(index)) + { + await Task.Delay(1000); + } + await Task.Delay(-1); + } + public async Task SubscribeAttempt_v2(int index, int x, int y, int w, int h, bool stop) + { + var subscription = StreamerSubject.Subscribe( + async x => + { + if (x.Index == index) + { + var base64Data = Convert.ToBase64String(x.Image.ToByteArray()); + //var base64Data = x.Image.ToBase64(); // even worse than ToByteArray + var length = x.Image.Length; + var arr = ImagePool[index].Rent(length); + x.Image.CopyTo(arr, 0); + var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr); + await Clients.Caller.SendAsync("ReceiveImage", imgData); + ImagePool[index].Return(arr); + } + }, + () => Console.WriteLine("Sequence Completed.")); + if (!RoomHasSubscriber.ContainsKey(index)) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(async () => + { + using (var channel = GrpcChannel.ForAddress("https://localhost:55555", new GrpcChannelOptions { HttpHandler = httpHandler })) + { + var request = new StreamRequest(); + request.X = x; + request.Y = y; + request.W = w; + request.H = h; + var client = new Greeter.GreeterClient(channel); + CancellationTokenSource cts = new CancellationTokenSource(); + var streamServer = client.StreamingServer(request, cancellationToken: cts.Token); + RoomHasSubscriber[index] = true; + ImagePool[index] = ArrayPool.Create(); + while (await streamServer.ResponseStream.MoveNext(cts.Token)) + { + StreamerSubject.OnNext(new Streamer { Index = index, Image = streamServer.ResponseStream.Current.Image}); + } + } + }); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + while (!RoomHasSubscriber.ContainsKey(index)) + { + await Task.Delay(1000); + } + await Task.Delay(-1); + } + public async Task SubscribeAttempt_v3(int index, int x, int y, int w, int h, bool stop) + { + var sb = new StringBuilder(); + var subscription = StreamerSubject.Subscribe( + async x => + { + if (x.Index == index) + { + var base64Data = Convert.ToBase64String(x.Image.ToByteArray()); + //var base64Data = x.Image.ToBase64(); + var length = x.Image.Length; + var arr = ImagePool[index].Rent(length); + x.Image.CopyTo(arr, 0); + var imgData = "data:image/gif;base64," + Convert.ToBase64String(arr); + await Clients.Caller.SendAsync("ReceiveImage", imgData); + ImagePool[index].Return(arr); + } + }, + () => Console.WriteLine("Sequence Completed.")); + if (!RoomHasSubscriber.ContainsKey(index)) + { +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run(async () => + { + using (var channel = GrpcChannel.ForAddress("https://localhost:55555", new GrpcChannelOptions { HttpHandler = httpHandler })) + { + var request = new StreamRequest(); + request.X = x; + request.Y = y; + request.W = w; + request.H = h; + var client = new Greeter.GreeterClient(channel); + CancellationTokenSource cts = new CancellationTokenSource(); + var streamServer = client.StreamingServer(request, cancellationToken: cts.Token); + RoomHasSubscriber[index] = true; + ImagePool[index] = ArrayPool.Create(); + while (await streamServer.ResponseStream.MoveNext(cts.Token)) + { + StreamerSubject.OnNext(new Streamer { Index = index, Image = streamServer.ResponseStream.Current.Image }); + } + } + }); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed } while (!RoomHasSubscriber.ContainsKey(index)) @@ -92,9 +309,11 @@ public async Task Subscribe(int index, int x, int y, int w, int h, bool stop) await Task.Delay(-1); } } + public class Streamer { public int Index { get; set; } public ByteString Image { get; set; } + public string ImageString { get; set; } } } diff --git a/StreamServer/wwwroot/js/site.js b/StreamServer/wwwroot/js/site.js index 6a1a588..e234165 100644 --- a/StreamServer/wwwroot/js/site.js +++ b/StreamServer/wwwroot/js/site.js @@ -17,7 +17,7 @@ document.getElementById("start").addEventListener("click", function (event) { var y = document.getElementById("y").value; var w = document.getElementById("w").value; var h = document.getElementById("h").value; - connection.invoke("Subscribe", parseInt(i), parseInt(x), parseInt(y), parseInt(w),parseInt(h),false).catch(function (err) { + connection.invoke("SubscribePool", parseInt(i), parseInt(x), parseInt(y), parseInt(w),parseInt(h),false).catch(function (err) { return console.error(err.toString()); }); event.preventDefault();