Skip to content

Commit

Permalink
Refactors how Slic keeps connections alive (#3670)
Browse files Browse the repository at this point in the history
  • Loading branch information
bernardnormier authored Oct 4, 2023
1 parent e4338ad commit 2dd4008
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
// Copyright (c) ZeroC, Inc.

using IceRpc.Transports;
using System.Buffers;
using System.Diagnostics;

namespace IceRpc.Transports.Internal;
namespace IceRpc.Internal;

/// <summary>Decorates <see cref="ReadAsync" /> to fail if no byte is received for over readIdleTimeout. Also decorates
/// <see cref="WriteAsync" /> to schedule a keep alive action (writeIdleTimeout / 2) after a successful write. Both
/// sides of the connection are expected to use the same idle timeouts.</summary>
internal class IdleTimeoutDuplexConnectionDecorator : IDuplexConnection
internal class IceDuplexConnectionDecorator : IDuplexConnection
{
private readonly IDuplexConnection _decoratee;
private Timer? _keepAliveTimer;
private readonly Timer _writerTimer;
private readonly CancellationTokenSource _readCts = new();
private TimeSpan _readIdleTimeout = Timeout.InfiniteTimeSpan;
private TimeSpan _writeIdleTimeout = Timeout.InfiniteTimeSpan;
private readonly TimeSpan _readIdleTimeout;
private readonly TimeSpan _writeIdleTimeout;

public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) =>
_decoratee.ConnectAsync(cancellationToken);
public async Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
{
TransportConnectionInformation connectionInformation = await _decoratee.ConnectAsync(cancellationToken)
.ConfigureAwait(false);

// Schedule or reschedule a keep alive after a successful connection establishment.
ResetWriteTimer();
return connectionInformation;
}

public void Dispose()
{
_decoratee.Dispose();
_readCts.Dispose();

// Using Dispose is fine, there's no need to wait for the keep alive action to terminate if it's running.
_keepAliveTimer?.Dispose();
_writerTimer.Dispose();
}

public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
Expand Down Expand Up @@ -72,50 +80,30 @@ async ValueTask PerformWriteAsync()
{
await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);

// After each successful write, we schedule one ping (keep alive or heartbeat) at _writeIdleTimeout / 2 in
// the future. Since each ping is itself a write, if there is no application activity at all, we'll send
// successive pings at _writeIdleTimeout / 2 intervals.
ScheduleKeepAlive();
// After each successful write, we (re)schedule one ping (heartbeat) at _writeIdleTimeout / 2 in the future.
// Since each ping is itself a write, if there is no application activity at all, we'll send successive
// pings at _writeIdleTimeout / 2 intervals.
ResetWriteTimer();
}
}

/// <summary>Constructs a decorator that does nothing until it is enabled by a call to <see cref="Enable" />.
/// </summary>
internal IdleTimeoutDuplexConnectionDecorator(IDuplexConnection decoratee) => _decoratee = decoratee;

/// <summary>Constructs a decorator that ensures a call to <see cref="ReadAsync" /> will fail after readIdleTimeout.
/// This decorator also schedules a keepAliveAction after each write (see <see cref="ScheduleKeepAlive" />).
/// </summary>
/// <remarks>Do not call <see cref="Enable" /> on a decorator constructed with this constructor.</remarks>
internal IdleTimeoutDuplexConnectionDecorator(
/// This decorator also schedules a keepAliveAction after each write (see <see cref="ResetWriteTimer" />).</summary>
internal IceDuplexConnectionDecorator(
IDuplexConnection decoratee,
TimeSpan readIdleTimeout,
TimeSpan writeIdleTimeout,
Action keepAliveAction)
: this(decoratee)
{
Debug.Assert(writeIdleTimeout != Timeout.InfiniteTimeSpan);
_decoratee = decoratee;
_readIdleTimeout = readIdleTimeout; // can be infinite i.e. disabled
_writeIdleTimeout = writeIdleTimeout;
_keepAliveTimer = new Timer(_ => keepAliveAction());
}
_writerTimer = new Timer(_ => keepAliveAction());

/// <summary>Enables the read and write idle timeouts; also schedules one keep-alive.</summary>
internal void Enable(TimeSpan idleTimeout, Action? keepAliveAction)
{
Debug.Assert(idleTimeout != Timeout.InfiniteTimeSpan);
Debug.Assert(_keepAliveTimer is null);

_readIdleTimeout = idleTimeout;
_writeIdleTimeout = idleTimeout;

if (keepAliveAction is not null)
{
_keepAliveTimer = new Timer(_ => keepAliveAction());
ScheduleKeepAlive();
}
// We can't schedule a keep alive right away because the connection is not connected yet.
}

/// <summary>Schedules one keep alive in writeIdleTimeout / 2.</summary>
internal void ScheduleKeepAlive() => _keepAliveTimer?.Change(_writeIdleTimeout / 2, Timeout.InfiniteTimeSpan);
/// <summary>Resets the write timer. We send a keep alive when this timer expires.</summary>
private void ResetWriteTimer() => _writerTimer.Change(_writeIdleTimeout / 2, Timeout.InfiniteTimeSpan);
}
10 changes: 1 addition & 9 deletions src/IceRpc/Internal/IceProtocolConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ internal sealed class IceProtocolConnection : IProtocolConnection
// A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
private bool _refuseInvocations;

