Skip to content

Commit

Permalink
Set limit on serialized message size in Subscriptions.Postgres (#6678)
Browse files Browse the repository at this point in the history
Co-authored-by: Pascal Senn <[email protected]>
  • Loading branch information
2 people authored and michaelstaib committed Nov 21, 2023
1 parent ae5e271 commit ac886f7
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private async Task HandleMessage(NpgsqlConnection connection, CancellationToken
command.CommandText = "SELECT pg_notify(@channel, @message);";

command.Parameters.Add(new NpgsqlParameter("channel", _channelName));
command.Parameters.Add(new NpgsqlParameter("message", message.Format()));
command.Parameters.Add(new NpgsqlParameter("message", message.FormattedPayload));

batch.BatchCommands.Add(command);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Buffers.Text;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using static HotChocolate.Subscriptions.Postgres.PostgresResources;


namespace HotChocolate.Subscriptions.Postgres;

Expand All @@ -12,20 +14,20 @@ internal readonly struct PostgresMessageEnvelope
private const byte separator = (byte)':';
private const byte _messageIdLength = 24;

public PostgresMessageEnvelope(string topic, string payload)
private PostgresMessageEnvelope(string topic, string formattedPayload)
{
Topic = topic;
Payload = payload;
FormattedPayload = formattedPayload;
}

public string Topic { get; }

public string Payload { get; }
public string FormattedPayload { get; }

public string Format()
private static string Format(string topic, string payload, int maxMessagePayloadSize)
{
var topicMaxBytesCount = _utf8.GetMaxByteCount(Topic.Length);
var payloadMaxBytesCount = _utf8.GetMaxByteCount(Payload.Length);
var topicMaxBytesCount = _utf8.GetMaxByteCount(topic.Length);
var payloadMaxBytesCount = _utf8.GetMaxByteCount(payload.Length);
// we encode the topic to base64 to ensure that we do not have the separator in the topic
var topicMaxLength = Base64.GetMaxEncodedToUtf8Length(topicMaxBytesCount);
var maxSize = topicMaxLength + 2 + payloadMaxBytesCount + _messageIdLength;
Expand Down Expand Up @@ -55,7 +57,7 @@ public string Format()
slicedBuffer = slicedBuffer[1..];

// write topic as base64
var topicLengthUtf8 = _utf8.GetBytes(Topic, slicedBuffer);
var topicLengthUtf8 = _utf8.GetBytes(topic, slicedBuffer);
Base64.EncodeToUtf8InPlace(slicedBuffer, topicLengthUtf8, out var topicLengthBase64);
slicedBuffer = slicedBuffer[topicLengthBase64..];

Expand All @@ -64,7 +66,7 @@ public string Format()
slicedBuffer = slicedBuffer[1..];

// write payload
var payloadLengthUtf8 = _utf8.GetBytes(Payload, slicedBuffer);
var payloadLengthUtf8 = _utf8.GetBytes(payload, slicedBuffer);

// create string
var endOfEncodedString = topicLengthBase64 + 2 + payloadLengthUtf8 + _messageIdLength;
Expand All @@ -75,6 +77,16 @@ public string Format()
ArrayPool<byte>.Shared.Return(bufferArray);
}

if (endOfEncodedString > maxMessagePayloadSize)
{
throw new ArgumentException(
string.Format(
PostgresMessageEnvelope_PayloadTooLarge,
endOfEncodedString,
maxMessagePayloadSize),
nameof(payload));
}

return result;
}

Expand Down Expand Up @@ -121,4 +133,10 @@ public static bool TryParse(

return true;
}

public static PostgresMessageEnvelope Create(
string topic,
string payload,
int maxMessagePayloadSize)
=> new (topic, Format(topic, payload, maxMessagePayloadSize));
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public sealed class PostgresSubscriptionOptions
/// </summary>
public int MaxSendQueueSize { get; set; } = 2048;

/// <summary>
/// The maximum serialized size of a message that can be sent using postgres notify.
/// </summary>
public int MaxMessagePayloadSize { get; set; } = 8000;

/// <summary>
/// The subscription options that are used to configure the subscriptions.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ internal sealed class PostgresPubSub : DefaultPubSub
private readonly IMessageSerializer _serializer;
private readonly PostgresChannel _channel;

private readonly int _maxMessagePayloadSize;
private readonly int _topicBufferCapacity;
private readonly TopicBufferFullMode _topicBufferFullMode;

Expand All @@ -20,6 +21,7 @@ public PostgresPubSub(
{
_serializer = serializer;
_channel = channel;
_maxMessagePayloadSize = options.MaxMessagePayloadSize;
_topicBufferCapacity = options.SubscriptionOptions.TopicBufferCapacity;
_topicBufferFullMode = options.SubscriptionOptions.TopicBufferFullMode;
}
Expand All @@ -32,15 +34,18 @@ protected override async ValueTask OnSendAsync<TMessage>(
{
var serialized = _serializer.Serialize(message);

var envelope = new PostgresMessageEnvelope(formattedTopic, serialized);
var envelope = PostgresMessageEnvelope
.Create(formattedTopic, serialized, _maxMessagePayloadSize);

await _channel.SendAsync(envelope, cancellationToken);
}

/// <inheritdoc />
protected override async ValueTask OnCompleteAsync(string formattedTopic)
{
var envelope = new PostgresMessageEnvelope(formattedTopic, _serializer.CompleteMessage);
var envelope = PostgresMessageEnvelope
.Create(formattedTopic, _serializer.CompleteMessage, _maxMessagePayloadSize);

await _channel.SendAsync(envelope, cancellationToken: default);
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@
<data name="ChannelWriter_FailedToSend" xml:space="preserve">
<value>The channel writer failed to send messages. Requeueing {0} messages. Error Message: {1}</value>
</data>
<data name="PostgresMessageEnvelope_PayloadTooLarge" xml:space="preserve">
<value>Payload is too long to we written to Postgres. Serialized message is {0} bytes but limit is {1} bytes</value>
</data>
</root>
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ public async Task SendMessage_Should_SendMessageOverChannel()
using var testChannel = new TestChannel(SyncConnectionFactory, _channelName);

// Act
await channel
.SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None);
var message =
PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize);

await channel.SendAsync(message, CancellationToken.None);

// Assert
await testChannel.WaitForNotificationAsync();
Expand Down Expand Up @@ -183,8 +185,10 @@ public async Task SendAsync_Should_SendAndReceive()
channel.Subscribe(listener);

// Act
await channel
.SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None);
var message =
PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize);

