Expose a public API for getting NatsHeaders in the byte array (serialization) #638

Closed
Farrukhjon opened this issue Sep 25, 2024 · 7 comments · Fixed by #674
Labels: good first issue, help wanted, new-feature

Comments

@Farrukhjon

Farrukhjon commented Sep 25, 2024

Proposed change

Methods such as:

  • public byte[] NatsHeaders.SerializeToArray() or public byte[] NatsHeaders.GetSerialized() - Returns the headers serialized to a byte array (ASCII or any other encoding).
  • public int SerializedLength() - Returns the number of bytes in the serialized form (ASCII or any other encoding).

Use case

I need to know the size of NatsHeaders (length in bytes) up front, before calling Publish, so that a large NATS message does not exceed the server's maximum payload size.
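
To make the use case concrete, here is a minimal sketch of how the serialized length could be estimated without access to the internal writer. It assumes the NATS header wire layout: a "NATS/1.0" version line, one "Key: Value" line per value, and a terminating blank line, each ending in CRLF. `EstimateHeaderBytes` and the dictionary-based header model are hypothetical stand-ins for illustration, not part of NATS.Net. The two inputs mirror header sets used later in this thread, where sizes of 40 and 45 bytes are reported.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of NATS.Net. Assumes the wire layout
// "NATS/1.0\r\n", then "Key: Value\r\n" per value, then a closing "\r\n".
static int EstimateHeaderBytes(IReadOnlyDictionary<string, string[]> headers, Encoding encoding)
{
    const int VersionLine = 10; // "NATS/1.0\r\n"
    const int Separator = 2;    // ": "
    const int Crlf = 2;         // "\r\n"

    var total = VersionLine;
    foreach (var (key, values) in headers)
    {
        foreach (var value in values)
        {
            total += encoding.GetByteCount(key) + Separator + encoding.GetByteCount(value) + Crlf;
        }
    }

    return total + Crlf; // blank line that terminates the header block
}

var twoKeys = new Dictionary<string, string[]>
{
    ["key1"] = new[] { "value1" },
    ["key2"] = new[] { "value2" },
};
var threeValues = new Dictionary<string, string[]>
{
    ["foo"] = new[] { "bar1", "bar2", "bar3" },
};

int twoKeysSize = EstimateHeaderBytes(twoKeys, Encoding.ASCII);
int threeValuesSize = EstimateHeaderBytes(threeValues, Encoding.ASCII);
Console.WriteLine(twoKeysSize);     // 40
Console.WriteLine(threeValuesSize); // 45
```

An estimate like this is only as good as the layout assumption, which is why a public API on the library side would still be preferable.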

Contribution

No response

@mtmk
Collaborator

mtmk commented Sep 25, 2024

We can make HeaderWriter public; that's what we're using internally:

internal class HeaderWriter

example usage:

var headers = new NatsHeaders();
var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 0));
var writer = new HeaderWriter(Encoding.UTF8);
var written = writer.Write(pipe.Writer, headers);

@Farrukhjon
Author

I want to publish a message whose data (payload) size is close to ServerInfo.MaxPayload.
I assumed that if I subtracted the size of the message headers from the data size as overhead, I would be able to publish the message to the NATS server.

But the publish fails with NATS.Client.Core.NatsException: Payload size 1398091 exceeds server's maximum payload size 1048576 error.

The headers size is calculated with:

var headers = new NatsHeaders();
var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 0));
var writer = new HeaderWriter(Encoding.UTF8);
var written = writer.Write(pipe.Writer, headers);

I based my assumption on these lines:

var size = payloadBuffer.WrittenMemory.Length + (headersBuffer?.WrittenMemory.Length ?? 0);
if (_connection.ServerInfo is { } info && size > info.MaxPayload)
{
throw new NatsException($"Payload size {size} exceeds server's maximum payload size {info.MaxPayload}");
}

This is the test:

[Fact(DisplayName = "Test no NatsException is thrown when headers size took into account as payload overhead for publishing message with MaxPayload")]
    public async Task TestNoMaxPayloadExceedsNatsExceptionIsThrown()
    {
        // Given
        var headers = new NatsHeaders() { ["foo"] = new StringValues(["bar1", "bar2", "bar3"]) };
        var natsHeadersUtil = new NatsHeadersUtil(Encoding.UTF8);
        var headersSizeInBytes = (int)natsHeadersUtil.NatsHeadersSizeInBytes(headers);
        var connection = await _natsServerContainer.GetOrCreateNatsConnectionAsync();
        var serverInfo = connection.ServerInfo ?? throw new ArgumentException("ServerInfo is null!");
        var maxPayloadSize = serverInfo.MaxPayload;
        var maxAllowedPayload = this._stringRandomizer.GenerateRandomStringInBytes(maxPayloadSize - headersSizeInBytes);

        Assert.True(maxAllowedPayload.Length < maxPayloadSize);
        Assert.Equal(maxPayloadSize - headersSizeInBytes, maxAllowedPayload.Length);
        Assert.Equal(45, headersSizeInBytes);

        // When a message with the max allowed payload is published.
        var exception = await Record.ExceptionAsync(
            async () => await connection.PublishAsync("test.subject", maxAllowedPayload, headers)
        );
        // Then no exception is thrown.
        Assert.Null(exception);
    }

And this is the implementation of the NatsHeadersSizeInBytes method:

public class NatsHeadersUtil(Encoding encoding)
{
    private readonly Encoding _encoding = encoding;

    public long NatsHeadersSizeInBytes(NatsHeaders headers)
    {
        var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 0));
        var writer = new HeaderWriter(_encoding);
        var headerSizeInBytes = writer.Write(pipe.Writer, headers);
        return headerSizeInBytes;
    }

}

Stack trace of the failure

Assert.Null() Failure: Value is not null
Expected: null
Actual:   NATS.Client.Core.NatsException: Payload size 1398091 exceeds server's maximum payload size 1048576
   at NATS.Client.Core.Commands.CommandWriter.PublishAsync[T](String subject, T value, NatsHeaders headers, String replyTo, INatsSerialize`1 serializer, CancellationToken cancellationToken)
   at NATS.Client.Core.NatsConnection.PublishAsync[T](String subject, T data, NatsHeaders headers, String replyTo, INatsSerialize`1 serializer, NatsPubOpts opts, CancellationToken cancellationToken)
--- End of stack trace from previous location ---

I'm just curious: how is such a large excess (1398091 - 1048576 = 349515 bytes) computed?

Note: I made the HeaderWriter class accessible for this test.

This is my dev environment:

dotnet --info
.NET SDK:
 Version:           8.0.401
 Commit:            811edcc344
 Workload version:  8.0.400-manifests.f51a3a6b
 MSBuild version:   17.11.4+37eb419ad

Runtime Environment:
 OS Name:     Mac OS X
 OS Version:  14.6
 OS Platform: Darwin
 RID:         osx-x64

@mtmk
Collaborator

mtmk commented Sep 25, 2024

Thanks for the code. I wonder whether maxAllowedPayload in your example is much larger because of string encoding?

This is what I was able to try, and it worked as expected for me:

using System.Text;
using NATS.Client.Core;
using NATS.Client.Core.Internal;

var nc = new NatsConnection();
await nc.ConnectAsync();
var max = nc.ServerInfo!.MaxPayload;

var headers = new NatsHeaders { { "key1", "value1" }, { "key2", "value2" } };
var buffer = new NatsBufferWriter<byte>();
var headersLength = new HeaderWriter(Encoding.ASCII).Write(buffer, headers);
var payloadLength = max - headersLength;

Console.WriteLine($"headers length: {headersLength}");
Console.WriteLine($"payload length: {payloadLength}");
Console.WriteLine($"    server max: {max}");

await nc.PublishAsync("foo", new byte[payloadLength], headers: headers);
Console.WriteLine("OK");

try
{
    await nc.PublishAsync("foo", new byte[payloadLength + 1], headers: headers);
    Console.WriteLine("OK");
}
catch (NatsException ex)
{
    Console.WriteLine(ex.Message);
}

// Output:
// headers length: 40
// payload length: 1048536
//     server max: 1048576
// OK
// Payload size 1048577 exceeds server's maximum payload size 1048576

This was run against a server with no config:

$ nats-server
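
On the string-encoding question raised in this comment: a short sketch of why the encoding can matter when a payload is generated from a string. The character count and the encoded byte count only agree for pure ASCII text. The sample string is arbitrary and chosen only for illustration.

```csharp
using System;
using System.Text;

// Arbitrary sample with three non-ASCII characters (U+00EF, U+00E9, U+2615).
var text = "na\u00EFve caf\u00E9 \u2615";

int charCount = text.Length;                         // UTF-16 code units
int utf8Bytes = Encoding.UTF8.GetByteCount(text);    // 2 or 3 bytes per non-ASCII character here
int asciiBytes = Encoding.ASCII.GetByteCount(text);  // 1 byte per character; non-ASCII is replaced by '?'

Console.WriteLine(charCount);  // 12
Console.WriteLine(utf8Bytes);  // 16
Console.WriteLine(asciiBytes); // 12
```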

@Farrukhjon
Author

maxAllowedPayload

The generated maxAllowedPayload is actually smaller than maxPayloadSize by the headersSizeInBytes amount.

This is asserted here:

Assert.True(maxAllowedPayload.Length < maxPayloadSize);
Assert.Equal(maxPayloadSize - headersSizeInBytes, maxAllowedPayload.Length);

Using the same encoding (ASCII) for both the payload and the headers resulted in the same error:

var encoding = Encoding.ASCII;
var natsHeadersUtil = new NatsHeadersUtil(encoding);
var headersSizeInBytes = (int)natsHeadersUtil.NatsHeadersSizeInBytes(headers);
var connection = await _natsServerContainer.GetOrCreateNatsConnectionAsync();
var serverInfo = connection.ServerInfo ?? throw new ArgumentException("ServerInfo is null!");
var maxPayloadSize = serverInfo.MaxPayload;
var maxAllowedPayload = this._stringRandomizer.GenerateRandomStringInBytes(maxPayloadSize - headersSizeInBytes, encoding);

@mtmk
Collaborator

mtmk commented Sep 25, 2024

What is the type of maxAllowedPayload? I can't reproduce the issue with the code above without knowing its dependencies. Are you able to create a minimal console app and post Program.cs?

@Farrukhjon
Author

Farrukhjon commented Sep 26, 2024

The large excess in the payload size was caused by the serializer registry set in the connection's NatsOpts:

var natsOpts = NatsOpts.Default with
{
    SerializerRegistry = NatsJsonSerializerRegistry.Default
};
var nc = new NatsConnection(natsOpts);

I was misled by this behavior. My other tests, which publish a mix of objects and small texts, were fine. But when I tried to test a payload of maxPayload minus the headers overhead, I ran into this issue.

The code below reproduces the issue.

var natsOpts = NatsOpts.Default with
{
    SerializerRegistry = NatsJsonSerializerRegistry.Default
};
var nc = new NatsConnection(natsOpts);

await nc.PublishAsync("foo", Encoding.ASCII.GetBytes("Hello World"), headers: headers);
Console.WriteLine("Small text in ASCII bytes is ok");

try
{
    var maxPayload = new byte[payloadLength];
    Array.Fill(maxPayload, (byte)'A');
    await nc.PublishAsync("foo", maxPayload, headers: headers);
    Console.WriteLine("OK");
}
catch (NatsException ex)
{
    Console.WriteLine("Max size text in ASCII bytes is not ok");
    Console.WriteLine(ex.Message);
}

Output:
Small text in ASCII bytes is ok
Max size text in ASCII bytes is not ok
Payload size 1398090 exceeds server's maximum payload size 1048576
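
The reported sizes are consistent with the byte array being written as a JSON string. System.Text.Json serializes a byte[] as a quoted Base64 string, which takes 4 characters for every 3 input bytes (rounded up) plus two quote characters. The sketch below assumes the JSON serializer registry passes the byte[] to System.Text.Json in this way; `JsonByteArrayLength` is a hypothetical helper used only for the arithmetic.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: bytes System.Text.Json emits for a byte[] of the given
// length, i.e. Base64 (4 chars per 3 bytes, rounded up) plus two quotes.
static long JsonByteArrayLength(long rawLength) => ((rawLength + 2) / 3 * 4) + 2;

// Sanity check against the real serializer on a small array.
var sample = new byte[11];
int actual = JsonSerializer.SerializeToUtf8Bytes(sample).Length;
Console.WriteLine(actual == JsonByteArrayLength(sample.Length)); // True

// Sizes from this thread: serialized body plus header bytes.
long first = JsonByteArrayLength(1048576 - 45) + 45;
long second = JsonByteArrayLength(1048576 - 40) + 40;
Console.WriteLine(first);  // 1398091
Console.WriteLine(second); // 1398090
```

Both results match the sizes in the exception messages above, while the earlier example that used default connection options published the same byte count without error.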

P.S. We still need HeaderWriter to be publicly available.

mtmk added the help wanted and new-feature labels Oct 1, 2024
mtmk added the good first issue label Oct 28, 2024
@SiuTung08
Contributor

Currently working on it - we should be able to make the HeaderWriter class public and add a method to it for getting the header size.
