Skip to content

Commit

Permalink
Merge branch 'main' into fix/object-scalar-serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib authored Nov 10, 2023
2 parents 8a55169 + 793b9c2 commit 5c67282
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private async Task HandleMessage(NpgsqlConnection connection, CancellationToken
command.CommandText = "SELECT pg_notify(@channel, @message);";

command.Parameters.Add(new NpgsqlParameter("channel", _channelName));
command.Parameters.Add(new NpgsqlParameter("message", message.Format()));
command.Parameters.Add(new NpgsqlParameter("message", message.FormattedPayload));

batch.BatchCommands.Add(command);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Buffers.Text;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using static HotChocolate.Subscriptions.Postgres.PostgresResources;


namespace HotChocolate.Subscriptions.Postgres;

Expand All @@ -12,20 +14,20 @@ internal readonly struct PostgresMessageEnvelope
private const byte separator = (byte)':';
private const byte _messageIdLength = 24;

public PostgresMessageEnvelope(string topic, string payload)
private PostgresMessageEnvelope(string topic, string formattedPayload)
{
Topic = topic;
Payload = payload;
FormattedPayload = formattedPayload;
}

public string Topic { get; }

public string Payload { get; }
public string FormattedPayload { get; }

public string Format()
private static string Format(string topic, string payload, int maxMessagePayloadSize)
{
var topicMaxBytesCount = _utf8.GetMaxByteCount(Topic.Length);
var payloadMaxBytesCount = _utf8.GetMaxByteCount(Payload.Length);
var topicMaxBytesCount = _utf8.GetMaxByteCount(topic.Length);
var payloadMaxBytesCount = _utf8.GetMaxByteCount(payload.Length);
// we encode the topic to base64 to ensure that we do not have the separator in the topic
var topicMaxLength = Base64.GetMaxEncodedToUtf8Length(topicMaxBytesCount);
var maxSize = topicMaxLength + 2 + payloadMaxBytesCount + _messageIdLength;
Expand Down Expand Up @@ -55,7 +57,7 @@ public string Format()
slicedBuffer = slicedBuffer[1..];

// write topic as base64
var topicLengthUtf8 = _utf8.GetBytes(Topic, slicedBuffer);
var topicLengthUtf8 = _utf8.GetBytes(topic, slicedBuffer);
Base64.EncodeToUtf8InPlace(slicedBuffer, topicLengthUtf8, out var topicLengthBase64);
slicedBuffer = slicedBuffer[topicLengthBase64..];

Expand All @@ -64,7 +66,7 @@ public string Format()
slicedBuffer = slicedBuffer[1..];

// write payload
var payloadLengthUtf8 = _utf8.GetBytes(Payload, slicedBuffer);
var payloadLengthUtf8 = _utf8.GetBytes(payload, slicedBuffer);

// create string
var endOfEncodedString = topicLengthBase64 + 2 + payloadLengthUtf8 + _messageIdLength;
Expand All @@ -75,6 +77,16 @@ public string Format()
ArrayPool<byte>.Shared.Return(bufferArray);
}

if (endOfEncodedString > maxMessagePayloadSize)
{
throw new ArgumentException(
string.Format(
PostgresMessageEnvelope_PayloadTooLarge,
endOfEncodedString,
maxMessagePayloadSize),
nameof(payload));
}

return result;
}

Expand Down Expand Up @@ -121,4 +133,10 @@ public static bool TryParse(

return true;
}

public static PostgresMessageEnvelope Create(
string topic,
string payload,
int maxMessagePayloadSize)
=> new (topic, Format(topic, payload, maxMessagePayloadSize));
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public sealed class PostgresSubscriptionOptions
/// </summary>
public int MaxSendQueueSize { get; set; } = 2048;

/// <summary>
/// The maximum serialized size of a message that can be sent using postgres notify.
/// </summary>
public int MaxMessagePayloadSize { get; set; } = 8000;

/// <summary>
/// The subscription options that are used to configure the subscriptions.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ internal sealed class PostgresPubSub : DefaultPubSub
private readonly IMessageSerializer _serializer;
private readonly PostgresChannel _channel;

private readonly int _maxMessagePayloadSize;
private readonly int _topicBufferCapacity;
private readonly TopicBufferFullMode _topicBufferFullMode;

