Skip to content

Commit

Permalink
Fix #2392: Dequeue all timed out messages from the backlog when not c…
Browse files Browse the repository at this point in the history
…onnected, even when no completion is needed, to be able to dequeue and complete other timed out messages. (#2397)

When the client is not connected timed out fire and forget messages currently are not removed from the backlog that also results in subsequent timed out messages not being marked as timed out, as described in #2392.
  • Loading branch information
kornelpal authored Mar 29, 2023
1 parent 1364ef8 commit ef388bd
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Current package versions:

## Unreleased

- Fix [#2392](https://github.com/StackExchange/StackExchange.Redis/issues/2392): Dequeue *all* timed out messages from the backlog when not connected (including Fire+Forget) ([#2397 by kornelpal](https://github.com/StackExchange/StackExchange.Redis/pull/2397))
- Fix [#2400](https://github.com/StackExchange/StackExchange.Redis/issues/2400): Expose `ChannelMessageQueue` as `IAsyncEnumerable<ChannelMessage>` ([#2402 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2402))
- Add: support for `CLIENT SETINFO` (lib name/version) during handshake; opt-out is via `ConfigurationOptions`; also support read of `resp`, `lib-ver` and `lib-name` via `CLIENT LIST` ([#2414 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2414))

Expand Down
19 changes: 12 additions & 7 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -869,25 +869,30 @@ private void CheckBacklogForTimeouts()
while (_backlog.TryPeek(out Message? message))
{
// See if the message has pass our async timeout threshold
// or has otherwise been completed (e.g. a sync wait timed out) which would have cleared the ResultBox
if (!message.HasTimedOut(now, timeout, out var _) || message.ResultBox == null) break; // not a timeout - we can stop looking
// Note: All timed out messages must be dequeued, even when no completion is needed, to be able to dequeue and complete other timed out messages.
if (!message.HasTimedOut(now, timeout, out var _)) break; // not a timeout - we can stop looking
lock (_backlog)
{
// Peek again since we didn't have lock before...
// and rerun the exact same checks as above, note that it may be a different message now
if (!_backlog.TryPeek(out message)) break;
if (!message.HasTimedOut(now, timeout, out var _) && message.ResultBox != null) break;
if (!message.HasTimedOut(now, timeout, out var _)) break;

if (!BacklogTryDequeue(out var message2) || (message != message2)) // consume it for real
{
throw new RedisException("Thread safety bug detected! A queue message disappeared while we had the backlog lock");
}
}

// Tell the message it has failed
// Note: Attempting to *avoid* reentrancy/deadlock issues by not holding the lock while completing messages.
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
message.SetExceptionAndComplete(ex, this);
// We only handle async timeouts here, synchronous timeouts are handled upstream.
// Those sync timeouts happen in ConnectionMultiplexer.ExecuteSyncImpl() via Monitor.Wait.
if (message.ResultBoxIsAsync)
{
// Tell the message it has failed
// Note: Attempting to *avoid* reentrancy/deadlock issues by not holding the lock while completing messages.
var ex = Multiplexer.GetException(WriteResult.TimeoutBeforeWrite, message, ServerEndPoint);
message.SetExceptionAndComplete(ex, this);
}
}
}

Expand Down
46 changes: 46 additions & 0 deletions tests/StackExchange.Redis.Tests/Issues/Issue2392Tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace StackExchange.Redis.Tests.Issues
{
public class Issue2392Tests : TestBase
{
public Issue2392Tests(ITestOutputHelper output) : base(output) { }

[Fact]
public async Task Execute()
{
var options = new ConfigurationOptions()
{
BacklogPolicy = new()
{
QueueWhileDisconnected = true,
AbortPendingOnConnectionFailure = false,
},
AbortOnConnectFail = false,
ConnectTimeout = 1,
ConnectRetry = 0,
AsyncTimeout = 1,
SyncTimeout = 1,
AllowAdmin = true,
};
options.EndPoints.Add("127.0.0.1:1234");

using var conn = await ConnectionMultiplexer.ConnectAsync(options, Writer);
var key = Me();
var db = conn.GetDatabase();
var server = conn.GetServerSnapshot()[0];

// Fail the connection
conn.AllowConnect = false;
server.SimulateConnectionFailure(SimulatedFailureType.All);
Assert.False(conn.IsConnected);

await db.StringGetAsync(key, flags: CommandFlags.FireAndForget);
var ex = await Assert.ThrowsAnyAsync<Exception>(() => db.StringGetAsync(key).WithTimeout(5000));
Assert.True(ex is RedisTimeoutException or RedisConnectionException);
}
}
}

0 comments on commit ef388bd

Please sign in to comment.