diff --git a/Brokerages/BrokerageMultiWebSocketSubscriptionManager.cs b/Brokerages/BrokerageMultiWebSocketSubscriptionManager.cs index 2f39c3c38a33..814cfeabf1c6 100644 --- a/Brokerages/BrokerageMultiWebSocketSubscriptionManager.cs +++ b/Brokerages/BrokerageMultiWebSocketSubscriptionManager.cs @@ -77,7 +77,8 @@ public BrokerageMultiWebSocketSubscriptionManager( _subscribeFunc = subscribeFunc; _unsubscribeFunc = unsubscribeFunc; _messageHandler = messageHandler; - _connectionRateLimiter = connectionRateLimiter; + // let's use a reasonable default, no API will like to get DOS on reconnections. 50 WS will take 120s + _connectionRateLimiter = connectionRateLimiter ?? new RateGate(5, TimeSpan.FromSeconds(12)); if (_maximumWebSocketConnections > 0) { @@ -99,25 +100,26 @@ public BrokerageMultiWebSocketSubscriptionManager( }; _reconnectTimer.Elapsed += (_, _) => { - Log.Trace("BrokerageMultiWebSocketSubscriptionManager(): Restarting websocket connections"); - + List webSocketEntries; lock (_locker) { - foreach (var entry in _webSocketEntries) + // let's make a copy so we don't hold the lock + webSocketEntries = _webSocketEntries.ToList(); + } + + Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Restarting {webSocketEntries.Count} websocket connections"); + + Parallel.ForEach(webSocketEntries, new ParallelOptions { MaxDegreeOfParallelism = 4 }, entry => + { + if (entry.WebSocket.IsOpen) { - if (entry.WebSocket.IsOpen) - { - Task.Factory.StartNew(() => - { - Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - disconnect: ({entry.WebSocket.GetHashCode()})"); - Disconnect(entry.WebSocket); - - Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - connect: ({entry.WebSocket.GetHashCode()})"); - Connect(entry.WebSocket); - }); - } + Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - disconnect: ({entry.WebSocket.GetHashCode()})"); + Disconnect(entry.WebSocket); + + Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - connect: ({entry.WebSocket.GetHashCode()})"); + Connect(entry.WebSocket); } - } + }); }; _reconnectTimer.Start(); @@ -285,10 +287,7 @@ private void Connect(IWebSocket webSocket) webSocket.Open += onOpenAction; - if (_connectionRateLimiter is { IsRateLimited: false }) - { - _connectionRateLimiter.WaitToProceed(); - } + _connectionRateLimiter.WaitToProceed(); try { @@ -331,6 +330,7 @@ private void OnOpen(object sender, EventArgs e) _subscribeFunc(webSocket, symbol); } }); + break; } } } diff --git a/Brokerages/WebSocketClientWrapper.cs b/Brokerages/WebSocketClientWrapper.cs index 8d85d50903b2..9f0b1fb62e1f 100644 --- a/Brokerages/WebSocketClientWrapper.cs +++ b/Brokerages/WebSocketClientWrapper.cs @@ -18,6 +18,7 @@ using System; using System.IO; using System.Net.WebSockets; +using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -120,6 +121,8 @@ public void Close() { try { + _cts?.Cancel(); + try { _client?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", _cts.Token).SynchronouslyAwaitTask(); @@ -129,8 +132,6 @@ public void Close() // ignored } - _cts?.Cancel(); - _taskConnect?.Wait(TimeSpan.FromSeconds(5)); _cts.DisposeSafely(); @@ -253,28 +254,39 @@ private void HandleConnection() } } catch (OperationCanceledException) { } + catch (ObjectDisposedException) { } catch (WebSocketException ex) { - OnError(new WebSocketError(ex.Message, ex)); - connectionCts.Token.WaitHandle.WaitOne(waitTimeOnError); + if (!connectionCts.IsCancellationRequested) + { + OnError(new WebSocketError(ex.Message, ex)); + connectionCts.Token.WaitHandle.WaitOne(waitTimeOnError); - // increase wait time until a maximum value. This is useful during brokerage down times - waitTimeOnError += Math.Min(maximumWaitTimeOnError, waitTimeOnError); + // increase wait time until a maximum value. This is useful during brokerage down times + waitTimeOnError += Math.Min(maximumWaitTimeOnError, waitTimeOnError); + } } catch (Exception ex) { - OnError(new WebSocketError(ex.Message, ex)); + if (!connectionCts.IsCancellationRequested) + { + OnError(new WebSocketError(ex.Message, ex)); + } + } + + if (!connectionCts.IsCancellationRequested) + { + connectionCts.Cancel(); } - connectionCts.Cancel(); } } } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private MessageData ReceiveMessage( - WebSocket webSocket, + ClientWebSocket webSocket, CancellationToken ct, - byte[] receiveBuffer, - long maxSize = long.MaxValue) + byte[] receiveBuffer) { var buffer = new ArraySegment(receiveBuffer); @@ -286,10 +298,6 @@ private MessageData ReceiveMessage( { result = webSocket.ReceiveAsync(buffer, ct).SynchronouslyAwaitTask(); ms.Write(buffer.Array, buffer.Offset, result.Count); - if (ms.Length > maxSize) - { - throw new InvalidOperationException($"Maximum size of the message was exceeded: {_url}"); - } } while (!result.EndOfMessage);