Expand All @@ -20,6 +21,7 @@ public PostgresPubSub(
{
_serializer = serializer;
_channel = channel;
_maxMessagePayloadSize = options.MaxMessagePayloadSize;
_topicBufferCapacity = options.SubscriptionOptions.TopicBufferCapacity;
_topicBufferFullMode = options.SubscriptionOptions.TopicBufferFullMode;
}
Expand All @@ -32,15 +34,18 @@ protected override async ValueTask OnSendAsync<TMessage>(
{
var serialized = _serializer.Serialize(message);

var envelope = new PostgresMessageEnvelope(formattedTopic, serialized);
var envelope = PostgresMessageEnvelope
.Create(formattedTopic, serialized, _maxMessagePayloadSize);

await _channel.SendAsync(envelope, cancellationToken);
}

/// <inheritdoc />
protected override async ValueTask OnCompleteAsync(string formattedTopic)
{
var envelope = new PostgresMessageEnvelope(formattedTopic, _serializer.CompleteMessage);
var envelope = PostgresMessageEnvelope
.Create(formattedTopic, _serializer.CompleteMessage, _maxMessagePayloadSize);

await _channel.SendAsync(envelope, cancellationToken: default);
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@
<data name="ChannelWriter_FailedToSend" xml:space="preserve">
<value>The channel writer failed to send messages. Requeueing {0} messages. Error Message: {1}</value>
</data>
<data name="PostgresMessageEnvelope_PayloadTooLarge" xml:space="preserve">
<value>Payload is too long to we written to Postgres. Serialized message is {0} bytes but limit is {1} bytes</value>
</data>
</root>
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ public async Task SendMessage_Should_SendMessageOverChannel()
using var testChannel = new TestChannel(SyncConnectionFactory, _channelName);

// Act
await channel
.SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None);
var message =
PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize);

await channel.SendAsync(message, CancellationToken.None);

// Assert
await testChannel.WaitForNotificationAsync();
Expand Down Expand Up @@ -183,8 +185,10 @@ public async Task SendAsync_Should_SendAndReceive()
channel.Subscribe(listener);

// Act
await channel
.SendAsync(new PostgresMessageEnvelope("test", "foobar"), CancellationToken.None);
var message =
PostgresMessageEnvelope.Create("test", "foobar", _options.MaxMessagePayloadSize);

await channel.SendAsync(message, CancellationToken.None);

// Assert
SpinWait.SpinUntil(() => receivedMessages.Count == 1, TimeSpan.FromSeconds(1));
Expand All @@ -210,7 +214,9 @@ await Parallel.ForEachAsync(
new ParallelOptions { MaxDegreeOfParallelism = 10 },
async (_, ct) =>
{
var message = new PostgresMessageEnvelope("test", "foobar");
var message = PostgresMessageEnvelope
.Create("test", "foobar", _options.MaxMessagePayloadSize);

await channel.SendAsync(message, ct);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public async Task SendAsync_Should_WriteMessageToChannel_When_CalledWithValidInp
// Arrange
var postgresChannelWriter = new PostgresChannelWriter(_events, _options);
await postgresChannelWriter.Initialize(CancellationToken.None);
var message = new PostgresMessageEnvelope("test", "test");
var message =
PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize);
var testChannel = new TestChannel(SyncConnectionFactory, _channelName);

// Act
Expand All @@ -55,7 +56,8 @@ public async Task SendAsync_Should_WriteManyMessage_When_CalledManyTimes()
// Arrange
var postgresChannelWriter = new PostgresChannelWriter(_events, _options);
await postgresChannelWriter.Initialize(CancellationToken.None);
var message = new PostgresMessageEnvelope("test", "test");
var message =
PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize);
var testChannel = new TestChannel(SyncConnectionFactory, _channelName);

// Act
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace HotChocolate.Subscriptions.Postgres;

public class PostgresMessageEnvelopeTests
{
private readonly PostgresSubscriptionOptions _options = new();

[Theory]
[InlineData("test", "test")]
[InlineData("sometopic", """ { ""test"": ""test"" } """)]
public void Should_FormatAndParse(string topic, string payload)
{
// arrange
var envelope = new PostgresMessageEnvelope(topic, payload);
var envelope =
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize);

// act
var formatted = envelope.Format();
var formatted = envelope.FormattedPayload;
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

Expand All @@ -36,10 +40,11 @@ public void Should_FormatCorrectly(
string formatted)
{
// arrange
var envelope = new PostgresMessageEnvelope(topic, payload);
var envelope =
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize);

// act
var result = envelope.Format();
var result = envelope.FormattedPayload;

// assert
Assert.Equal(formatted, result[25..]);
Expand All @@ -54,10 +59,11 @@ public void Format_Should_GenerateACorrectId()
for (var i = 0; i < 10_000; i++)
{
// arrange
var envelope = new PostgresMessageEnvelope("test", "test");
var envelope =
PostgresMessageEnvelope.Create("test", "test", _options.MaxMessagePayloadSize);

// act
var id = envelope.Format()[..24];
var id = envelope.FormattedPayload[..24];

// assert
var bytes = Encoding.UTF8.GetBytes(id);
Expand All @@ -78,59 +84,38 @@ public void Format_Should_GenerateACorrectId()
}

[Fact]
public void Should_FormatAndParseWithBigPayload()
public void Format_ShouldThrow_WithBigPayload()
{
// arrange
var topic = "test";
var payload = new string('a', 100_000);
var envelope = new PostgresMessageEnvelope(topic, payload);

// act
var formatted = envelope.Format();
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

// assert
Assert.True(parsingResult);
Assert.Equal(topic, parsedTopic);
Assert.Equal(payload, parsedPayload);
// act, assert
Assert.Throws<ArgumentException>(() =>
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize));
}

[Fact]
public void Should_FormatAndParseWithBigTopic()
public void Format_ShouldThrow_WithBigTopic()
{
// arrange
var topic = new string('a', 100_000);
var payload = "test";
var envelope = new PostgresMessageEnvelope(topic, payload);

// act
var formatted = envelope.Format();
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

// assert
Assert.True(parsingResult);
Assert.Equal(topic, parsedTopic);
Assert.Equal(payload, parsedPayload);
// act, assert
Assert.Throws<ArgumentException>(() =>
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize));
}

