From 9035679bd7dc932907e9bb7318fee41a53c84f84 Mon Sep 17 00:00:00 2001 From: Kornel Pal Date: Wed, 5 Apr 2023 05:38:04 +0200 Subject: [PATCH] Fix concurrency issue between AbandonPendingBacklog() and CheckBacklogForTimeouts(), and remove backlog locking. --- src/StackExchange.Redis/PhysicalBridge.cs | 60 ++++++++++++----------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index e7af56a69..29e8299b9 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -41,6 +41,7 @@ internal sealed class PhysicalBridge : IDisposable private int _backlogProcessorIsRunning = 0; private int _backlogCurrentEnqueued = 0; private long _backlogTotalEnqueued = 0; + private Exception? _abandonPendingBacklogException; private int activeWriters = 0; private int beating; @@ -483,11 +484,18 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti private void AbandonPendingBacklog(Exception ex) { + // Peeking at the backlog, checking message and then dequeuing is not thread-safe. + // CheckBacklogForTimeouts() depends on this being set to properly complete dequeued messages. + Volatile.Write(ref _abandonPendingBacklogException, ex); + while (BacklogTryDequeue(out Message? next)) { Multiplexer.OnMessageFaulted(next, ex); next.SetExceptionAndComplete(ex, this); } + + // Best effort cleanup to avoid false positive thread safety check failures in CheckBacklogForTimeouts(). 
+ if (_backlogStatus != BacklogStatus.CheckingForTimeout) Interlocked.CompareExchange(ref _abandonPendingBacklogException, null, ex); } internal void OnFullyEstablished(PhysicalConnection connection, string source) @@ -888,24 +896,29 @@ private void CheckBacklogForTimeouts() var now = Environment.TickCount; var timeout = TimeoutMilliseconds; - // Because peeking at the backlog, checking message and then dequeuing, is not thread-safe, we do have to use - // a lock here, for mutual exclusion of backlog DEQUEUERS. Unfortunately. - // But we reduce contention by only locking if we see something that looks timed out. + // Peeking at the backlog, checking message and then dequeuing is not thread-safe. + // Because AbandonPendingBacklog() is the only dequeuer that can run concurrently, + // locking can be avoided by throwing the AbandonPendingBacklog() exception here. while (_backlog.TryPeek(out Message? message)) { // See if the message has pass our async timeout threshold // Note: All timed out messages must be dequeued, even when no completion is needed, to be able to dequeue and complete other timed out messages. if (!message.HasTimedOut(now, timeout, out var _)) break; // not a timeout - we can stop looking - lock (_backlog) + if (!BacklogTryDequeue(out var message2)) message2 = null; // consume it for real + if (message != message2) { - // Peek again since we didn't have lock before... - // and rerun the exact same checks as above, note that it may be a different message now - if (!_backlog.TryPeek(out message)) break; - if (!message.HasTimedOut(now, timeout, out var _)) break; + var ex = Volatile.Read(ref _abandonPendingBacklogException); + var isAbandonPendingBacklog = ex != null; + ex ??= new RedisException("Thread safety bug detected! 
A queue message disappeared when AbandonPendingBacklog() was not running."); + message2?.SetExceptionAndComplete(ex, this); - if (!BacklogTryDequeue(out var message2) || (message != message2)) // consume it for real + if (isAbandonPendingBacklog) { - throw new RedisException("Thread safety bug detected! A queue message disappeared while we had the backlog lock"); + break; + } + else + { + throw ex; } } @@ -976,20 +989,15 @@ private async Task ProcessBacklogAsync() if (isDisposed && BacklogHasItems) { _backlogStatus = BacklogStatus.NotifyingDisposed; - // Because peeking at the backlog, checking message and then dequeuing, is not thread-safe, we do have to use - // a lock here, for mutual exclusion of backlog DEQUEUERS. Unfortunately. - // But we reduce contention by only locking if we see something that looks timed out. + // Peeking at the backlog, checking message and then dequeuing is not thread-safe. + // CheckBacklogForTimeouts() depends on not running concurrently with this. while (BacklogHasItems) { - Message? message = null; - lock (_backlog) + if (!BacklogTryDequeue(out Message? message)) { - if (!BacklogTryDequeue(out message)) - { - break; - } + break; } - + var ex = ExceptionFactory.Timeout(Multiplexer, "The message was in the backlog when connection was disposed", message, ServerEndPoint, WriteResult.TimeoutBeforeWrite, this); message.SetExceptionAndComplete(ex, this); } @@ -1073,17 +1081,13 @@ private async Task ProcessBridgeBacklogAsync() // If we can't write them, abort and wait for the next heartbeat or activation to try this again. while (IsConnected && physical?.HasOutputPipe == true) { - Message? message; _backlogStatus = BacklogStatus.CheckingForWork; - lock (_backlog) + // Note that we're actively taking it off the queue here, not peeking + // If there's nothing left in queue, we're done. + if (!BacklogTryDequeue(out Message? 
message)) { - // Note that we're actively taking it off the queue here, not peeking - // If there's nothing left in queue, we're done. - if (!BacklogTryDequeue(out message)) - { - break; - } + break; } try