await channel.SendAsync(message, CancellationToken.None);

// Assert
SpinWait.SpinUntil(() => receivedMessages.Count == 1, TimeSpan.FromSeconds(1));
Expand All @@ -210,7 +214,9 @@ await Parallel.ForEachAsync(
new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (_, ct) =>
{
var message = new PostgresMessageEnvelope("test", "foobar");
var message = PostgresMessageEnvelope
.Create("test", "foobar", _options.MaxMessagePayloadSize);

await channel.SendAsync(message, ct);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public async Task SendAsync_Should_WriteMessageToChannel_When_CalledWithValidInp
// Arrange
var postgresChannelWriter = new PostgresChannelWriter(_events, _options);
await postgresChannelWriter.Initialize(CancellationToken.None);
var message = new PostgresMessageEnvelope("test", "test");
var message =
PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize);
var testChannel = new TestChannel(SyncConnectionFactory, _channelName);

// Act
Expand All @@ -55,7 +56,8 @@ public async Task SendAsync_Should_WriteManyMessage_When_CalledManyTimes()
// Arrange
var postgresChannelWriter = new PostgresChannelWriter(_events, _options);
await postgresChannelWriter.Initialize(CancellationToken.None);
var message = new PostgresMessageEnvelope("test", "test");
var message =
PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize);
var testChannel = new TestChannel(SyncConnectionFactory, _channelName);

// Act
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace HotChocolate.Subscriptions.Postgres;

public class PostgresMessageEnvelopeTests
{
private readonly PostgresSubscriptionOptions _options = new();

[Theory]
[InlineData("test", "test")]
[InlineData("sometopic", """ { ""test"": ""test"" } """)]
public void Should_FormatAndParse(string topic, string payload)
{
// arrange
var envelope = new PostgresMessageEnvelope(topic, payload);
var envelope =
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize);

// act
var formatted = envelope.Format();
var formatted = envelope.FormattedPayload;
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

Expand All @@ -36,10 +40,11 @@ public void Should_FormatCorrectly(
string formatted)
{
// arrange
var envelope = new PostgresMessageEnvelope(topic, payload);
var envelope =
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize);

// act
var result = envelope.Format();
var result = envelope.FormattedPayload;

// assert
Assert.Equal(formatted, result[25..]);
Expand All @@ -54,10 +59,11 @@ public void Format_Should_GenerateACorrectId()
for (var i = 0; i < 10_000; i++)
{
// arrange
var envelope = new PostgresMessageEnvelope("test", "test");
var envelope =
PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize);

// act
var id = envelope.Format()[..24];
var id = envelope.FormattedPayload[..24];

// assert
var bytes = Encoding.UTF8.GetBytes(id);
Expand All @@ -78,59 +84,38 @@ public void Format_Should_GenerateACorrectId()
}

[Fact]
public void Should_FormatAndParseWithBigPayload()
public void Format_ShouldThrow_WithBigPayload()
{
// arrange
var topic = "test";
var payload = new string('a', 100_000);
var envelope = new PostgresMessageEnvelope(topic, payload);

// act
var formatted = envelope.Format();
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

// assert
Assert.True(parsingResult);
Assert.Equal(topic, parsedTopic);
Assert.Equal(payload, parsedPayload);
// act, assert
Assert.Throws<ArgumentException>(() =>
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize));
}

[Fact]
public void Should_FormatAndParseWithBigTopic()
public void Format_ShouldThrow_WithBigTopic()
{
// arrange
var topic = new string('a', 100_000);
var payload = "test";
var envelope = new PostgresMessageEnvelope(topic, payload);

// act
var formatted = envelope.Format();
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

// assert
Assert.True(parsingResult);
Assert.Equal(topic, parsedTopic);
Assert.Equal(payload, parsedPayload);
// act, assert
Assert.Throws<ArgumentException>(() =>
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize));
}

[Fact]
public void Should_FormatAndParseWithBigTopicAndPayload()
public void Format_ShouldThrow_WithBigTopicAndPayload()
{
// arrange
var topic = new string('a', 100_000);
var payload = new string('a', 100_000);
var envelope = new PostgresMessageEnvelope(topic, payload);

// act
var formatted = envelope.Format();
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

// assert
Assert.True(parsingResult);
Assert.Equal(topic, parsedTopic);
Assert.Equal(payload, parsedPayload);
// act, assert
Assert.Throws<ArgumentException>(() =>
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize));
}
}

0 comments on commit ac886f7

Please sign in to comment.