[Fact]
public void Should_FormatAndParseWithBigTopicAndPayload()
public void Format_ShouldThrow_WithBigTopicAndPayload()
{
// arrange
var topic = new string('a', 100_000);
var payload = new string('a', 100_000);
var envelope = new PostgresMessageEnvelope(topic, payload);

// act
var formatted = envelope.Format();
var parsingResult = PostgresMessageEnvelope
.TryParse(formatted, out var parsedTopic, out var parsedPayload);

// assert
Assert.True(parsingResult);
Assert.Equal(topic, parsedTopic);
Assert.Equal(payload, parsedPayload);
// act, assert
Assert.Throws<ArgumentException>(() =>
PostgresMessageEnvelope.Create(topic, payload, _options.MaxMessagePayloadSize));
}
}
6 changes: 3 additions & 3 deletions website/src/docs/strawberryshake/v13/networking/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ Strawberry Shake supports multiple network protocols to communicate with your Gr

# Transport Profiles

In order to have a small client size and generate the optimized client for your use-case Strawberry Shake uses transport profiles. By default Strawberry Shake will generate a client that uses `GraphQL over HTTP` for queries and mutations and `subscriptions-transport-ws` for subscriptions. Meaning if you are only using queries and mutations you need to add the package `StrawberryShake.Transport.Http`.
In order to have a small client size and generate the optimized client for your use-case Strawberry Shake uses transport profiles. By default Strawberry Shake will generate a client that uses `GraphQL over HTTP` for queries and mutations and `graphql-transport-ws` for subscriptions. Meaning if you are only using queries and mutations you need to add the package `StrawberryShake.Transport.Http`.

There are cases in which we want to define specialize transport profiles where we for instance define for each request type a specific transport. You can define those transport profiles in your `.graphqlrc.json`.
There are cases in which we want to define specialized transport profiles where we define for each request type a specific transport. You can define those transport profiles in your `.graphqlrc.json`.

The following `.graphqlrc.json` can be copied into our getting started example and will create two transport profiles. The first is called `Default` and matches the internal default. It will use `GraphQL over HTTP` by default and use `subscriptions-transport-ws` for subscriptions. The second profile is called `WebSocket` and will also use `GraphQL over HTTP` by default but for mutations and subscriptions it will use `subscriptions-transport-ws`.
The following `.graphqlrc.json` can be copied into our getting started example and will create two transport profiles. The first is called `Default` and matches the internal default. It will use `GraphQL over HTTP` by default and use `graphql-transport-ws` for subscriptions. The second profile is called `WebSocket` and will also use `GraphQL over HTTP` by default but for mutations and subscriptions it will use `graphql-transport-ws`.

```json
{
Expand Down

0 comments on commit 5c67282

Please sign in to comment.