From 9ab79e30027b9a4b06c2cb34c19ac7d1b45d581d Mon Sep 17 00:00:00 2001 From: Marc Gravell Date: Fri, 1 Mar 2024 17:40:58 +0000 Subject: [PATCH] new tunnel-based RESP logging and validation (#2660) * Provide new LoggingTunnel API; this * words * fix PR number * fix file location * save the sln * identify smessage as out-of-band * add .ForAwait() throughout LoggingTunnel * clarify meaning of path parameter --- StackExchange.Redis.sln | 1 + docs/ReleaseNotes.md | 4 +- docs/RespLogging.md | 150 +++++ docs/index.md | 1 + src/StackExchange.Redis/BufferReader.cs | 2 + .../Configuration/LoggingTunnel.cs | 627 ++++++++++++++++++ src/StackExchange.Redis/PhysicalConnection.cs | 4 +- src/StackExchange.Redis/RedisResult.cs | 2 +- 8 files changed, 787 insertions(+), 4 deletions(-) create mode 100644 docs/RespLogging.md create mode 100644 src/StackExchange.Redis/Configuration/LoggingTunnel.cs diff --git a/StackExchange.Redis.sln b/StackExchange.Redis.sln index 1fa39f4c2..f59d5f6fc 100644 --- a/StackExchange.Redis.sln +++ b/StackExchange.Redis.sln @@ -120,6 +120,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{153A10E4-E docs\PubSubOrder.md = docs\PubSubOrder.md docs\ReleaseNotes.md = docs\ReleaseNotes.md docs\Resp3.md = docs\Resp3.md + docs\RespLogging.md = docs\RespLogging.md docs\Scripting.md = docs\Scripting.md docs\Server.md = docs\Server.md docs\Testing.md = docs\Testing.md diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 2589d8bd0..18adb415e 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -8,10 +8,12 @@ Current package versions: ## Unreleased +- Add new `LoggingTunnel` API; see https://stackexchange.github.io/StackExchange.Redis/Logging [#2660 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2660) + ## 2.7.27 - Support `HeartbeatConsistencyChecks` and `HeartbeatInterval` in `Clone()` ([#2658 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2658)) -- Add `AddLibraryNameSuffix` to multiplexer; allows usage-specific tokens to be appended *after connect* +- Add `AddLibraryNameSuffix` to multiplexer; allows usage-specific tokens to be appended *after connect* [#2659 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2659) ## 2.7.23 diff --git a/docs/RespLogging.md b/docs/RespLogging.md new file mode 100644 index 000000000..3ff3b0164 --- /dev/null +++ b/docs/RespLogging.md @@ -0,0 +1,150 @@ +Logging and validating the underlying RESP stream +=== + +Sometimes (rarely) there is a question over the validity of the RESP stream from a server (especially when using proxies +or a "redis-like-but-not-actually-redis" server), and it is hard to know whether the *data sent* was bad, vs +the client library tripped over the data. + +To help with this, an experimental API exists to help log and validate RESP streams. This API is not intended +for routine use (and may change at any time), but can be useful for diagnosing problems. + +For example, consider we have the following load test which (on some setup) causes a failure with some +degree of reliability (even if you need to run it 6 times to see a failure): + +``` c# +// connect +Console.WriteLine("Connecting..."); +var options = ConfigurationOptions.Parse(ConnectionString); +await using var muxer = await ConnectionMultiplexer.ConnectAsync(options); +var db = muxer.GetDatabase(); + +// load +RedisKey testKey = "marc_abc"; +await db.KeyDeleteAsync(testKey); +Console.WriteLine("Writing..."); +for (int i = 0; i < 100; i++) +{ + // sync every 50 iterations (pipeline the rest) + var flags = (i % 50) == 0 ? CommandFlags.None : CommandFlags.FireAndForget; + await db.SetAddAsync(testKey, Guid.NewGuid().ToString(), flags); +} + +// fetch +Console.WriteLine("Reading..."); +int count = 0; +for (int i = 0; i < 10; i++) +{ + // this is deliberately not using SCARD + // (to put load on the inbound) + count += (await db.SetMembersAsync(testKey)).Length; +} +Console.WriteLine("all done"); +``` + +## Logging RESP streams + +When this fails, it will not be obvious exactly who is to blame. However, we can ask for the data streams +to be logged to the local file-system. + +**Obviously, this may leave data on disk, so this may present security concerns if used with production data; use +this feature sparingly, and clean up after yourself!** + +``` c# +// connect +Console.WriteLine("Connecting..."); +var options = ConfigurationOptions.Parse(ConnectionString); +LoggingTunnel.LogToDirectory(options, @"C:\Code\RedisLog"); // <=== added! +await using var muxer = await ConnectionMultiplexer.ConnectAsync(options); +... +``` + +This API is marked `[Obsolete]` simply to discourage usage, but you can ignore this warning once you +understand what it is saying (using `#pragma warning disable CS0618` if necessary). + +This will update the `ConfigurationOptions` with a custom `Tunnel` that performs file-based mirroring +of the RESP streams. If `Ssl` is enabled on the `ConfigurationOptions`, the `Tunnel` will *take over that responsibility* +(so that the unencrypted data can be logged), and will *disable* `Ssl` on the `ConfigurationOptions` - but TLS +will still be used correctly. + +If we run our code, we will see that 2 files are written per connection ("in" and "out"); if you are using RESP2 (the default), +then 2 connections are usually established (one for regular "interactive" commands, and one for pub/sub messages), so this will +typically create 4 files. + +## Validating RESP streams + +RESP is *mostly* text, so a quick eyeball can be achieved using any text tool; an "out" file will typically start: + +``` txt +$6 +CLIENT +$7 +SETNAME +... +``` + +and an "in" file will typically start: + +``` txt ++OK ++OK ++OK +... +``` + +This is the start of the handshakes for identifying the client to the redis server, and the server acknowledging this (if +you have authentication enabled, there will be a `AUTH` command first, or `HELLO` on RESP3). + +If there is a failure, you obviously don't want to manually check these files. Instead, an API exists to validate RESP streams: + +``` c# +var messages = await LoggingTunnel.ValidateAsync(@"C:\Code\RedisLog"); +Console.WriteLine($"{messages} RESP fragments validated"); +``` + +If the RESP streams are *not* valid, an exception will provide further details. + +**An exception here is strong evidence that there is a fault either in the redis server, or an intermediate proxy**. + +Conversely, if the library reported a protocol failure but the validation step here *does not* report an error, then +that is strong evidence of a library error; [**please report this**](https://github.com/StackExchange/StackExchange.Redis/issues/new) (with details). + +You can also *replay* the conversation locally, seeing the individual requests and responses: + +``` c# +var messages = await LoggingTunnel.ReplayAsync(@"C:\Code\RedisLog", (cmd, resp) => +{ + if (cmd.IsNull) + { + // out-of-band/"push" response + Console.WriteLine("<< " + LoggingTunnel.DefaultFormatResponse(resp)); + } + else + { + Console.WriteLine(" > " + LoggingTunnel.DefaultFormatCommand(cmd)); + Console.WriteLine(" < " + LoggingTunnel.DefaultFormatResponse(resp)); + } +}); +Console.WriteLine($"{messages} RESP commands validated"); +``` + +The `DefaultFormatCommand` and `DefaultFormatResponse` methods are provided for convenience, but you +can perform your own formatting logic if required. If a RESP erorr is encountered in the response to +a particular message, the callback will still be invoked to indicate that error. For example, after deliberately +introducing an error into the captured file, we might see: + +``` txt + > CLUSTER NODES + < -ERR This instance has cluster support disabled + > GET __Booksleeve_TieBreak + < (null) + > ECHO ... + < -Invalid bulk string terminator +Unhandled exception. StackExchange.Redis.RedisConnectionException: Invalid bulk string terminator +``` + +The `-ERR` message is not a problem - that's normal and simply indicates that this is not a redis cluster; however, the +final pair is an `ECHO` request, for which the corresponding response was invalid. This information is useful for finding +out what happened. + +Emphasis: this API is not intended for common/frequent usage; it is intended only to assist validating the underlying +RESP stream. \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index cd1c84d7e..b66f8f9a7 100644 --- a/docs/index.md +++ b/docs/index.md @@ -48,6 +48,7 @@ Documentation - [Testing](Testing) - running the `StackExchange.Redis.Tests` suite to validate changes - [Timeouts](Timeouts) - guidance on dealing with timeout problems - [Thread Theft](ThreadTheft) - guidance on avoiding TPL threading problems +- [RESP Logging](RespLogging) - capturing and validating RESP streams Questions and Contributions --- diff --git a/src/StackExchange.Redis/BufferReader.cs b/src/StackExchange.Redis/BufferReader.cs index a7199fe6b..5691217f9 100644 --- a/src/StackExchange.Redis/BufferReader.cs +++ b/src/StackExchange.Redis/BufferReader.cs @@ -17,6 +17,8 @@ internal ref struct BufferReader public int OffsetThisSpan { get; private set; } public int RemainingThisSpan { get; private set; } + public long TotalConsumed => _totalConsumed; + private ReadOnlySequence.Enumerator _iterator; private ReadOnlySpan _current; diff --git a/src/StackExchange.Redis/Configuration/LoggingTunnel.cs b/src/StackExchange.Redis/Configuration/LoggingTunnel.cs new file mode 100644 index 000000000..a058a522c --- /dev/null +++ b/src/StackExchange.Redis/Configuration/LoggingTunnel.cs @@ -0,0 +1,627 @@ +using Pipelines.Sockets.Unofficial; +using Pipelines.Sockets.Unofficial.Arenas; +using System; +using System.Buffers; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.IO.Pipelines; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using static StackExchange.Redis.PhysicalConnection; + +namespace StackExchange.Redis.Configuration; + +/// +/// Captures redis traffic; intended for debug use +/// +[Obsolete("This API is experimental, has security and performance implications, and may change without notice", false)] +[SuppressMessage("ApiDesign", "RS0016:Add public types and members to the declared API", Justification = "Experimental API")] +public abstract class LoggingTunnel : Tunnel +{ + private readonly ConfigurationOptions _options; + private readonly bool _ssl; + private readonly Tunnel? _tail; + + /// + /// Replay the RESP messages for a pair of streams, invoking a callback per operation + /// + public static async Task ReplayAsync(Stream @out, Stream @in, Action pair) + { + using Arena arena = new(); + var outPipe = StreamConnection.GetReader(@out); + var inPipe = StreamConnection.GetReader(@in); + + long count = 0; + while (true) + { + var sent = await ReadOneAsync(outPipe, arena, isInbound: false).ForAwait(); + ContextualRedisResult received; + try + { + do + { + received = await ReadOneAsync(inPipe, arena, isInbound: true).ForAwait(); + if (received.IsOutOfBand && received.Result is not null) + { + // spoof an empty request for OOB messages + pair(RedisResult.NullSingle, received.Result); + } + } while (received.IsOutOfBand); + } + catch (Exception ex) + { + // if we got an exception following a command, spoof that as a pair, + // so we see the message that had a corrupted reply + if (sent.Result is not null) + { + pair(sent.Result, RedisResult.Create(ex.Message, ResultType.Error)); + } + throw; // still surface the original exception + } + + if (sent.Result is null || received.Result is null) break; // no more paired messages + + pair(sent.Result, received.Result); + count++; + } + return count; + } + + + /// + /// Replay the RESP messages all the streams in a folder, invoking a callback per operation + /// + /// The directory of captured files to replay. + /// Operation to perform per replayed message pair. + public static async Task ReplayAsync(string path, Action pair) + { + long total = 0; + foreach (var outPath in Directory.EnumerateFiles(path, "*.out")) + { + var inPath = Path.ChangeExtension(outPath, "in"); + if (!File.Exists(outPath)) continue; + + using var outFile = File.OpenRead(outPath); + using var inFile = File.OpenRead(inPath); + total += await ReplayAsync(outFile, inFile, pair).ForAwait(); + } + return total; + } + + private static async ValueTask ReadOneAsync(PipeReader input, Arena arena, bool isInbound) + { + while (true) + { + var readResult = await input.ReadAsync().ForAwait(); + var buffer = readResult.Buffer; + int handled = 0; + var result = buffer.IsEmpty ? default : ProcessBuffer(arena, ref buffer, isInbound); + input.AdvanceTo(buffer.Start, buffer.End); + + if (result.Result is not null) return result; + + if (handled == 0 && readResult.IsCompleted) + { + break; // no more data, or trailing incomplete messages + } + } + return default; + } + + /// + /// Validate a RESP stream and return the number of top-level RESP fragments. + /// + /// The path of a single file to validate, or a directory of captured files to validate. + public static async Task ValidateAsync(string path) + { + if (File.Exists(path)) + { + using var singleFile = File.OpenRead(path); + return await ValidateAsync(singleFile).ForAwait(); + } + else if (Directory.Exists(path)) + { + long total = 0; + foreach (var file in Directory.EnumerateFiles(path)) + { + try + { + using var folderFile = File.OpenRead(file); + total += await ValidateAsync(folderFile).ForAwait(); + } + catch (Exception ex) + { + throw new InvalidOperationException(ex.Message + " in " + file, ex); + } + } + return total; + } + else + { + throw new FileNotFoundException(path); + } + } + + /// + /// Validate a RESP stream and return the number of top-level RESP fragments. + /// + public static async Task ValidateAsync(Stream stream) + { + using var arena = new Arena(); + var input = StreamConnection.GetReader(stream); + long total = 0, position = 0; + while (true) + { + var readResult = await input.ReadAsync().ForAwait(); + var buffer = readResult.Buffer; + int handled = 0; + if (!buffer.IsEmpty) + { + try + { + ProcessBuffer(arena, ref buffer, ref position, ref handled); // updates buffer.Start + } + catch (Exception ex) + { + throw new InvalidOperationException($"Invalid fragment starting at {position} (fragment {total + handled})", ex); + } + total += handled; + } + + input.AdvanceTo(buffer.Start, buffer.End); + + if (handled == 0 && readResult.IsCompleted) + { + break; // no more data, or trailing incomplete messages + } + } + return total; + } + private static void ProcessBuffer(Arena arena, ref ReadOnlySequence buffer, ref long position, ref int messageCount) + { + while (!buffer.IsEmpty) + { + var reader = new BufferReader(buffer); + try + { + var result = TryParseResult(true, arena, in buffer, ref reader, true, null); + if (result.HasValue) + { + buffer = reader.SliceFromCurrent(); + position += reader.TotalConsumed; + messageCount++; + } + else + { + break; // remaining buffer isn't enough; give up + } + } + finally + { + arena.Reset(); + } + } + } + + private readonly struct ContextualRedisResult + { + public readonly RedisResult? Result; + public readonly bool IsOutOfBand; + public ContextualRedisResult(RedisResult? result, bool isOutOfBand) + { + Result = result; + IsOutOfBand = isOutOfBand; + } + } + + private static ContextualRedisResult ProcessBuffer(Arena arena, ref ReadOnlySequence buffer, bool isInbound) + { + if (!buffer.IsEmpty) + { + var reader = new BufferReader(buffer); + try + { + var result = TryParseResult(true, arena, in buffer, ref reader, true, null); + bool isOutOfBand = result.Resp3Type == ResultType.Push + || (isInbound && result.Resp2TypeArray == ResultType.Array && IsArrayOutOfBand(result)); + if (result.HasValue) + { + buffer = reader.SliceFromCurrent(); + if (!RedisResult.TryCreate(null, result, out var parsed)) + { + throw new InvalidOperationException("Unable to parse raw result to RedisResult"); + } + return new(parsed, isOutOfBand); + } + } + finally + { + arena.Reset(); + } + } + return default; + + static bool IsArrayOutOfBand(in RawResult result) + { + var items = result.GetItems(); + return (items.Length >= 3 && items[0].IsEqual(message) || items[0].IsEqual(smessage)) + || (items.Length >= 4 && items[0].IsEqual(pmessage)); + + } + } + private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage"; + + /// + /// Create a new instance of a + /// + protected LoggingTunnel(ConfigurationOptions? options = null, Tunnel? tail = null) + { + options ??= new(); + _options = options; + _ssl = options.Ssl; + _tail = tail; + options.Ssl = false; // disable here, since we want to log *decrypted* + } + + /// + /// Configures the provided options to perform file-based logging to a directory; + /// files will be sequential per stream starting from zero, and will blindly overwrite existing files. + /// + public static void LogToDirectory(ConfigurationOptions options, string path) + { + var tunnel = new DirectoryLoggingTunnel(path, options, options.Tunnel); + options.Tunnel = tunnel; + } + + private class DirectoryLoggingTunnel : LoggingTunnel + { + private readonly string path; + private int _nextIndex = -1; + + internal DirectoryLoggingTunnel(string path, ConfigurationOptions? options = null, Tunnel? tail = null) + : base(options, tail) + { + this.path = path; + if (!Directory.Exists(path)) throw new InvalidOperationException("Directly does not exist: " + path); + } + + protected override Stream Log(Stream stream, EndPoint endpoint, ConnectionType connectionType) + { + int index = Interlocked.Increment(ref _nextIndex); + var name = $"{Format.ToString(endpoint)} {connectionType} {index}.tmp"; + foreach (var c in InvalidChars) + { + name = name.Replace(c, ' '); + } + name = Path.Combine(path, name); + var reads = File.Create(Path.ChangeExtension(name, ".in")); + var writes = File.Create(Path.ChangeExtension(name, ".out")); + return new LoggingDuplexStream(stream, reads, writes); + } + + private static readonly char[] InvalidChars = Path.GetInvalidFileNameChars(); + } + + /// + public override async ValueTask BeforeAuthenticateAsync(EndPoint endpoint, ConnectionType connectionType, Socket? socket, CancellationToken cancellationToken) + { + Stream? stream = null; + if (_tail is not null) + { + stream = await _tail.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken).ForAwait(); + } + stream ??= new NetworkStream(socket ?? throw new InvalidOperationException("No stream or socket available")); + if (_ssl) + { + stream = await TlsHandshakeAsync(stream, endpoint).ForAwait(); + } + return Log(stream, endpoint, connectionType); + } + + /// + /// Perform logging on the provided stream + /// + protected abstract Stream Log(Stream stream, EndPoint endpoint, ConnectionType connectionType); + + /// + public override ValueTask BeforeSocketConnectAsync(EndPoint endPoint, ConnectionType connectionType, Socket? socket, CancellationToken cancellationToken) + { + return _tail is null ? base.BeforeSocketConnectAsync(endPoint, connectionType, socket, cancellationToken) + : _tail.BeforeSocketConnectAsync(endPoint, connectionType, socket, cancellationToken); + } + + /// + public override ValueTask GetSocketConnectEndpointAsync(EndPoint endpoint, CancellationToken cancellationToken) + { + return _tail is null ? base.GetSocketConnectEndpointAsync(endpoint, cancellationToken) + : _tail.GetSocketConnectEndpointAsync(endpoint, cancellationToken); + } + +#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously - netfx back-compat mode + private async Task TlsHandshakeAsync(Stream stream, EndPoint endpoint) +#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously + { + // mirrors TLS handshake from PhysicalConnection, but wouldn't help to share code here + var host = _options.SslHost; + if (host.IsNullOrWhiteSpace()) + { + host = Format.ToStringHostOnly(endpoint); + } + + var ssl = new SslStream(stream, false, + _options.CertificateValidationCallback ?? PhysicalConnection.GetAmbientIssuerCertificateCallback(), + _options.CertificateSelectionCallback ?? PhysicalConnection.GetAmbientClientCertificateCallback(), + EncryptionPolicy.RequireEncryption); + +#if NETCOREAPP3_1_OR_GREATER + var configOptions = _options.SslClientAuthenticationOptions?.Invoke(host); + if (configOptions is not null) + { + await ssl.AuthenticateAsClientAsync(configOptions).ForAwait(); + } + else + { + ssl.AuthenticateAsClient(host, _options.SslProtocols, _options.CheckCertificateRevocation); + } +#else + ssl.AuthenticateAsClient(host, _options.SslProtocols, _options.CheckCertificateRevocation); +#endif + return ssl; + } + + /// + /// Get a typical text representation of a redis command + /// + public static string DefaultFormatCommand(RedisResult value) + { + try + { + if (value.IsNull) return "(null)"; + if (value.Type == ResultType.Array) + { + var sb = new StringBuilder(); + for (int i = 0; i < value.Length; i++) + { + var item = value[i]; + if (i != 0) sb.Append(' '); + if (IsSimple(item)) + { + sb.Append(item.AsString()); + } + else + { + sb.Append("..."); + break; + } + } + return sb.ToString(); + } + } + catch {} + return value.Type.ToString(); + + static bool IsSimple(RedisResult value) + { + try + { + switch (value.Resp2Type) + { + case ResultType.Array: return false; + case ResultType.Error: return true; + default: + var blob = value.AsByteArray(); // note non-alloc in the remaining cases + if (blob is null) return true; + if (blob.Length >= 50) return false; + for (int i = 0; i < blob.Length; i++) + { + char c = (char)blob[i]; + if (c < ' ' || c > '~') return false; + } + return true; + } + } + catch + { + return false; + } + } + } + + /// + /// Get a typical text representation of a redis response + /// + public static string DefaultFormatResponse(RedisResult value) + { + try + { + if (value.IsNull) return "(null)"; + switch (value.Type.ToResp2()) + { + case ResultType.Integer: + case ResultType.BulkString: + case ResultType.SimpleString: + return value.AsString()!; + case ResultType.Error: + return "-" + value.ToString(); + case ResultType.Array: + return $"[{value.Length}]"; + } + } + catch (Exception ex) + { + Debug.Write(ex.Message); + } + return value.Type.ToString(); + } + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + protected sealed class LoggingDuplexStream : Stream + { + private readonly Stream _inner, _reads, _writes; + + internal LoggingDuplexStream(Stream inner, Stream reads, Stream writes) + { + _inner = inner; + _reads = reads; + _writes = writes; + } + + public override bool CanRead => _inner.CanRead; + public override bool CanWrite => _inner.CanWrite; + + public override bool CanSeek => false; // duplex + public override bool CanTimeout => _inner.CanTimeout; + public override int ReadTimeout { get => _inner.ReadTimeout; set => _inner.ReadTimeout = value; } + public override int WriteTimeout { get => _inner.WriteTimeout; set => _inner.WriteTimeout = value; } + public override long Length => throw new NotSupportedException(); // duplex + public override long Position + { + get => throw new NotSupportedException(); // duplex + set => throw new NotSupportedException(); // duplex + } + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); // duplex + public override void SetLength(long value) => throw new NotSupportedException(); // duplex + + // we don't use these APIs + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => throw new NotSupportedException(); + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => throw new NotSupportedException(); + public override int EndRead(IAsyncResult asyncResult) => throw new NotSupportedException(); + public override void EndWrite(IAsyncResult asyncResult) => throw new NotSupportedException(); + + public override void Flush() + { + // note we don't flush _reads, as that could be cross-threaded + // (flush is a write operation, not a read one) + _writes.Flush(); + _inner.Flush(); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + await _writes.FlushAsync().ForAwait(); + await _inner.FlushAsync().ForAwait(); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _inner.Dispose(); + try { _reads.Flush(); } catch { } + _reads.Dispose(); + try { _writes.Flush(); } catch { } + _writes.Dispose(); + } + base.Dispose(disposing); + } + + public override void Close() + { + _inner.Close(); + try { _reads.Flush(); } catch { } + _reads.Close(); + try { _writes.Flush(); } catch { } + _writes.Close(); + base.Close(); + } + +#if NETCOREAPP3_0_OR_GREATER + public override async ValueTask DisposeAsync() + { + await _inner.DisposeAsync().ForAwait(); + try { await _reads.FlushAsync().ForAwait(); } catch { } + await _reads.DisposeAsync().ForAwait(); + try { await _writes.FlushAsync().ForAwait(); } catch { } + await _writes.DisposeAsync().ForAwait(); + await base.DisposeAsync().ForAwait(); + } +#endif + + public override int ReadByte() + { + var val = _inner.ReadByte(); + if (val >= 0) + { + _reads.WriteByte((byte)val); + _reads.Flush(); + } + return val; + } + public override int Read(byte[] buffer, int offset, int count) + { + var len = _inner.Read(buffer, offset, count); + if (len > 0) + { + _reads.Write(buffer, offset, len); + _reads.Flush(); + } + return len; + } + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + var len = await _inner.ReadAsync(buffer, offset, count, cancellationToken).ForAwait(); + if (len > 0) + { + await _reads.WriteAsync(buffer, offset, len, cancellationToken).ForAwait(); + await _reads.FlushAsync(cancellationToken).ForAwait(); + } + return len; + } +#if NETCOREAPP3_0_OR_GREATER + public override int Read(Span buffer) + { + var len = _inner.Read(buffer); + if (len > 0) + { + _reads.Write(buffer.Slice(0, len)); + _reads.Flush(); + } + return len; + } + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) + { + var len = await _inner.ReadAsync(buffer, cancellationToken).ForAwait(); + if (len > 0) + { + await _reads.WriteAsync(buffer.Slice(0, len), cancellationToken).ForAwait(); + await _reads.FlushAsync(cancellationToken).ForAwait(); + } + return len; + } +#endif + + public override void WriteByte(byte value) + { + _writes.WriteByte(value); + _inner.WriteByte(value); + } + public override void Write(byte[] buffer, int offset, int count) + { + _writes.Write(buffer, offset, count); + _inner.Write(buffer, offset, count); + } + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _writes.WriteAsync(buffer, offset, count, cancellationToken).ForAwait(); + await _inner.WriteAsync(buffer, offset, count, cancellationToken).ForAwait(); + } +#if NETCOREAPP3_0_OR_GREATER + public override void Write(ReadOnlySpan buffer) + { + _writes.Write(buffer); + _inner.Write(buffer); + } + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) + { + await _writes.WriteAsync(buffer, cancellationToken).ForAwait(); + await _inner.WriteAsync(buffer, cancellationToken).ForAwait(); + } +#endif + } +#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member +} diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs index 9c467dcd3..d13b6af5b 100644 --- a/src/StackExchange.Redis/PhysicalConnection.cs +++ b/src/StackExchange.Redis/PhysicalConnection.cs @@ -1463,7 +1463,7 @@ public ConnectionStatus GetStatus() }; } - private static RemoteCertificateValidationCallback? GetAmbientIssuerCertificateCallback() + internal static RemoteCertificateValidationCallback? GetAmbientIssuerCertificateCallback() { try { @@ -1476,7 +1476,7 @@ public ConnectionStatus GetStatus() } return null; } - private static LocalCertificateSelectionCallback? GetAmbientClientCertificateCallback() + internal static LocalCertificateSelectionCallback? GetAmbientClientCertificateCallback() { try { diff --git a/src/StackExchange.Redis/RedisResult.cs b/src/StackExchange.Redis/RedisResult.cs index b39a646e0..bf094f8af 100644 --- a/src/StackExchange.Redis/RedisResult.cs +++ b/src/StackExchange.Redis/RedisResult.cs @@ -105,7 +105,7 @@ public static RedisResult Create(RedisResult[] values, ResultType resultType) /// Internally, this is very similar to RawResult, except it is designed to be usable, /// outside of the IO-processing pipeline: the buffers are standalone, etc. /// - internal static bool TryCreate(PhysicalConnection connection, in RawResult result, [NotNullWhen(true)] out RedisResult? redisResult) + internal static bool TryCreate(PhysicalConnection? connection, in RawResult result, [NotNullWhen(true)] out RedisResult? redisResult) { try {