Skip to content

Commit

Permalink
Improved closing logic websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
JKorf committed Aug 25, 2024
1 parent 93e4722 commit 3e6bdaa
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public static class CryptoExchangeWebSocketClientLoggingExtension
private static readonly Action<ILogger, int, string, Exception?> _sendLoopStoppedWithException;
private static readonly Action<ILogger, int, Exception?> _sendLoopFinished;
private static readonly Action<ILogger, int, string, string ,Exception?> _receivedCloseMessage;
private static readonly Action<ILogger, int, string, string ,Exception?> _receivedCloseConfirmation;
private static readonly Action<ILogger, int, int, Exception?> _receivedPartialMessage;
private static readonly Action<ILogger, int, int, Exception?> _receivedSingleMessage;
private static readonly Action<ILogger, int, long, Exception?> _reassembledMessage;
Expand All @@ -33,6 +34,7 @@ public static class CryptoExchangeWebSocketClientLoggingExtension
private static readonly Action<ILogger, int, Exception?> _receiveLoopFinished;
private static readonly Action<ILogger, int, TimeSpan?, Exception?> _startingTaskForNoDataReceivedCheck;
private static readonly Action<ILogger, int, TimeSpan?, Exception?> _noDataReceiveTimoutReconnect;
private static readonly Action<ILogger, int, string, string, Exception?> _socketProcessingStateChanged;

static CryptoExchangeWebSocketClientLoggingExtension()
{
Expand Down Expand Up @@ -170,6 +172,17 @@ static CryptoExchangeWebSocketClientLoggingExtension()
LogLevel.Debug,
new EventId(1027, "NoDataReceiveTimeoutReconnect"),
"[Sckt {SocketId}] no data received for {Timeout}, reconnecting socket");

_receivedCloseConfirmation = LoggerMessage.Define<int, string, string>(
LogLevel.Debug,
new EventId(1028, "ReceivedCloseMessage"),
"[Sckt {SocketId}] received `Close` message confirming our close request, CloseStatus: {CloseStatus}, CloseStatusDescription: {CloseStatusDescription}");

_socketProcessingStateChanged = LoggerMessage.Define<int, string, string>(
LogLevel.Trace,
new EventId(1028, "SocketProcessingStateChanged"),
"[Sckt {Id}] processing state change: {PreviousState} -> {NewState}");

}