// When not null, schedules one keep-alive action in options.IdleTimeout / 2.
private readonly Action? _scheduleKeepAlive;

// Does ShutdownAsync send a close connection frame?
private bool _sendCloseConnectionFrame = true;

Expand Down Expand Up @@ -143,9 +140,6 @@ internal sealed class IceProtocolConnection : IProtocolConnection
throw new InvalidDataException(
$"Expected '{nameof(IceFrameType.ValidateConnection)}' frame but received frame type '{validateConnectionFrame.FrameType}'.");
}

// Schedules a keep-alive to keep the connection alive now that it's established.
_scheduleKeepAlive?.Invoke();
}
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -600,13 +594,11 @@ internal IceProtocolConnection(

if (options.IceIdleTimeout != Timeout.InfiniteTimeSpan)
{
var duplexConnectionDecorator = new IdleTimeoutDuplexConnectionDecorator(
duplexConnection = new IceDuplexConnectionDecorator(
duplexConnection,
readIdleTimeout: options.EnableIceIdleCheck ? options.IceIdleTimeout : Timeout.InfiniteTimeSpan,
writeIdleTimeout: options.IceIdleTimeout,
KeepAlive);
duplexConnection = duplexConnectionDecorator;
_scheduleKeepAlive = duplexConnectionDecorator.ScheduleKeepAlive;
}

_duplexConnection = duplexConnection;
Expand Down
79 changes: 44 additions & 35 deletions src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ internal class SlicConnection : IMultiplexedConnection
private Task<TransportConnectionInformation>? _connectTask;
private readonly CancellationTokenSource _disposedCts = new();
private Task? _disposeTask;
private readonly IDuplexConnection _duplexConnection;
private readonly SlicDuplexConnectionDecorator _duplexConnection;
private readonly DuplexConnectionReader _duplexConnectionReader;
private readonly SlicDuplexConnectionWriter _duplexConnectionWriter;
private readonly Action<TimeSpan, Action?> _enableIdleTimeoutAndKeepAlive;
private bool _isClosed;
private ulong? _lastRemoteBidirectionalStreamId;
private ulong? _lastRemoteUnidirectionalStreamId;
Expand Down Expand Up @@ -265,9 +264,7 @@ async Task<TransportConnectionInformation> PerformConnectAsync()

if (idleTimeout != Timeout.InfiniteTimeSpan)
{
// Only client connections send ping frames when idle to keep the server connection alive. The server
// sends back a Pong frame in turn to keep alive the client connection.
_enableIdleTimeoutAndKeepAlive(idleTimeout, IsServer ? null : KeepAlive);
_duplexConnection.Enable(idleTimeout);
}

_readFramesTask = ReadFramesAsync(_disposedCts.Token);
Expand Down Expand Up @@ -317,31 +314,6 @@ async Task<TransportConnectionInformation> PerformConnectAsync()
_ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."),
};

void KeepAlive()
{
    // Interlocked.Increment returns the incremented value. _pendingPongCount can be < 0 if an unexpected pong is
    // received; in that case the connection is being torn down and there's no point in sending a ping frame.
    if (Interlocked.Increment(ref _pendingPongCount) <= 0)
    {
        return;
    }

    try
    {
        // For now, the Ping frame payload is just a long which is always set to 0. In the future, it could be a
        // ping frame type value if the ping frame is used for different purposes (e.g. a KeepAlive or RTT ping
        // frame type).
        var pingBody = new PingBody(0L);
        WriteConnectionFrame(FrameType.Ping, pingBody.Encode);
    }
    catch (IceRpcException)
    {
        // Expected if the connection is closed.
    }
    catch (Exception exception)
    {
        // Anything else is unexpected: flag it in debug builds and let it propagate.
        Debug.Fail($"The Slic keep alive timer failed with an unexpected exception: {exception}");
        throw;
    }
}

async ValueTask<T> ReadFrameAsync<T>(
Func<FrameType?, ReadOnlySequence<byte>, T> decodeFunc,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -577,10 +549,11 @@ internal SlicConnection(

_closedCancellationToken = _closedCts.Token;

var duplexConnectionDecorator = new IdleTimeoutDuplexConnectionDecorator(duplexConnection);
_enableIdleTimeoutAndKeepAlive = duplexConnectionDecorator.Enable;
// Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite.
_duplexConnection = IsServer ?
new SlicDuplexConnectionDecorator(duplexConnection) :
new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing);

_duplexConnection = duplexConnectionDecorator;
_duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
_duplexConnectionWriter = new SlicDuplexConnectionWriter(
_duplexConnection,
Expand All @@ -598,6 +571,42 @@ internal SlicConnection(
_nextBidirectionalId = 0;
_nextUnidirectionalId = 2;
}

// Writes a Ping connection frame that carries the given payload. An IceRpcException is swallowed; any other
// exception triggers a debug failure and is rethrown.
void SendPing(long payload)
{
    try
    {
        var pingBody = new PingBody(payload);
        WriteConnectionFrame(FrameType.Ping, pingBody.Encode);
    }
    catch (IceRpcException)
    {
        // Expected if the connection is closed.
    }
    catch (Exception exception)
    {
        Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {exception}");
        throw;
    }
}

// Sends a "read" ping (payload 1) unless a pong is already pending.
void SendReadPing()
{
    // CompareExchange sets _pendingPongCount to 1 only when it is currently 0 and returns the value it observed.
    // Any other observed value makes this local function a no-op.
    if (Interlocked.CompareExchange(ref _pendingPongCount, 1, 0) != 0)
    {
        return;
    }

    SendPing(1L);
}

// Sends a "write" ping (payload 0) after incrementing the pending pong count.
void SendWritePing()
{
    var pendingPongCount = Interlocked.Increment(ref _pendingPongCount);

    // The incremented count can be <= 0 if an unexpected pong is received. If it's the case, the connection is
    // being torn down and there's no point in sending a ping frame.
    if (pendingPongCount > 0)
    {
        SendPing(0L);
    }
}
}

/// <summary>Fills the given writer with stream data received on the connection.</summary>
Expand Down Expand Up @@ -1190,8 +1199,8 @@ async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
(ref SliceDecoder decoder) => new PongBody(ref decoder),
cancellationToken).ConfigureAwait(false);

// For now, we only send a 0 payload value.
if (pongBody.Payload != 0L)
// For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping").
if (pongBody.Payload != 0L && pongBody.Payload != 1L)
{
throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
}
Expand Down
127 changes: 127 additions & 0 deletions src/IceRpc/Transports/Slic/Internal/SlicDuplexConnectionDecorator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) ZeroC, Inc.

