Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set limit on serialized message size in Subscriptions.Postgres #6678

Merged
merged 3 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private async Task HandleMessage(NpgsqlConnection connection, CancellationToken
command.CommandText = "SELECT pg_notify(@channel, @message);";

command.Parameters.Add(new NpgsqlParameter("channel", _channelName));
command.Parameters.Add(new NpgsqlParameter("message", message.Format()));
command.Parameters.Add(new NpgsqlParameter("message", message.FormattedPayload));

batch.BatchCommands.Add(command);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Buffers.Text;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using static HotChocolate.Subscriptions.Postgres.PostgresResources;


namespace HotChocolate.Subscriptions.Postgres;

Expand All @@ -12,20 +14,20 @@ internal readonly struct PostgresMessageEnvelope
private const byte separator = (byte)':';
private const byte _messageIdLength = 24;

public PostgresMessageEnvelope(string topic, string payload)
private PostgresMessageEnvelope(string topic, string formattedPayload)
{
Topic = topic;
Payload = payload;
FormattedPayload = formattedPayload;
}

public string Topic { get; }

public string Payload { get; }
public string FormattedPayload { get; }

public string Format()
private static string Format(string topic, string payload, int maxMessagePayloadSize)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally would prefer a public static factory method that does the formatting and then passes topic and payload to the envelope ctor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this is fair, i wasn't sure about doing that extra work in the constructor, a factory would be better. Ill make that change

{
var topicMaxBytesCount = _utf8.GetMaxByteCount(Topic.Length);
var payloadMaxBytesCount = _utf8.GetMaxByteCount(Payload.Length);
var topicMaxBytesCount = _utf8.GetMaxByteCount(topic.Length);
var payloadMaxBytesCount = _utf8.GetMaxByteCount(payload.Length);
// we encode the topic to base64 to ensure that we do not have the separator in the topic
var topicMaxLength = Base64.GetMaxEncodedToUtf8Length(topicMaxBytesCount);
var maxSize = topicMaxLength + 2 + payloadMaxBytesCount + _messageIdLength;
Expand Down Expand Up @@ -55,7 +57,7 @@ public string Format()
slicedBuffer = slicedBuffer[1..];

// write topic as base64
var topicLengthUtf8 = _utf8.GetBytes(Topic, slicedBuffer);
var topicLengthUtf8 = _utf8.GetBytes(topic, slicedBuffer);
Base64.EncodeToUtf8InPlace(slicedBuffer, topicLengthUtf8, out var topicLengthBase64);
slicedBuffer = slicedBuffer[topicLengthBase64..];

Expand All @@ -64,7 +66,7 @@ public string Format()
slicedBuffer = slicedBuffer[1..];

// write payload
var payloadLengthUtf8 = _utf8.GetBytes(Payload, slicedBuffer);
var payloadLengthUtf8 = _utf8.GetBytes(payload, slicedBuffer);

// create string
var endOfEncodedString = topicLengthBase64 + 2 + payloadLengthUtf8 + _messageIdLength;
Expand All @@ -75,6 +77,16 @@ public string Format()
ArrayPool<byte>.Shared.Return(bufferArray);
}

if (endOfEncodedString > maxMessagePayloadSize)
{
throw new ArgumentException(
string.Format(
PostgresMessageEnvelope_PayloadTooLarge,
endOfEncodedString,
maxMessagePayloadSize),
nameof(payload));
}

return result;
}

Expand Down Expand Up @@ -121,4 +133,10 @@ public static bool TryParse(

return true;
}

public static PostgresMessageEnvelope Create(
string topic,
string payload,
int maxMessagePayloadSize)
=> new (topic, Format(topic, payload, maxMessagePayloadSize));
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public sealed class PostgresSubscriptionOptions
/// </summary>
public int MaxSendQueueSize { get; set; } = 2048;

/// <summary>
/// The maximum serialized size of a message that can be sent using postgres notify.
/// </summary>
public int MaxMessagePayloadSize { get; set; } = 8000;

/// <summary>
/// The subscription options that are used to configure the subscriptions.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ internal sealed class PostgresPubSub : DefaultPubSub
private readonly IMessageSerializer _serializer;
private readonly PostgresChannel _channel;

private readonly int _maxMessagePayloadSize;
private readonly int _topicBufferCapacity;
private readonly TopicBufferFullMode _topicBufferFullMode;

Expand All @@ -20,6 +21,7 @@ public PostgresPubSub(
{
_serializer = serializer;
_channel = channel;
_maxMessagePayloadSize = options.MaxMessagePayloadSize;
_topicBufferCapacity = options.SubscriptionOptions.TopicBufferCapacity;
_topicBufferFullMode = options.SubscriptionOptions.TopicBufferFullMode;
}
Expand All @@ -32,15 +34,18 @@ protected override async ValueTask OnSendAsync<TMessage>(
{
var serialized = _serializer.Serialize(message);

var envelope = new PostgresMessageEnvelope(formattedTopic, serialized);
var envelope = PostgresMessageEnvelope
.Create(formattedTopic, serialized, _maxMessagePayloadSize);

await _channel.SendAsync(envelope, cancellationToken);
}

/// <inheritdoc />
protected override async ValueTask OnCompleteAsync(string formattedTopic)
{
var envelope = new PostgresMessageEnvelope(formattedTopic, _serializer.CompleteMessage);
var envelope = PostgresMessageEnvelope
.Create(formattedTopic, _serializer.CompleteMessage, _maxMessagePayloadSize);

await _channel.SendAsync(envelope, cancellationToken: default);
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@
<data name="ChannelWriter_FailedToSend" xml:space="preserve">
<value>The channel writer failed to send messages. Requeueing {0} messages. Error Message: {1}</value>
</data>
<data name="PostgresMessageEnvelope_PayloadTooLarge" xml:space="preserve">
<value>Payload is too long to we written to Postgres. Serialized message is {0} bytes but limit is {1} bytes</value>
</data>
</root>
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ public async Task SendMessage_Should_SendMessageOverChannel()
using var testChannel = new TestChannel(SyncConnectionFactory, _channelName);

// Act
await channel
.SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None);
var message =
PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize);

await channel.SendAsync(message, CancellationToken.None);

// Assert
await testChannel.WaitForNotificationAsync();
Expand Down Expand Up @@ -183,8 +185,10 @@ public async Task SendAsync_Should_SendAndReceive()
channel.Subscribe(listener);

// Act
await channel
.SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None);
var message =
PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize);