public static void SocketConnecting(
Expand Down Expand Up @@ -286,6 +299,12 @@ public static void SocketReceivedCloseMessage(
_receivedCloseMessage(logger, socketId, webSocketCloseStatus, closeStatusDescription, null);
}

public static void SocketReceivedCloseConfirmation(
this ILogger logger, int socketId, string webSocketCloseStatus, string closeStatusDescription)
{
_receivedCloseConfirmation(logger, socketId, webSocketCloseStatus, closeStatusDescription, null);
}

public static void SocketReceivedPartialMessage(
this ILogger logger, int socketId, int countBytes)
{
Expand Down Expand Up @@ -333,5 +352,11 @@ public static void SocketNoDataReceiveTimoutReconnect(
{
_noDataReceiveTimoutReconnect(logger, socketId, timeSpan, null);
}

public static void SocketProcessingStateChanged(
this ILogger logger, int socketId, string prevState, string newState)
{
_socketProcessingStateChanged(logger, socketId, prevState, newState, null);
}
}
}
79 changes: 50 additions & 29 deletions CryptoExchange.Net/Sockets/CryptoExchangeWebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,14 @@ private async Task ProcessAsync()
while (!_stopRequested)
{
_logger.SocketStartingProcessing(Id);
_processState = ProcessState.Processing;
SetProcessState(ProcessState.Processing);
var sendTask = SendLoopAsync();
var receiveTask = ReceiveLoopAsync();
var timeoutTask = Parameters.Timeout != null && Parameters.Timeout > TimeSpan.FromSeconds(0) ? CheckTimeoutAsync() : Task.CompletedTask;
await Task.WhenAll(sendTask, receiveTask, timeoutTask).ConfigureAwait(false);
_logger.SocketFinishedProcessing(Id);

_processState = ProcessState.WaitingForClose;
SetProcessState(ProcessState.WaitingForClose);
while (_closeTask == null)
await Task.Delay(50).ConfigureAwait(false);

Expand All @@ -250,14 +250,14 @@ private async Task ProcessAsync()

if (Parameters.ReconnectPolicy == ReconnectPolicy.Disabled)
{
_processState = ProcessState.Idle;
SetProcessState(ProcessState.Idle);
await (OnClose?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
return;
}

if (!_stopRequested)
{
_processState = ProcessState.Reconnecting;
SetProcessState(ProcessState.Reconnecting);
await (OnReconnecting?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
}

Expand Down Expand Up @@ -296,12 +296,15 @@ private async Task ProcessAsync()

_reconnectAttempt = 0;
_lastReconnectTime = DateTime.UtcNow;

// Set to processing before reconnect handling
SetProcessState(ProcessState.Processing);
await (OnReconnected?.Invoke() ?? Task.CompletedTask).ConfigureAwait(false);
break;
}
}

_processState = ProcessState.Idle;
SetProcessState(ProcessState.Idle);
}

private TimeSpan GetReconnectDelay()
Expand Down Expand Up @@ -391,34 +394,33 @@ private async Task CloseInternalAsync()
if (_disposed)
return;

_ctsSource.Cancel();

if (_socket.State == WebSocketState.Open)
try
{
try
if (_socket.State == WebSocketState.CloseReceived)
{
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
catch (Exception)
else if (_socket.State == WebSocketState.Open)
{
// Can sometimes throw an exception when socket is in aborted state due to timing
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
// So socket might go to aborted state, might still be open
await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
var startWait = DateTime.UtcNow;
while (_socket.State != WebSocketState.Closed && _socket.State != WebSocketState.Aborted)
{
// Wait until we receive close confirmation
await Task.Delay(10).ConfigureAwait(false);
if (DateTime.UtcNow - startWait > TimeSpan.FromSeconds(5))
break; // Wait for max 5 seconds, then just abort the connection
}
}
}
else if(_socket.State == WebSocketState.CloseReceived)
catch (Exception)
{
try
{
await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", default).ConfigureAwait(false);
}
catch (Exception)
{
// Can sometimes throw an exception when socket is in aborted state due to timing
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
// So socket might go to aborted state, might still be open
}
// Can sometimes throw an exception when socket is in aborted state due to timing
// Websocket is set to Aborted state when the cancelation token is set during SendAsync/ReceiveAsync
// So socket might go to aborted state, might still be open
}

_ctsSource.Cancel();
}

/// <summary>
Expand Down Expand Up @@ -565,10 +567,20 @@ private async Task ReceiveLoopAsync()

if (receiveResult.MessageType == WebSocketMessageType.Close)
{
// Connection closed unexpectedly
_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString(), receiveResult.CloseStatusDescription);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
// Connection closed
if (_socket.State == WebSocketState.CloseReceived)
{
// Close received means it server initiated, we should send a confirmation and close the socket
_logger.SocketReceivedCloseMessage(Id, receiveResult.CloseStatus.ToString(), receiveResult.CloseStatusDescription);
if (_closeTask?.IsCompleted != false)
_closeTask = CloseInternalAsync();
}
else
{
// Means the socket is now closed and we were the one initiating it
_logger.SocketReceivedCloseConfirmation(Id, receiveResult.CloseStatus.ToString(), receiveResult.CloseStatusDescription);
}

break;
}

Expand Down Expand Up @@ -758,6 +770,15 @@ protected virtual void SetProxy(ClientWebSocket socket, ApiProxy proxy)
if (proxy.Login != null)
socket.Options.Proxy.Credentials = new NetworkCredential(proxy.Login, proxy.Password);
}

private void SetProcessState(ProcessState state)
{
if (_processState == state)
return;

_logger.SocketProcessingStateChanged(Id, _processState.ToString(), state.ToString());
_processState = state;
}
}

/// <summary>
Expand Down

0 comments on commit 3e6bdaa

Please sign in to comment.