using System.Buffers;
using System.Diagnostics;

namespace IceRpc.Transports.Slic.Internal;

/// <summary>A duplex connection decorator that implements the Slic idle timeout. Once enabled with
/// <see cref="Enable" />, <see cref="ReadAsync" /> fails when no byte is received for longer than the idle timeout.
/// When the decorator is constructed with ping actions, each successful <see cref="ReadAsync" /> and
/// <see cref="WriteAsync" /> also (re)schedules a ping that prevents the local and remote idle timers from expiring.
/// </summary>
internal class SlicDuplexConnectionDecorator : IDuplexConnection
{
    private readonly IDuplexConnection _decoratee;

    // Remains infinite, which leaves the decorator as a pure pass-through, until Enable is called.
    private TimeSpan _idleTimeout = Timeout.InfiniteTimeSpan;

    // Canceled either by the idle timeout armed in ReadAsync or by the cancellation token given to ReadAsync.
    private readonly CancellationTokenSource _readCts = new();

    // Both timers are null when the decorator is constructed without ping actions.
    private readonly Timer? _readTimer;
    private readonly Timer? _writeTimer;

    /// <summary>Forwards the connection establishment to the decoratee.</summary>
    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
    {
        return _decoratee.ConnectAsync(cancellationToken);
    }

    /// <summary>Disposes the decoratee, the read cancellation token source and the ping timers (if any).</summary>
    public void Dispose()
    {
        _decoratee.Dispose();
        _readCts.Dispose();

        // Using Dispose is fine, there's no need to wait for the keep alive action to terminate if it's running.
        _readTimer?.Dispose();
        _writeTimer?.Dispose();
    }

