Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise publish serialization exception early #140

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/NATS.Client.Core/Commands/PublishCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ namespace NATS.Client.Core.Commands;

internal sealed class PublishCommand<T> : CommandBase<PublishCommand<T>>
{
// This buffer will be pooled with this command object
private readonly FixedArrayBufferWriter _buffer = new();

private string? _subject;
private string? _replyTo;
private NatsHeaders? _headers;
Expand Down Expand Up @@ -32,12 +35,17 @@ public static PublishCommand<T> Create(ObjectPool pool, string subject, string?
result._serializer = serializer;
result._cancellationToken = cancellationToken;

// Serialize data as soon as possible to propagate any exceptions
// to the caller so that publish method will throw the exception
result._buffer.Reset();
serializer.Serialize(result._buffer, value);

return result;
}

public override void Write(ProtocolWriter writer)
{
writer.WritePublish(_subject!, _replyTo, _headers, _value, _serializer!);
writer.WritePublish(_subject!, _replyTo, _headers, new ReadOnlySequence<byte>(_buffer.WrittenMemory));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, _serializer and _value aren't needed anymore and PublishCommand doesn't need to be generic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, _buffer or rather it's underlying array should be freed here. Otherwise, we would keep a potentially huge buffer allocated after sending a large message.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also enables serialization work to happen concurrently (for multiple publish operations) instead of serializing all serialization work inside WriteLoopAsync

Good point.

Also, _buffer or rather it's underlying array should be freed here.

The idea of keeping the _buffer around to avoid GC since it'd be pooled with the command object.

On the other hand we could leave out the buffer management to end developer and just accept ReadOnlySequence or Memory or something or even an IMemoryOwner?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, @jasper-d I think I misunderstand your freeing buffer comment (wasn't looking at where in code you commented). You have a point. I thought about this as well. We could resize the underlying array to be smaller or enforce a size limit as it would be by the server anyway.

}

protected override void Reset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,23 @@ private async Task WriteLoopAsync()
continue;
}

if (command is IBatchCommand batch)
try
{
count += batch.Write(protocolWriter);
if (command is IBatchCommand batch)
{
count += batch.Write(protocolWriter);
}
else
{
command.Write(protocolWriter);
count++;
}
}
else
catch (Exception e)
{
command.Write(protocolWriter);
count++;
// flag potential serialization exceptions
((IPromise)command).SetException(e);
continue;
}

if (command is IPromise p)
Expand Down
46 changes: 46 additions & 0 deletions tests/NATS.Client.Core.Tests/SerializerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Buffers;

namespace NATS.Client.Core.Tests;

public class SerializerTest
{
private readonly ITestOutputHelper _output;

public SerializerTest(ITestOutputHelper output) => _output = output;

[Fact]
public async Task Serializer_exceptions()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

await Assert.ThrowsAsync<TestSerializerException>(async () =>
{
await nats.PublishAsync(
"foo",
0,
opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = false, });
});

await Assert.ThrowsAsync<TestSerializerException>(async () =>
{
await nats.PublishAsync(
"foo",
0,
opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = true, });
});
}
}

public class TestSerializer : INatsSerializer
{
public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value) => throw new TestSerializerException();

public T? Deserialize<T>(in ReadOnlySequence<byte> buffer) => throw new TestSerializerException();

public object? Deserialize(in ReadOnlySequence<byte> buffer, Type type) => throw new TestSerializerException();
}

public class TestSerializerException : Exception
{
}
Loading