From 6740b8d78e7acd992b2b3afb0e48bc324c191f2c Mon Sep 17 00:00:00 2001 From: William Denton Date: Thu, 9 Nov 2023 23:09:04 +1300 Subject: [PATCH] Set limit on serialized message size in Subscriptions.Postgres (#6678) Co-authored-by: Pascal Senn --- .../PostgresChannelWriter.cs | 2 +- .../PostgresMessageEnvelope.cs | 34 +++++++--- .../PostgresSubscriptionOptions.cs | 5 ++ .../Subscriptions.Postgres/PostgressPubSub.cs | 9 ++- .../Properties/PostgresResources.Designer.cs | 6 ++ .../Properties/PostgresResources.resx | 3 + .../PostgresChannelTests.cs | 16 +++-- .../PostgresChannelWriterTests.cs | 6 +- .../PostgresMessageEnvelopeTests.cs | 63 +++++++------------ 9 files changed, 87 insertions(+), 57 deletions(-) diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs index 718517ac97e..6edb4044e9a 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresChannelWriter.cs @@ -101,7 +101,7 @@ private async Task HandleMessage(NpgsqlConnection connection, CancellationToken command.CommandText = "SELECT pg_notify(@channel, @message);"; command.Parameters.Add(new NpgsqlParameter("channel", _channelName)); - command.Parameters.Add(new NpgsqlParameter("message", message.Format())); + command.Parameters.Add(new NpgsqlParameter("message", message.FormattedPayload)); batch.BatchCommands.Add(command); } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs index fb1fc3c529d..50e3570d8cd 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresMessageEnvelope.cs @@ -2,6 +2,8 @@ using System.Buffers.Text; using System.Diagnostics.CodeAnalysis; using System.Text; +using static HotChocolate.Subscriptions.Postgres.PostgresResources; + namespace HotChocolate.Subscriptions.Postgres; @@ -12,20 +14,20 @@ internal readonly struct PostgresMessageEnvelope private const byte separator = (byte)':'; private const byte _messageIdLength = 24; - public PostgresMessageEnvelope(string topic, string payload) + private PostgresMessageEnvelope(string topic, string formattedPayload) { Topic = topic; - Payload = payload; + FormattedPayload = formattedPayload; } public string Topic { get; } - public string Payload { get; } + public string FormattedPayload { get; } - public string Format() + private static string Format(string topic, string payload, int maxMessagePayloadSize) { - var topicMaxBytesCount = _utf8.GetMaxByteCount(Topic.Length); - var payloadMaxBytesCount = _utf8.GetMaxByteCount(Payload.Length); + var topicMaxBytesCount = _utf8.GetMaxByteCount(topic.Length); + var payloadMaxBytesCount = _utf8.GetMaxByteCount(payload.Length); // we encode the topic to base64 to ensure that we do not have the separator in the topic var topicMaxLength = Base64.GetMaxEncodedToUtf8Length(topicMaxBytesCount); var maxSize = topicMaxLength + 2 + payloadMaxBytesCount + _messageIdLength; @@ -55,7 +57,7 @@ public string Format() slicedBuffer = slicedBuffer[1..]; // write topic as base64 - var topicLengthUtf8 = _utf8.GetBytes(Topic, slicedBuffer); + var topicLengthUtf8 = _utf8.GetBytes(topic, slicedBuffer); Base64.EncodeToUtf8InPlace(slicedBuffer, topicLengthUtf8, out var topicLengthBase64); slicedBuffer = slicedBuffer[topicLengthBase64..]; @@ -64,7 +66,7 @@ public string Format() slicedBuffer = slicedBuffer[1..]; // write payload - var payloadLengthUtf8 = _utf8.GetBytes(Payload, slicedBuffer); + var payloadLengthUtf8 = _utf8.GetBytes(payload, slicedBuffer); // create string var endOfEncodedString = topicLengthBase64 + 2 + payloadLengthUtf8 + _messageIdLength; @@ -75,6 +77,16 @@ public string Format() ArrayPool.Shared.Return(bufferArray); } + if (endOfEncodedString > maxMessagePayloadSize) + { + throw new ArgumentException( + string.Format( + PostgresMessageEnvelope_PayloadTooLarge, + endOfEncodedString, + maxMessagePayloadSize), + nameof(payload)); + } + return result; } @@ -121,4 +133,10 @@ public static bool TryParse( return true; } + + public static PostgresMessageEnvelope Create( + string topic, + string payload, + int maxMessagePayloadSize) + => new (topic, Format(topic, payload, maxMessagePayloadSize)); } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresSubscriptionOptions.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresSubscriptionOptions.cs index f3aad9aaa5f..56fdfd96036 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresSubscriptionOptions.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgresSubscriptionOptions.cs @@ -43,6 +43,11 @@ public sealed class PostgresSubscriptionOptions /// public int MaxSendQueueSize { get; set; } = 2048; + /// + /// The maximum serialized size of a message that can be sent using postgres notify. + /// + public int MaxMessagePayloadSize { get; set; } = 8000; + /// /// The subscription options that are used to configure the subscriptions. /// diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs index 59495ca5d28..3d8e2b85a15 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/PostgressPubSub.cs @@ -7,6 +7,7 @@ internal sealed class PostgresPubSub : DefaultPubSub private readonly IMessageSerializer _serializer; private readonly PostgresChannel _channel; + private readonly int _maxMessagePayloadSize; private readonly int _topicBufferCapacity; private readonly TopicBufferFullMode _topicBufferFullMode; @@ -20,6 +21,7 @@ public PostgresPubSub( { _serializer = serializer; _channel = channel; + _maxMessagePayloadSize = options.MaxMessagePayloadSize; _topicBufferCapacity = options.SubscriptionOptions.TopicBufferCapacity; _topicBufferFullMode = options.SubscriptionOptions.TopicBufferFullMode; } @@ -32,7 +34,8 @@ protected override async ValueTask OnSendAsync( { var serialized = _serializer.Serialize(message); - var envelope = new PostgresMessageEnvelope(formattedTopic, serialized); + var envelope = PostgresMessageEnvelope + .Create(formattedTopic, serialized, _maxMessagePayloadSize); await _channel.SendAsync(envelope, cancellationToken); } @@ -40,7 +43,9 @@ protected override async ValueTask OnSendAsync( /// protected override async ValueTask OnCompleteAsync(string formattedTopic) { - var envelope = new PostgresMessageEnvelope(formattedTopic, _serializer.CompleteMessage); + var envelope = PostgresMessageEnvelope + .Create(formattedTopic, _serializer.CompleteMessage, _maxMessagePayloadSize); + await _channel.SendAsync(envelope, cancellationToken: default); } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs index de501912fae..a614a375658 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.Designer.cs @@ -98,5 +98,11 @@ internal static string ChannelWriter_FailedToSend { return ResourceManager.GetString("ChannelWriter_FailedToSend", resourceCulture); } } + + internal static string PostgresMessageEnvelope_PayloadTooLarge { + get { + return ResourceManager.GetString("PostgresMessageEnvelope_PayloadTooLarge", resourceCulture); + } + } } } diff --git a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx index 8286ad7f388..cb4593fb29c 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx +++ b/src/HotChocolate/Core/src/Subscriptions.Postgres/Properties/PostgresResources.resx @@ -52,4 +52,7 @@ The channel writer failed to send messages. Requeueing {0} messages. Error Message: {1} + + Payload is too long to we written to Postgres. Serialized message is {0} bytes but limit is {1} bytes + diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs index c8d6f10f2b0..bd076f85fed 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelTests.cs @@ -66,8 +66,10 @@ public async Task SendMessage_Should_SendMessageOverChannel() using var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act - await channel - .SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None); + var message = + PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize); + + await channel.SendAsync(message, CancellationToken.None); // Assert await testChannel.WaitForNotificationAsync(); @@ -183,8 +185,10 @@ public async Task SendAsync_Should_SendAndReceive() channel.Subscribe(listener); // Act - await channel - .SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None); + var message = + PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize); + + await channel.SendAsync(message, CancellationToken.None); // Assert SpinWait.SpinUntil(() => receivedMessages.Count == 1, TimeSpan.FromSeconds(1)); @@ -210,7 +214,9 @@ await Parallel.ForEachAsync( new ParallelOptions { MaxDegreeOfParallelism = 10 }, async (_, ct) => { - var message = new PostgresMessageEnvelope("test", "foobar"); + var message = PostgresMessageEnvelope + .Create("test", "foobar", _options.MaxMessagePayloadSize); + await channel.SendAsync(message, ct); }); diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs index 9127f2dd20e..2ea3089fa16 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresChannelWriterTests.cs @@ -37,7 +37,8 @@ public async Task SendAsync_Should_WriteMessageToChannel_When_CalledWithValidInp // Arrange var postgresChannelWriter = new PostgresChannelWriter(_events, _options); await postgresChannelWriter.Initialize(CancellationToken.None); - var message = new PostgresMessageEnvelope("test", "test"); + var message = + PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act @@ -55,7 +56,8 @@ public async Task SendAsync_Should_WriteManyMessage_When_CalledManyTimes() // Arrange var postgresChannelWriter = new PostgresChannelWriter(_events, _options); await postgresChannelWriter.Initialize(CancellationToken.None); - var message = new PostgresMessageEnvelope("test", "test"); + var message = + PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); var testChannel = new TestChannel(SyncConnectionFactory, _channelName); // Act diff --git a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs index cfc799a3cde..6b212be9182 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Postgres.Tests/PostgresMessageEnvelopeTests.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using System.Text; @@ -5,16 +6,19 @@ namespace HotChocolate.Subscriptions.Postgres; public class PostgresMessageEnvelopeTests { + private readonly PostgresSubscriptionOptions _options = new(); + [Theory] [InlineData("test", "test")] [InlineData("sometopic", """ { ""test"": ""test"" } """)] public void Should_FormatAndParse(string topic, string payload) { // arrange - var envelope = new PostgresMessageEnvelope(topic, payload); + var envelope = + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize); // act - var formatted = envelope.Format(); + var formatted = envelope.FormattedPayload; var parsingResult = PostgresMessageEnvelope .TryParse(formatted, out var parsedTopic, out var parsedPayload); @@ -36,10 +40,11 @@ public void Should_FormatCorrectly( string formatted) { // arrange - var envelope = new PostgresMessageEnvelope(topic, payload); + var envelope = + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize); // act - var result = envelope.Format(); + var result = envelope.FormattedPayload; // assert Assert.Equal(formatted, result[25..]); @@ -54,10 +59,11 @@ public void Format_Should_GenerateACorrectId() for (var i = 0; i < 10_000; i++) { // arrange - var envelope = new PostgresMessageEnvelope("test", "test"); + var envelope = + PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize); // act - var id = envelope.Format()[..24]; + var id = envelope.FormattedPayload[..24]; // assert var bytes = Encoding.UTF8.GetBytes(id); @@ -78,59 +84,38 @@ public void Format_Should_GenerateACorrectId() } [Fact] - public void Should_FormatAndParseWithBigPayload() + public void Format_ShouldThrow_WithBigPayload() { // arrange var topic = "test"; var payload = new string('a', 100_000); - var envelope = new PostgresMessageEnvelope(topic, payload); - // act - var formatted = envelope.Format(); - var parsingResult = PostgresMessageEnvelope - .TryParse(formatted, out var parsedTopic, out var parsedPayload); - - // assert - Assert.True(parsingResult); - Assert.Equal(topic, parsedTopic); - Assert.Equal(payload, parsedPayload); + // act, assert + Assert.Throws(() => + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize)); } [Fact] - public void Should_FormatAndParseWithBigTopic() + public void Format_ShouldThrow_WithBigTopic() { // arrange var topic = new string('a', 100_000); var payload = "test"; - var envelope = new PostgresMessageEnvelope(topic, payload); - - // act - var formatted = envelope.Format(); - var parsingResult = PostgresMessageEnvelope - .TryParse(formatted, out var parsedTopic, out var parsedPayload); - // assert - Assert.True(parsingResult); - Assert.Equal(topic, parsedTopic); - Assert.Equal(payload, parsedPayload); + // act, assert + Assert.Throws(() => + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize)); } [Fact] - public void Should_FormatAndParseWithBigTopicAndPayload() + public void Format_ShouldThrow_WithBigTopicAndPayload() { // arrange var topic = new string('a', 100_000); var payload = new string('a', 100_000); - var envelope = new PostgresMessageEnvelope(topic, payload); - - // act - var formatted = envelope.Format(); - var parsingResult = PostgresMessageEnvelope - .TryParse(formatted, out var parsedTopic, out var parsedPayload); - // assert - Assert.True(parsingResult); - Assert.Equal(topic, parsedTopic); - Assert.Equal(payload, parsedPayload); + // act, assert + Assert.Throws(() => + PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize)); } }