From 896becbb2bb42905f8bf3d869012709a08fb374a Mon Sep 17 00:00:00 2001 From: William Denton Date: Tue, 7 Nov 2023 14:25:50 +1300 Subject: [PATCH 1/3] Set limit on serialized message size in Subscriptions.Postgres Postgres has a max limit of 8000 bytes for the paylaod in pg_notify. Exceeding this limit will result in infinite retries. This mitigates the infinite loop by blocking the messages from being queued into the subscription system. Resolves https://github.com/ChilliCream/graphql-platform/issues/6658 --- .../PostgresChannelWriter.cs | 2 +- .../PostgresMessageEnvelope.cs | 24 +++++--- .../PostgresSubscriptionOptions.cs | 5 ++ .../Subscriptions.Postgres/PostgressPubSub.cs | 6 +- .../Properties/PostgresResources.Designer.cs | 6 ++ .../Properties/PostgresResources.resx | 3 + .../PostgresChannelTests.cs | 6 +- .../PostgresChannelWriterTests.cs | 4 +- .../PostgresMessageEnvelopeTests.cs | 60 +++++++------------ 9 files changed, 61 insertions(+), 55 deletions(-) diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs index 718517ac97e..6edb4044e9a 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs @@ -101,7 +101,7 @@ private async Task HandleMessage(NpgsqlConnection connection, CancellationToken command.CommandText = "SELECT pg_notify(@channel, @message);"; command.Parameters.Add(new NpgsqlParameter("channel", _channelName)); - command.Parameters.Add(new NpgsqlParameter("message", message.Format())); + command.Parameters.Add(new NpgsqlParameter("message", message.FormattedPayload)); batch.BatchCommands.Add(command); } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs index fb1fc3c529d..cd86ac3fc75 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs @@ -2,6 +2,8 @@ using System.Buffers.Text; using System.Diagnostics.CodeAnalysis; using System.Text; +using static HotChocolate.Subscriptions.Postgres.PostgresResources; + namespace HotChocolate.Subscriptions.Postgres; @@ -12,20 +14,20 @@ internal readonly struct PostgresMessageEnvelope private const byte separator = (byte)':'; private const byte _messageIdLength = 24; - public PostgresMessageEnvelope(string topic, string payload) + public PostgresMessageEnvelope(string topic, string payload, int maxMessagePayloadSize) { Topic = topic; - Payload = payload; + FormattedPayload = Format(topic, payload, maxMessagePayloadSize); } public string Topic { get; } - public string Payload { get; } + public string FormattedPayload { get; } - public string Format() + private static string Format(string topic, string payload, int maxMessagePayloadSize) { - var topicMaxBytesCount = _utf8.GetMaxByteCount(Topic.Length); - var payloadMaxBytesCount = _utf8.GetMaxByteCount(Payload.Length); + var topicMaxBytesCount = _utf8.GetMaxByteCount(topic.Length); + var payloadMaxBytesCount = _utf8.GetMaxByteCount(payload.Length); // we encode the topic to base64 to ensure that we do not have the separator in the topic var topicMaxLength = Base64.GetMaxEncodedToUtf8Length(topicMaxBytesCount); var maxSize = topicMaxLength + 2 + payloadMaxBytesCount + _messageIdLength; @@ -55,7 +57,7 @@ public string Format() slicedBuffer = slicedBuffer[1..]; // write topic as base64 - var topicLengthUtf8 = _utf8.GetBytes(Topic, slicedBuffer); + var topicLengthUtf8 = _utf8.GetBytes(topic, slicedBuffer); Base64.EncodeToUtf8InPlace(slicedBuffer, topicLengthUtf8, out var topicLengthBase64); slicedBuffer = slicedBuffer[topicLengthBase64..]; @@ -64,7 +66,7 @@ public string Format() slicedBuffer = slicedBuffer[1..]; // write payload - var payloadLengthUtf8 = _utf8.GetBytes(Payload, slicedBuffer); + var payloadLengthUtf8 = _utf8.GetBytes(payload, slicedBuffer); // create string var endOfEncodedString = topicLengthBase64 + 2 + payloadLengthUtf8 + _messageIdLength; @@ -75,6 +77,12 @@ public string Format() ArrayPool.Shared.Return(bufferArray); } + if (endOfEncodedString > maxMessagePayloadSize) + { + var msg = string.Format(PostgresMessageEnvelope_PayloadTooLarge, endOfEncodedString, maxMessagePayloadSize); + throw new ArgumentException(msg, nameof(payload)); + } + return result; } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresSubscriptionOptions.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresSubscriptionOptions.cs index f3aad9aaa5f..56fdfd96036 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresSubscriptionOptions.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresSubscriptionOptions.cs @@ -43,6 +43,11 @@ public sealed class PostgresSubscriptionOptions /// public int MaxSendQueueSize { get; set; } = 2048; + /// + /// The maximum serialized size of a message that can be sent using postgres notify. + /// + public int MaxMessagePayloadSize { get; set; } = 8000; + /// /// The subscription options that are used to configure the subscriptions. /// diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs index 59495ca5d28..5525d3e2fb3 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs @@ -7,6 +7,7 @@ internal sealed class PostgresPubSub : DefaultPubSub private readonly IMessageSerializer _serializer; private readonly PostgresChannel _channel; + private readonly int _maxMessagePayloadSize; private readonly int _topicBufferCapacity; private readonly TopicBufferFullMode _topicBufferFullMode; @@ -20,6 +21,7 @@ public PostgresPubSub( { _serializer = serializer; _channel = channel; + _maxMessagePayloadSize = options.MaxMessagePayloadSize; _topicBufferCapacity = options.SubscriptionOptions.TopicBufferCapacity; _topicBufferFullMode = options.SubscriptionOptions.TopicBufferFullMode; } @@ -32,7 +34,7 @@ protected override async ValueTask OnSendAsync( { var serialized = _serializer.Serialize(message); - var envelope = new PostgresMessageEnvelope(formattedTopic, serialized); + var envelope = new PostgresMessageEnvelope(formattedTopic, serialized, _maxMessagePayloadSize); await _channel.SendAsync(envelope, cancellationToken); } @@ -40,7 +42,7 @@ protected override async ValueTask OnSendAsync( /// protected override async ValueTask OnCompleteAsync(string formattedTopic) { - var envelope = new PostgresMessageEnvelope(formattedTopic, _serializer.CompleteMessage); + var envelope = new PostgresMessageEnvelope(formattedTopic, _serializer.CompleteMessage, _maxMessagePayloadSize); await _channel.SendAsync(envelope, cancellationToken: default); } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs index de501912fae..a614a375658 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs @@ -98,5 +98,11 @@ internal static string ChannelWriter_FailedToSend { return ResourceManager.GetString("ChannelWriter_FailedToSend", resourceCulture); } } + + internal static string PostgresMessageEnvelope_PayloadTooLarge { + get { + return ResourceManager.GetString("PostgresMessageEnvelope_PayloadTooLarge", resourceCulture); + } + } } } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx index 8286ad7f388..cb4593fb29c 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx @@ -52,4 +52,7 @@ The channel writer failed to send messages. Requeueing {0} messages. Error Message: {1} + + Payload is too long to we written to Postgres. Serialized message is {0} bytes but limit is {1} bytes + diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs index c8d6f10f2b0..d45a7b1c860 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs @@ -67,7 +67,7 @@ public async Task SendMessage_Should_SendMessageOverChannel() // Act await channel - .SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None); + .SendAsync(new PostgresMessageEnvelope("test", "foobar", _options.MaxMessagePayloadSize), CancellationToken.None); // Assert await testChannel.WaitForNotificationAsync(); @@ -184,7 +184,7 @@ public async Task SendAsync_Should_SendAndReceive() // Act await channel - .SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None); + .SendAsync(new PostgresMessageEnvelope("test", "foobar", _options.MaxMessagePayloadSize), CancellationToken.None); // Assert SpinWait.SpinUntil(() => receivedMessages.Count == 1, TimeSpan.FromSeconds(1)); @@ -210,7 +210,7 @@ await Parallel.ForEachAsync( new ParallelOptions { MaxDegreeOfParallelism = 10 }, async (_, ct) => { - var message = new PostgresMessageEnvelope("test", "foobar"); + var message = new PostgresMessageEnvelope("test", "foobar", _options.MaxMessagePayloadSize); await channel.SendAsync(message, ct); }); diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs index 9127f2dd20e..63d810b2d10 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs @@ -37,7 +37,7 @@ public async Task SendAsync_Should_WriteMessageToChannel_When_CalledWithValidInp // Arrange var postgresChannelWriter = new PostgresChannelWriter(_events, _options); await postgresChannelWriter.Initialize(CancellationToken.None); - var message = new PostgresMessageEnvelope("test", "test"); + var message = new PostgresMessageEnvelope("test", "test", _options.MaxMessagePayloadSize); var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act @@ -55,7 +55,7 @@ public async Task SendAsync_Should_WriteManyMessage_When_CalledManyTimes() // Arrange var postgresChannelWriter = new PostgresChannelWriter(_events, _options); await postgresChannelWriter.Initialize(CancellationToken.None); - var message = new PostgresMessageEnvelope("test", "test"); + var message = new PostgresMessageEnvelope("test", "test", _options.MaxMessagePayloadSize); var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs index cfc799a3cde..f3b91b5c532 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using System.Text; @@ -5,16 +6,18 @@ namespace HotChocolate.Subscriptions.Postgres; public class PostgresMessageEnvelopeTests { + private readonly PostgresSubscriptionOptions _options = new(); + [Theory] [InlineData("test", "test")] [InlineData("sometopic", """ { ""test"": ""test"" } """)] public void Should_FormatAndParse(string topic, string payload) { // arrange - var envelope = new PostgresMessageEnvelope(topic, payload); + var envelope = new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize); // act - var formatted = envelope.Format(); + var formatted = envelope.FormattedPayload; var parsingResult = PostgresMessageEnvelope .TryParse(formatted, out var parsedTopic, out var parsedPayload); @@ -36,10 +39,10 @@ public void Should_FormatCorrectly( string formatted) { // arrange - var envelope = new PostgresMessageEnvelope(topic, payload); + var envelope = new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize); // act - var result = envelope.Format(); + var result = envelope.FormattedPayload; // assert Assert.Equal(formatted, result[25..]); @@ -54,10 +57,10 @@ public void Format_Should_GenerateACorrectId() for (var i = 0; i < 10_000; i++) { // arrange - var envelope = new PostgresMessageEnvelope("test", "test"); + var envelope = new PostgresMessageEnvelope("test", "test", _options.MaxMessagePayloadSize); // act - var id = envelope.Format()[..24]; + var id = envelope.FormattedPayload[..24]; // assert var bytes = Encoding.UTF8.GetBytes(id); @@ -78,59 +81,38 @@ public void Format_Should_GenerateACorrectId() } [Fact] - public void Should_FormatAndParseWithBigPayload() + public void Format_ShouldThrow_WithBigPayload() { // arrange var topic = "test"; var payload = new string('a', 100_000); - var envelope = new PostgresMessageEnvelope(topic, payload); - // act - var formatted = envelope.Format(); - var parsingResult = PostgresMessageEnvelope - .TryParse(formatted, out var parsedTopic, out var parsedPayload); - - // assert - Assert.True(parsingResult); - Assert.Equal(topic, parsedTopic); - Assert.Equal(payload, parsedPayload); + // act, assert + Assert.Throws(() => + new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize)); } [Fact] - public void Should_FormatAndParseWithBigTopic() + public void Format_ShouldThrow_WithBigTopic() { // arrange var topic = new string('a', 100_000); var payload = "test"; - var envelope = new PostgresMessageEnvelope(topic, payload); - - // act - var formatted = envelope.Format(); - var parsingResult = PostgresMessageEnvelope - .TryParse(formatted, out var parsedTopic, out var parsedPayload); - // assert - Assert.True(parsingResult); - Assert.Equal(topic, parsedTopic); - Assert.Equal(payload, parsedPayload); + // act, assert + Assert.Throws(() => + new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize)); } [Fact] - public void Should_FormatAndParseWithBigTopicAndPayload() + public void Format_ShouldThrow_WithBigTopicAndPayload() { // arrange var topic = new string('a', 100_000); var payload = new string('a', 100_000); - var envelope = new PostgresMessageEnvelope(topic, payload); - - // act - var formatted = envelope.Format(); - var parsingResult = PostgresMessageEnvelope - .TryParse(formatted, out var parsedTopic, out var parsedPayload); - // assert - Assert.True(parsingResult); - Assert.Equal(topic, parsedTopic); - Assert.Equal(payload, parsedPayload); + // act, assert + Assert.Throws(() => + new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize)); } } From 35e59d36e47fede9099b7bdb67284a37c05c4c10 Mon Sep 17 00:00:00 2001 From: William Denton Date: Thu, 9 Nov 2023 10:56:49 +1300 Subject: [PATCH 2/3] convert constructor to static factory --- .../PostgresMessageEnvelope.cs | 8 ++++++-- .../src/Subscriptions.Postgres/PostgressPubSub.cs | 4 ++-- .../PostgresChannelTests.cs | 6 +++--- .../PostgresChannelWriterTests.cs | 4 ++-- .../PostgresMessageEnvelopeTests.cs | 12 ++++++------ 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs index cd86ac3fc75..97235f03151 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs @@ -14,10 +14,14 @@ internal readonly struct PostgresMessageEnvelope private const byte separator = (byte)':'; private const byte _messageIdLength = 24; - public PostgresMessageEnvelope(string topic, string payload, int maxMessagePayloadSize) + + public static PostgresMessageEnvelope Create(string topic, string payload, int maxMessagePayloadSize) + => new (topic, Format(topic, payload, maxMessagePayloadSize)); + + private PostgresMessageEnvelope(string topic, string formattedPayload) { Topic = topic; - FormattedPayload = Format(topic, payload, maxMessagePayloadSize); + FormattedPayload = formattedPayload; } public string Topic { get; } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs index 5525d3e2fb3..4deeec49455 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs @@ -34,7 +34,7 @@ protected override async ValueTask OnSendAsync( { var serialized = _serializer.Serialize(message); - var envelope = new PostgresMessageEnvelope(formattedTopic, serialized, _maxMessagePayloadSize); + var envelope = PostgresMessageEnvelope.Create(formattedTopic, serialized, _maxMessagePayloadSize); await _channel.SendAsync(envelope, cancellationToken); } @@ -42,7 +42,7 @@ protected override async ValueTask OnSendAsync( /// protected override async ValueTask OnCompleteAsync(string formattedTopic) { - var envelope = new PostgresMessageEnvelope(formattedTopic, _serializer.CompleteMessage, _maxMessagePayloadSize); + var envelope = PostgresMessageEnvelope.Create(formattedTopic, _serializer.CompleteMessage, _maxMessagePayloadSize); await _channel.SendAsync(envelope, cancellationToken: default); } diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs index d45a7b1c860..61de4ad6182 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs @@ -67,7 +67,7 @@ public async Task SendMessage_Should_SendMessageOverChannel() // Act await channel - .SendAsync(new PostgresMessageEnvelope("test", "foobar", _options.MaxMessagePayloadSize), CancellationToken.None); + .SendAsync(PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize), CancellationToken.None); // Assert await testChannel.WaitForNotificationAsync(); @@ -184,7 +184,7 @@ public async Task SendAsync_Should_SendAndReceive() // Act await channel - .SendAsync(new PostgresMessageEnvelope("test", "foobar", _options.MaxMessagePayloadSize), CancellationToken.None); + .SendAsync(PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize), CancellationToken.None); // Assert SpinWait.SpinUntil(() => receivedMessages.Count == 1, TimeSpan.FromSeconds(1)); @@ -210,7 +210,7 @@ await Parallel.ForEachAsync( new ParallelOptions { MaxDegreeOfParallelism = 10 }, async (_, ct) => { - var message = new PostgresMessageEnvelope("test", "foobar", _options.MaxMessagePayloadSize); + var message = PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize); await channel.SendAsync(message, ct); }); diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs index 63d810b2d10..a0df078c134 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs @@ -37,7 +37,7 @@ public async Task SendAsync_Should_WriteMessageToChannel_When_CalledWithValidInp // Arrange var postgresChannelWriter = new PostgresChannelWriter(_events, _options); await postgresChannelWriter.Initialize(CancellationToken.None); - var message = new PostgresMessageEnvelope("test", "test", _options.MaxMessagePayloadSize); + var message = PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act @@ -55,7 +55,7 @@ public async Task SendAsync_Should_WriteManyMessage_When_CalledManyTimes() // Arrange var postgresChannelWriter = new PostgresChannelWriter(_events, _options); await postgresChannelWriter.Initialize(CancellationToken.None); - var message = new PostgresMessageEnvelope("test", "test", _options.MaxMessagePayloadSize); + var message = PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs index f3b91b5c532..fadcde87b2e 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs @@ -14,7 +14,7 @@ public class PostgresMessageEnvelopeTests public void Should_FormatAndParse(string topic, string payload) { // arrange - var envelope = new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize); + var envelope = PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize); // act var formatted = envelope.FormattedPayload; @@ -39,7 +39,7 @@ public void Should_FormatCorrectly( string formatted) { // arrange - var envelope = new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize); + var envelope = PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize); // act var result = envelope.FormattedPayload; @@ -57,7 +57,7 @@ public void Format_Should_GenerateACorrectId() for (var i = 0; i < 10_000; i++) { // arrange - var envelope = new PostgresMessageEnvelope("test", "test", _options.MaxMessagePayloadSize); + var envelope = PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); // act var id = envelope.FormattedPayload[..24]; @@ -89,7 +89,7 @@ public void Format_ShouldThrow_WithBigPayload() // act, assert Assert.Throws(() => - new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize)); + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize)); } [Fact] @@ -101,7 +101,7 @@ public void Format_ShouldThrow_WithBigTopic() // act, assert Assert.Throws(() => - new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize)); + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize)); } [Fact] @@ -113,6 +113,6 @@ public void Format_ShouldThrow_WithBigTopicAndPayload() // act, assert Assert.Throws(() => - new PostgresMessageEnvelope(topic, payload, _options.MaxMessagePayloadSize)); + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize)); } } From c580f5092f35b6877621f0f9e59223413fb21ca0 Mon Sep 17 00:00:00 2001 From: Pascal Senn Date: Wed, 8 Nov 2023 23:40:28 +0100 Subject: [PATCH 3/3] Fixed formatting --- .../PostgresMessageEnvelope.cs | 18 ++++++++++++------ .../Subscriptions.Postgres/PostgressPubSub.cs | 7 +++++-- .../PostgresChannelTests.cs | 16 +++++++++++----- .../PostgresChannelWriterTests.cs | 6 ++++-- .../PostgresMessageEnvelopeTests.cs | 9 ++++++--- 5 files changed, 38 insertions(+), 18 deletions(-) diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs index 97235f03151..50e3570d8cd 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs @@ -14,10 +14,6 @@ internal readonly struct PostgresMessageEnvelope private const byte separator = (byte)':'; private const byte _messageIdLength = 24; - - public static PostgresMessageEnvelope Create(string topic, string payload, int maxMessagePayloadSize) - => new (topic, Format(topic, payload, maxMessagePayloadSize)); - private PostgresMessageEnvelope(string topic, string formattedPayload) { Topic = topic; @@ -83,8 +79,12 @@ private static string Format(string topic, string payload, int maxMessagePayload if (endOfEncodedString > maxMessagePayloadSize) { - var msg = string.Format(PostgresMessageEnvelope_PayloadTooLarge, endOfEncodedString, maxMessagePayloadSize); - throw new ArgumentException(msg, nameof(payload)); + throw new ArgumentException( + string.Format( + PostgresMessageEnvelope_PayloadTooLarge, + endOfEncodedString, + maxMessagePayloadSize), + nameof(payload)); } return result; @@ -133,4 +133,10 @@ public static bool TryParse( return true; } + + public static PostgresMessageEnvelope Create( + string topic, + string payload, + int maxMessagePayloadSize) + => new (topic, Format(topic, payload, maxMessagePayloadSize)); } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs index 4deeec49455..3d8e2b85a15 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs @@ -34,7 +34,8 @@ protected override async ValueTask OnSendAsync( { var serialized = _serializer.Serialize(message); - var envelope = PostgresMessageEnvelope.Create(formattedTopic, serialized, _maxMessagePayloadSize); + var envelope = PostgresMessageEnvelope + .Create(formattedTopic, serialized, _maxMessagePayloadSize); await _channel.SendAsync(envelope, cancellationToken); } @@ -42,7 +43,9 @@ protected override async ValueTask OnSendAsync( /// protected override async ValueTask OnCompleteAsync(string formattedTopic) { - var envelope = PostgresMessageEnvelope.Create(formattedTopic, _serializer.CompleteMessage, _maxMessagePayloadSize); + var envelope = PostgresMessageEnvelope + .Create(formattedTopic, _serializer.CompleteMessage, _maxMessagePayloadSize); + await _channel.SendAsync(envelope, cancellationToken: default); } diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs index 61de4ad6182..bd076f85fed 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs @@ -66,8 +66,10 @@ public async Task SendMessage_Should_SendMessageOverChannel() using var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act - await channel - .SendAsync(PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize), CancellationToken.None); + var message = + PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize); + + await channel.SendAsync(message, CancellationToken.None); // Assert await testChannel.WaitForNotificationAsync(); @@ -183,8 +185,10 @@ public async Task SendAsync_Should_SendAndReceive() channel.Subscribe(listener); // Act - await channel - .SendAsync(PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize), CancellationToken.None); + var message = + PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize); + + await channel.SendAsync(message, CancellationToken.None); // Assert SpinWait.SpinUntil(() => receivedMessages.Count == 1, TimeSpan.FromSeconds(1)); @@ -210,7 +214,9 @@ await Parallel.ForEachAsync( new ParallelOptions { MaxDegreeOfParallelism = 10 }, async (_, ct) => { - var message = PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize); + var message = PostgresMessageEnvelope + .Create("test", "foobar", _options.MaxMessagePayloadSize); + await channel.SendAsync(message, ct); }); diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs index a0df078c134..2ea3089fa16 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs @@ -37,7 +37,8 @@ public async Task SendAsync_Should_WriteMessageToChannel_When_CalledWithValidInp // Arrange var postgresChannelWriter = new PostgresChannelWriter(_events, _options); await postgresChannelWriter.Initialize(CancellationToken.None); - var message = PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); + var message = + PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act @@ -55,7 +56,8 @@ public async Task SendAsync_Should_WriteManyMessage_When_CalledManyTimes() // Arrange var postgresChannelWriter = new PostgresChannelWriter(_events, _options); await postgresChannelWriter.Initialize(CancellationToken.None); - var message = PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); + var message = + PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs index fadcde87b2e..6b212be9182 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs @@ -14,7 +14,8 @@ public class PostgresMessageEnvelopeTests public void Should_FormatAndParse(string topic, string payload) { // arrange - var envelope = PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize); + var envelope = + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize); // act var formatted = envelope.FormattedPayload; @@ -39,7 +40,8 @@ public void Should_FormatCorrectly( string formatted) { // arrange - var envelope = PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize); + var envelope = + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize); // act var result = envelope.FormattedPayload; @@ -57,7 +59,8 @@ public void Format_Should_GenerateACorrectId() for (var i = 0; i < 10_000; i++) { // arrange - var envelope = PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); + var envelope = + PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); // act var id = envelope.FormattedPayload[..24];