await channel.SendAsync(message, CancellationToken.None);

// Assert
SpinWait.SpinUntil(() => receivedMessages.Count == 1, TimeSpan.FromSeconds(1));
Expand All @@ -210,7 +214,9 @@ await Parallel.ForEachAsync(
new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (_, ct) =>
{
var message = new PostgresMessageEnvelope("test", "foobar");
var message = PostgresMessageEnvelope
.Create("test", "foobar", _options.MaxMessagePayloadSize);

await channel.SendAsync(message, ct);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public async Task SendAsync_Should_WriteMessageToChannel_When_CalledWithValidInp
// Arrange
var postgresChannelWriter = new PostgresChannelWriter(_events, _options);
await postgresChannelWriter.Initialize(CancellationToken.None);
var message = new PostgresMessageEnvelope("test", "test");
var message =
PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize);
var testChannel = new TestChannel(SyncConnectionFactory, _channelName);

// Act
Expand All @@ -55,7 +56,8 @@ public async Task SendAsync_Should_WriteManyMessage_When_CalledManyTimes()
// Arrange
var postgresChannelWriter = new PostgresChannelWriter(_events, _options);
await postgresChannelWriter.Initialize(CancellationToken.None);
var message = new PostgresMessageEnvelope("test", "test");
var message =
PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize);
var testChannel = new TestChannel(SyncConnectionFactory, _channelName);

// Act
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace HotChocolate.Subscriptions.Postgres;

public class PostgresMessageEnvelopeTests
{
private readonly PostgresSubscriptionOptions _options = new();

[Theory]
[InlineData("test", "test")]
[InlineData("sometopic", """ { ""test"": ""test"" } """)]
public void Should_FormatAndParse(string topic, string payload)
{
// arrange
var envelope = new PostgresMessageEnvelope(topic, payload);
var envelope =
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize);

// act
var formatted = envelope.Format();
var formatted = envelope.FormattedPayload;
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

Expand All @@ -36,10 +40,11 @@ public void Should_FormatCorrectly(
string formatted)
{
// arrange
var envelope = new PostgresMessageEnvelope(topic, payload);
var envelope =
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize);

// act
var result = envelope.Format();
var result = envelope.FormattedPayload;

// assert
Assert.Equal(formatted, result[25..]);
Expand All @@ -54,10 +59,11 @@ public void Format_Should_GenerateACorrectId()
for (var i = 0; i < 10_000; i++)
{
// arrange
var envelope = new PostgresMessageEnvelope("test", "test");
var envelope =
PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize);

// act
var id = envelope.Format()[..24];
var id = envelope.FormattedPayload[..24];

// assert
var bytes = Encoding.UTF8.GetBytes(id);
Expand All @@ -78,59 +84,38 @@ public void Format_Should_GenerateACorrectId()
}

[Fact]
public void Should_FormatAndParseWithBigPayload()
public void Format_ShouldThrow_WithBigPayload()
{
// arrange
var topic = "test";
var payload = new string('a', 100_000);
var envelope = new PostgresMessageEnvelope(topic, payload);

// act
var formatted = envelope.Format();
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

// assert
Assert.True(parsingResult);
Assert.Equal(topic, parsedTopic);
Assert.Equal(payload, parsedPayload);
// act, assert
Assert.Throws<ArgumentException>(() =>
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize));
}

[Fact]
public void Should_FormatAndParseWithBigTopic()
public void Format_ShouldThrow_WithBigTopic()
{
// arrange
var topic = new string('a', 100_000);
var payload = "test";
var envelope = new PostgresMessageEnvelope(topic, payload);

// act
var formatted = envelope.Format();
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

// assert
Assert.True(parsingResult);
Assert.Equal(topic, parsedTopic);
Assert.Equal(payload, parsedPayload);
// act, assert
Assert.Throws<ArgumentException>(() =>
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize));
}

[Fact]
public void Should_FormatAndParseWithBigTopicAndPayload()
public void Format_ShouldThrow_WithBigTopicAndPayload()
{
// arrange
var topic = new string('a', 100_000);
var payload = new string('a', 100_000);
var envelope = new PostgresMessageEnvelope(topic, payload);

// act
var formatted = envelope.Format();
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

// assert
Assert.True(parsingResult);
Assert.Equal(topic, parsedTopic);
Assert.Equal(payload, parsedPayload);
// act, assert
Assert.Throws<ArgumentException>(() =>
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize));
}
}
Loading