    /// <summary>Reads from the decoratee. Once the idle timeout is enabled, the read fails with
    /// <see cref="IceRpcError.ConnectionIdle" /> if it does not complete within the idle timeout.</summary>
    public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
    {
        if (_idleTimeout == Timeout.InfiniteTimeSpan)
        {
            // Idle timeout not enabled: plain pass-through.
            return _decoratee.ReadAsync(buffer, cancellationToken);
        }

        return ReadWithIdleTimeoutAsync();

        async ValueTask<int> ReadWithIdleTimeoutAsync()
        {
            try
            {
                // Relay the caller's cancellation to _readCts, since the decoratee only observes _readCts.Token.
                using CancellationTokenRegistration tokenRegistration = cancellationToken.UnsafeRegister(
                    state => ((CancellationTokenSource)state!).Cancel(),
                    _readCts);

                // Arm the idle timeout before reading.
                _readCts.CancelAfter(_idleTimeout);

                int count = await _decoratee.ReadAsync(buffer, _readCts.Token).ConfigureAwait(false);

                // After each successful read, we schedule one ping some time in the future. When the read returns
                // 0, the other side called ShutdownWriteAsync, so there is no point in sending a ping since we
                // can't get back a pong.
                if (count > 0)
                {
                    ResetReadTimer();
                }

                return count;
            }
            catch (OperationCanceledException)
            {
                // The caller's own cancellation takes precedence over the idle timeout.
                cancellationToken.ThrowIfCancellationRequested();

                throw new IceRpcException(
                    IceRpcError.ConnectionIdle,
                    $"The connection did not receive any bytes for over {_idleTimeout.TotalSeconds} s.");
            }
            finally
            {
                // Disarm the idle timeout if it has not fired.
                _readCts.CancelAfter(Timeout.InfiniteTimeSpan);
            }
        }
    }

    /// <summary>Forwards the write shutdown to the decoratee.</summary>
    public Task ShutdownWriteAsync(CancellationToken cancellationToken)
    {
        return _decoratee.ShutdownWriteAsync(cancellationToken);
    }

    /// <summary>Writes to the decoratee. Once the idle timeout is enabled, each successful write (re)schedules the
    /// "write" ping.</summary>
    public ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken)
    {
        if (_idleTimeout == Timeout.InfiniteTimeSpan)
        {
            // Idle timeout not enabled: plain pass-through.
            return _decoratee.WriteAsync(buffer, cancellationToken);
        }

        return WriteAndResetTimerAsync();

        async ValueTask WriteAndResetTimerAsync()
        {
            await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);

            // After each successful write, we schedule one ping some time in the future. Since each ping is itself
            // a write, if there is no application activity at all, we'll send successive pings at regular
            // intervals.
            ResetWriteTimer();
        }
    }

    /// <summary>Constructs a decorator that does nothing until it is enabled by a call to <see cref="Enable" />.
    /// A decorator constructed with this constructor has no ping timers and never schedules pings.</summary>
    internal SlicDuplexConnectionDecorator(IDuplexConnection decoratee)
    {
        _decoratee = decoratee;
    }

    /// <summary>Constructs a decorator that does nothing until it is enabled by a call to <see cref="Enable" />.
    /// Once enabled, this decorator executes <paramref name="sendReadPing" /> when its read timer expires and
    /// <paramref name="sendWritePing" /> when its write timer expires.</summary>
    internal SlicDuplexConnectionDecorator(IDuplexConnection decoratee, Action sendReadPing, Action sendWritePing)
        : this(decoratee)
    {
        // The timers are created without a due time; ResetReadTimer and ResetWriteTimer schedule them.
        _readTimer = new Timer(_ => sendReadPing());
        _writeTimer = new Timer(_ => sendWritePing());
    }

    /// <summary>Sets the idle timeout and schedules the first pings. The idle timeout must not be infinite.
    /// </summary>
    internal void Enable(TimeSpan idleTimeout)
    {
        Debug.Assert(idleTimeout != Timeout.InfiniteTimeSpan);
        _idleTimeout = idleTimeout;

        ResetReadTimer();
        ResetWriteTimer();
    }

    /// <summary>Resets the read timer. We send a "read" ping when this timer expires.</summary>
    /// <remarks>This method is no-op unless this decorator is constructed with send ping actions.</remarks>
    private void ResetReadTimer()
    {
        _readTimer?.Change(_idleTimeout * 0.5, Timeout.InfiniteTimeSpan);
    }

    /// <summary>Resets the write timer. We send a "write" ping when this timer expires.</summary>
    /// <remarks>This method is no-op unless this decorator is constructed with send ping actions.</remarks>
    private void ResetWriteTimer()
    {
        // The write timer factor (0.6) was chosen to be greater than the read timer factor (0.5). This way, when
        // the connection is completely idle, the read timer expires before the write timer and has time to send a
        // ping that resets the write timer. This reduces the likelihood of duplicate "keep alive" pings.
        _writeTimer?.Change(_idleTimeout * 0.6, Timeout.InfiniteTimeSpan);
    }
}
Loading

0 comments on commit 2dd4008

Please sign in to comment.