Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposed fix for GC rooting + extensive notes on how/why are in TimerToken #2413

Merged
merged 12 commits into from
Mar 29, 2023
133 changes: 119 additions & 14 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public sealed partial class ConnectionMultiplexer : IInternalConnectionMultiplex
internal long syncOps, asyncOps;
private long syncTimeouts, fireAndForgets, asyncTimeouts;
private string? failureMessage, activeConfigCause;
private IDisposable? pulse;
private TimerToken? pulse;

private readonly Hashtable servers = new Hashtable();
private volatile ServerSnapshot _serverSnapshot = ServerSnapshot.Empty;
Expand Down Expand Up @@ -874,7 +874,7 @@ public ServerSnapshotFiltered(ServerEndPoint[] endpoints, int count, Func<Server
}
}

[return: NotNullIfNotNull("endpoint")]
[return: NotNullIfNotNull(nameof(endpoint))]
internal ServerEndPoint? GetServerEndPoint(EndPoint? endpoint, LogProxy? log = null, bool activate = true)
{
if (endpoint == null) return null;
Expand Down Expand Up @@ -909,40 +909,132 @@ public ServerSnapshotFiltered(ServerEndPoint[] endpoints, int count, Func<Server
return server;
}

private sealed class TimerToken
internal void Root() => pulse?.Root(this);

// note that this also acts (conditionally) as the GC root for the multiplexer
// when there are in-flight messages; the timer can then acts as the heartbeat
// to make sure that everything *eventually* completes
private sealed class TimerToken : IDisposable
{
private TimerToken(ConnectionMultiplexer muxer)
{
_ref = new WeakReference(muxer);
_weakRef = new(muxer);
}
private Timer? _timer;
public void SetTimer(Timer timer) => _timer = timer;
private readonly WeakReference _ref;

private readonly WeakReference<ConnectionMultiplexer> _weakRef;

private object StrongRefSyncLock => _weakRef; // private and readonly? it'll do
private ConnectionMultiplexer? _strongRef;
private int _strongRefToken;

private static readonly TimerCallback Heartbeat = state =>
{
WeakReference<string> a = null!;
GC.KeepAlive(a);
var token = (TimerToken)state!;
var muxer = (ConnectionMultiplexer?)(token._ref?.Target);
if (muxer != null)
if (token._weakRef.TryGetTarget(out var muxer))
{
muxer.OnHeartbeat();
}
else
{
// the muxer got disposed from out of us; kill the timer
var tmp = token._timer;
token._timer = null;
if (tmp != null) try { tmp.Dispose(); } catch { }
token.Dispose();
}
};

internal static IDisposable Create(ConnectionMultiplexer connection)
internal static TimerToken Create(ConnectionMultiplexer connection)
{
var token = new TimerToken(connection);
var heartbeatMilliseconds = (int)connection.RawConfig.HeartbeatInterval.TotalMilliseconds;
var timer = new Timer(Heartbeat, token, heartbeatMilliseconds, heartbeatMilliseconds);
token.SetTimer(timer);
return timer;
return token;
}

public void Dispose()
{
var tmp = _timer;
_timer = null;
if (tmp is not null) try { tmp.Dispose(); } catch { }

_strongRef = null; // note that this shouldn't be relevant since we've unrooted the TimerToken
}


// explanation of rooting model:
//
// the timer has a reference to the TimerToken; this *always* has a weak-ref,
// and *may* sometimes have a strong-ref; this is so that if a consumer
// drops a multiplexer, it can be garbage collected, i.e. the heartbeat timer
// doesn't keep the entire thing alive forever; instead, if the heartbeat detects
// the weak-ref has been collected, it can cancel the timer and *itself* go away;
// however: this leaves a problem where there is *in flight work* when the consumer
// drops the multiplexer; in particular, if that happens when disconnected, there
// could be consumer-visible pending TCS items *in the backlog queue*; we don't want
// to leave those incomplete, as that fails the contractual expectations of async/await;
// instead we need to root ourselves. The natural place to do this is by rooting the
// multiplexer, allowing the heartbeat to keep poking things, so that the usual
// message-processing and timeout rules apply. This is why we *sometimes* also keep
// a strong-ref to the same multiplexer.
//
// The TimerToken is rooted by the timer callback; this then roots the multiplexer,
// which keeps our bridges and connections in scope - until we're sure we're done
// with them.
//
// 1) any bridge or connection will trigger rooting by calling Root when
// they change from "empty" to "non-empty" i.e. whenever there
// in-flight items; this always changes the token; this includes both the
// backlog and awaiting-reply queues.
//
// 2) the heartbeat is responsible for unrooting, after processing timeouts
// etc; first it checks whether it is needed (IsRooted), which also gives
// it the current token.
//
// 3) if so, the heartbeat will (outside of the lock) query all sources to
// see if they still have outstanding work; if everyone reports negatively,
// then the heartbeat calls UnRoot passing in the old token; if this still
// matches (i.e. no new work came in while we were looking away), then the
// strong reference is removed; note that "has outstanding work" ignores
// internal-call messages; we are only interested in consumer-facing items
// (but we need to check this *here* rather than when adding, as otherwise
// the definition of "is empty, should root" becomes more complicated, which
// impacts the write path, rather than the heartbeat path.
//
// This means that the multiplexer (via the timer) lasts as long as there are
// outstanding messages; if the consumer has dropped the multiplexer, then
// there will be no new incoming messages, and after timeouts: everything
// should drop.

public void Root(ConnectionMultiplexer multiplexer)
{
lock (StrongRefSyncLock)
{
_strongRef = multiplexer;
_strongRefToken++;
}
}

public bool IsRooted(out int token)
{
lock (StrongRefSyncLock)
{
token = _strongRefToken;
return _strongRef is not null;
}
}

public void UnRoot(int token)
{
lock (StrongRefSyncLock)
{
if (token == _strongRefToken)
{
_strongRef = null;
}
}
}
}

Expand All @@ -956,8 +1048,21 @@ private void OnHeartbeat()
Trace("heartbeat");

var tmp = GetServerSnapshot();
int token = 0;
bool isRooted = pulse?.IsRooted(out token) ?? false, hasPendingCallerFacingItems = false;

for (int i = 0; i < tmp.Length; i++)
{
tmp[i].OnHeartbeat();
if (isRooted && !hasPendingCallerFacingItems)
{
hasPendingCallerFacingItems = tmp[i].HasPendingCallerFacingItems();
}
}
if (isRooted && !hasPendingCallerFacingItems)
{ // release the GC root on the heartbeat *if* the token still matches
pulse?.UnRoot(token);
}
}
catch (Exception ex)
{
Expand Down Expand Up @@ -1935,7 +2040,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T>? source, Exception u
}
}

