Skip to content

Commit

Permalink
Add flags to NatsMsg
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Oct 12, 2024
1 parent a86b4a8 commit 4b231fc
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 25 deletions.
162 changes: 138 additions & 24 deletions src/NATS.Client.Core/NatsMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@

namespace NATS.Client.Core;

[Flags]
public enum NatsMsgFlags : byte
{
None = 0,
Empty = 1,
NoResponders = 2,
}

/// <summary>
/// This interface provides an optional contract when passing
/// messages to processing methods which is usually helpful in
Expand Down Expand Up @@ -103,12 +111,6 @@ public interface INatsMsg<T>
/// <summary>
/// NATS message structure as defined by the protocol.
/// </summary>
/// <param name="Subject">The destination subject to publish to.</param>
/// <param name="ReplyTo">The reply subject that subscribers can use to send a response back to the publisher/requester.</param>
/// <param name="Size">Message size in bytes.</param>
/// <param name="Headers">Pass additional information using name-value pairs.</param>
/// <param name="Data">Serializable data object.</param>
/// <param name="Connection">NATS connection this message is associated to.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <remarks>
/// <para>Connection property is used to provide reply functionality.</para>
Expand All @@ -119,20 +121,108 @@ public interface INatsMsg<T>
/// </code>
/// </para>
/// </remarks>
public readonly record struct NatsMsg<T>(
string Subject,
string? ReplyTo,
int Size,
NatsHeaders? Headers,
T? Data,
INatsConnection? Connection) : INatsMsg<T>
public readonly record struct NatsMsg<T> : INatsMsg<T>
{
private readonly uint _flagsAndSize;

/// <summary>
/// NATS message structure as defined by the protocol.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="replyTo">The reply subject that subscribers can use to send a response back to the publisher/requester.</param>
/// <param name="size">Message size in bytes.</param>
/// <param name="headers">Pass additional information using name-value pairs.</param>
/// <param name="data">Serializable data object.</param>
/// <param name="connection">NATS connection this message is associated to.</param>
/// <param name="flags">Message flags to indicate no responders and empty payloads.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / test (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / test (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / test (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / test (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / test (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / test (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Linux (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (main)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (latest)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name

Check warning on line 138 in src/NATS.Client.Core/NatsMsg.cs

View workflow job for this annotation

GitHub Actions / Windows (v2.9)

XML comment has a typeparam tag for 'T', but there is no type parameter by that name
/// <remarks>
/// <para>Connection property is used to provide reply functionality.</para>
/// <para>
/// Message size is calculated using the same method NATS server uses:
/// <code lang="C#">
/// int size = subject.Length + replyTo.Length + headers.Length + payload.Length;
/// </code>
/// </para>
/// </remarks>
public NatsMsg(
string subject,
string? replyTo,
int size,
NatsHeaders? headers,
T? data,
INatsConnection? connection,
NatsMsgFlags flags = default)
{
Subject = subject;
ReplyTo = replyTo;
Size = size;
Flags = flags;
Headers = headers;
Data = data;
Connection = connection;
}

/// <inheritdoc />
public NatsException? Error => Headers?.Error;

/// <summary>The destination subject to publish to.</summary>
public string Subject { get; init; }

/// <summary>The reply subject that subscribers can use to send a response back to the publisher/requester.</summary>
public string? ReplyTo { get; init; }

/// <summary>Message size in bytes.</summary>
public int Size
{
// Extract the lower 30 bits
get => (int)(_flagsAndSize & 0x3FFFFFFF);

// Clear the lower 30 bits and set the new number
init
{
// Mask the input value to fit within 30 bits (clear upper bits)
var numberPart = (uint)(value & 0x3FFFFFFF);

// Clear the lower 30 bits and set the new number value
// Preserve the flags, update the number
_flagsAndSize = (_flagsAndSize & 0xC0000000) | numberPart;
}
}

public NatsMsgFlags Flags
{
// Extract the two leftmost bits (31st and 30th bit)
// Mask with 0b11 to get two bits
get => (NatsMsgFlags)((_flagsAndSize >> 30) & 0b11);

init
{
// Clear the current flag bits (set to 0) and then set the new flag value
var flagsPart = (uint)value << 30;
_flagsAndSize = (_flagsAndSize & 0x3FFFFFFF) | flagsPart;
}
}

/// <summary>Pass additional information using name-value pairs.</summary>
public NatsHeaders? Headers { get; init; }

/// <summary>Serializable data object.</summary>
public T? Data { get; init; }

/// <summary>NATS connection this message is associated to.</summary>
public INatsConnection? Connection { get; init; }

public bool IsEmpty => (Flags & NatsMsgFlags.Empty) == NatsMsgFlags.Empty;

public bool HasNoResponders => (Flags & NatsMsgFlags.NoResponders) == NatsMsgFlags.NoResponders;

/// <inheritdoc />
public void EnsureSuccess()
{
if (HasNoResponders)
throw new NatsNoRespondersException();

if (Error != null)
throw Error;
}
Expand Down Expand Up @@ -197,6 +287,17 @@ public ValueTask ReplyAsync<TReply>(NatsMsg<TReply> msg, INatsSerialize<TReply>?
return Connection.PublishAsync(msg with { Subject = ReplyTo }, serializer, opts, cancellationToken);
}

public void Deconstruct(out string subject, out string? replyTo, out int size, out NatsHeaders? headers, out T? data, out INatsConnection? connection, out NatsMsgFlags flags)
{
subject = Subject;
replyTo = ReplyTo;
size = Size;
headers = Headers;
data = Data;
connection = Connection;
flags = Flags;
}

internal static NatsMsg<T> Build(
string subject,
string? replyTo,
Expand All @@ -207,22 +308,35 @@ internal static NatsMsg<T> Build(
INatsDeserialize<T> serializer)
{
NatsHeaders? headers = null;
var flags = NatsMsgFlags.None;

if (headersBuffer != null)
if (payloadBuffer.Length == 0)
{
headers = new NatsHeaders();
flags |= NatsMsgFlags.Empty;
}

try
if (headersBuffer != null)
{
if (NatsSubBase.IsHeader503(headersBuffer))
{
// Parsing can also throw an exception.
if (!headerParser.ParseHeaders(new SequenceReader<byte>(headersBuffer.Value), headers))
{
throw new NatsException("Error parsing headers");
}
flags |= NatsMsgFlags.NoResponders;
}
catch (Exception e)
else
{
headers.Error ??= new NatsHeaderParseException(headersBuffer.Value.ToArray(), e);
headers = new NatsHeaders();

try
{
// Parsing can also throw an exception.
if (!headerParser.ParseHeaders(new SequenceReader<byte>(headersBuffer.Value), headers))
{
throw new NatsException("Error parsing headers");
}
}
catch (Exception e)
{
headers.Error ??= new NatsHeaderParseException(headersBuffer.Value.ToArray(), e);
}
}
}

Expand Down Expand Up @@ -277,7 +391,7 @@ internal static NatsMsg<T> Build(
}
}

return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection);
return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection, flags);
}