[return: NotNullIfNotNull("defaultValue")]
[return: NotNullIfNotNull(nameof(defaultValue))]
internal T? ExecuteSyncImpl<T>(Message message, ResultProcessor<T>? processor, ServerEndPoint? server, T? defaultValue = default)
{
if (_isDisposed) throw new ObjectDisposedException(ToString());
Expand Down Expand Up @@ -2049,7 +2154,7 @@ static async Task<T> ExecuteAsyncImpl_Awaited(ConnectionMultiplexer @this, Value

internal Task<T?> ExecuteAsyncImpl<T>(Message? message, ResultProcessor<T>? processor, object? state, ServerEndPoint? server)
{
[return: NotNullIfNotNull("tcs")]
[return: NotNullIfNotNull(nameof(tcs))]
static async Task<T?> ExecuteAsyncImpl_Awaited(ConnectionMultiplexer @this, ValueTask<WriteResult> write, TaskCompletionSource<T?>? tcs, Message message, ServerEndPoint? server)
{
var result = await write.ForAwait();
Expand Down
33 changes: 28 additions & 5 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,15 @@ private WriteResult QueueOrFailMessage(Message message)

// Anything else goes in the bin - we're just not ready for you yet
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
Multiplexer.OnMessageFaulted(message, null);
message.Complete();
return WriteResult.NoConnectionAvailable;
}

private WriteResult FailDueToNoConnection(Message message)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
Multiplexer.OnMessageFaulted(message, null);
NickCraver marked this conversation as resolved.
Show resolved Hide resolved
message.Complete();
return WriteResult.NoConnectionAvailable;
}
Expand Down Expand Up @@ -485,7 +485,7 @@ private void AbandonPendingBacklog(Exception ex)
{
while (BacklogTryDequeue(out Message? next))
{
Multiplexer?.OnMessageFaulted(next, ex);
Multiplexer.OnMessageFaulted(next, ex);
next.SetExceptionAndComplete(ex, this);
}
}
Expand Down Expand Up @@ -674,7 +674,7 @@ private WriteResult WriteMessageInsideLock(PhysicalConnection physical, Message
var existingMessage = Interlocked.CompareExchange(ref _activeMessage, message, null);
if (existingMessage != null)
{
Multiplexer?.OnInfoMessage($"Reentrant call to WriteMessageTakingWriteLock for {message.CommandAndKey}, {existingMessage.CommandAndKey} is still active");
Multiplexer.OnInfoMessage($"Reentrant call to WriteMessageTakingWriteLock for {message.CommandAndKey}, {existingMessage.CommandAndKey} is still active");
return WriteResult.NoConnectionAvailable;
}

Expand Down Expand Up @@ -819,9 +819,20 @@ private bool TryPushToBacklog(Message message, bool onlyIfExists, bool bypassBac
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void BacklogEnqueue(Message message)
{
bool wasEmpty = _backlog.IsEmpty;
_backlog.Enqueue(message);
message.SetBacklogged();
Interlocked.Increment(ref _backlogTotalEnqueued);

if (wasEmpty)
{
// it is important to do this *after* adding, so that we can't
// get into a thread-race where the heartbeat checks too fast;
// the fact that we're accessing Multiplexer down here means that
// we're rooting it ourselves via the stack, so we don't need
// to worry about it being collected until at least after this
Multiplexer.Root();
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -1114,10 +1125,22 @@ private async Task ProcessBridgeBacklogAsync()
}
}

public bool HasPendingCallerFacingItems()
{
if (_backlog.Count != 0)
{
foreach (var item in _backlog) // non-consuming, thread-safe, etc
{
if (!item.IsInternalCall) return true;
}
}
return physical?.HasPendingCallerFacingItems() ?? false;
}

private WriteResult TimedOutBeforeWrite(Message message)
{
message.Cancel();
Multiplexer?.OnMessageFaulted(message, null);
Multiplexer.OnMessageFaulted(message, null);
message.Complete();
return WriteResult.TimeoutBeforeWrite;
}
Expand Down
50 changes: 50 additions & 0 deletions src/StackExchange.Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,30 @@ internal static void IdentifyFailureType(Exception? exception, ref ConnectionFai

internal void EnqueueInsideWriteLock(Message next)
{
bool wasEmpty;
var multiplexer = BridgeCouldBeNull?.Multiplexer;
if (multiplexer is null)
{
// multiplexer already collected? then we're almost certainly doomed;
// we can still process it to avoid making things worse/more complex,
// but: we can't reliably assume this works, so: shout now!
next.Cancel();
next.Complete();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A return might be missing here so that the cancelled message is not enqueued.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that; another version had throw. I'm not too concerned about writing it though - although nobody can ever see the reply. But if we were to do anything, I think it would need to be return false; and not write if we didn't enqueue. However, in reality, this should.never happen; the muxer should still be alive simply by the nature of being here. Another way we could fix it would be to pass the muxer upwards when doing this, and GC.KeepAlive before exiting.

Copy link
Collaborator Author

@mgravell mgravell Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And to pre-empt the obvious "then why is this code here?": If I can't be absolutely sure we can't get into this scenario, I'd rather be paranoid; without the heartbeat, there are scenarios where messages might not be terminated. Emphasis: we should have a muxer: that's literally what the rest of the PR is meant to guarantee. Just if I'm wrong, I'd rather give people cancellation faults than give them nothing at all.

}
lock (_writtenAwaitingResponse)
{
wasEmpty = _writtenAwaitingResponse.Count == 0;
_writtenAwaitingResponse.Enqueue(next);
}
if (wasEmpty)
{
// it is important to do this *after* adding, so that we can't
// get into a thread-race where the heartbeat checks too fast;
// the fact that we're accessing Multiplexer down here means that
// we're rooting it ourselves via the stack, so we don't need
// to worry about it being collected until at least after this
multiplexer?.Root();
}
}

internal void GetCounters(ConnectionCounters counters)
Expand Down Expand Up @@ -1975,5 +1995,35 @@ private static RawResult ParseInlineProtocol(Arena<RawResult> arena, in RawResul
}
return new RawResult(block, false);
}

internal bool HasPendingCallerFacingItems()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_writtenAwaitingResponse, 0, ref lockTaken);
if (lockTaken)
{
if (_writtenAwaitingResponse.Count != 0)
{
foreach (var item in _writtenAwaitingResponse)
{
if (!item.IsInternalCall) return true;
}
}
return false;
}
else
{
// don't content; *presume* that something is demanded; we
// can check again next heartbeat
return true;
}
}
finally
{
if (lockTaken) Monitor.Exit(_writtenAwaitingResponse);
}
}
}
}
9 changes: 8 additions & 1 deletion src/StackExchange.Redis/ServerEndPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ internal void OnHeartbeat()

var source = TaskResultBox<T?>.Create(out var tcs, null);
message.SetSource(processor, source);
if (bridge == null) bridge = GetBridge(message);
bridge ??= GetBridge(message);

WriteResult result;
if (bridge == null)
Expand Down Expand Up @@ -990,5 +990,12 @@ internal void SimulateConnectionFailure(SimulatedFailureType failureType)
interactive?.SimulateConnectionFailure(failureType);
subscription?.SimulateConnectionFailure(failureType);
}

internal bool HasPendingCallerFacingItems()
{
// check whichever bridges exist
if (interactive is { } tmp && tmp.HasPendingCallerFacingItems()) return true;
mgravell marked this conversation as resolved.
Show resolved Hide resolved
return subscription?.HasPendingCallerFacingItems() ?? false;
}
}
}