[MemberNotNull(nameof(Connection))]
Expand Down
6 changes: 5 additions & 1 deletion src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
{
switch (Opts)
{
case { ThrowIfNoResponders: true } when headersBuffer is { Length: >= 12 } && headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence):
case { ThrowIfNoResponders: true } when IsHeader503(headersBuffer):
SetException(new NatsNoRespondersException());
return;
case { StopOnEmptyMsg: true }:
Expand Down Expand Up @@ -311,6 +311,10 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
}
}

internal static bool IsHeader503(ReadOnlySequence<byte>? headersBuffer) =>
headersBuffer is { Length: >= 12 }
&& headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence);

internal void ClearException() => Interlocked.Exchange(ref _exception, null);

/// <summary>
Expand Down
27 changes: 27 additions & 0 deletions tests/NATS.Client.CoreUnit.Tests/NatsMsgTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System.Runtime.CompilerServices;

namespace NATS.Client.CoreUnit.Tests;

public class NatsMsgTests
{
[Theory]
[InlineData(42, NatsMsgFlags.None, false, false)]
[InlineData(42, NatsMsgFlags.Empty, true, false)]
[InlineData(42, NatsMsgFlags.NoResponders, false, true)]
[InlineData(42, NatsMsgFlags.Empty | NatsMsgFlags.NoResponders, true, true)]
[InlineData(1024 * 1024 * 128, NatsMsgFlags.Empty, true, false)]
public void Size_and_flags(int size, NatsMsgFlags flags, bool isEmpty, bool hasNoResponders)
{
var msg = new NatsMsg<string> { Size = size, Flags = flags };
Assert.Equal(size, msg.Size);
Assert.Equal(flags, msg.Flags);
Assert.Equal(isEmpty, msg.IsEmpty);
Assert.Equal(hasNoResponders, msg.HasNoResponders);
}

[Fact]
public void Check_struct_size()
{
Assert.Equal(48, Unsafe.SizeOf<NatsMsg<string>>());
}
}

0 comments on commit 4b231fc

Please sign in to comment.