Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added subject transforms to stream source config #134

Merged
merged 9 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/perf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ jobs:
fail-fast: false
matrix:
config:
- branch: dev
- branch: release/v2.9.23
- branch: latest
- branch: main
runs-on: ubuntu-latest
env:
Expand All @@ -23,7 +24,9 @@ jobs:
steps:
- name: Install nats
run: |
rel=$(curl -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//)
# latest 0.1.1 doesn't have binaries
# rel=$(curl -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//)
rel=0.1.0
wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip
unzip nats-$rel-linux-amd64.zip
sudo mv nats-$rel-linux-amd64/nats /usr/local/bin
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ jobs:
fail-fast: false
matrix:
config:
- branch: dev
- branch: release/v2.9.23
- branch: latest
- branch: main
runs-on: ubuntu-latest
env:
Expand Down Expand Up @@ -55,7 +56,8 @@ jobs:
fail-fast: false
matrix:
config:
- branch: dev
- branch: release/v2.9.23
- branch: latest
- branch: main
runs-on: windows-latest
env:
Expand Down
6 changes: 3 additions & 3 deletions src/NATS.Client.JetStream/Models/StreamSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public record StreamSource
public string FilterSubject { get; set; } = default!;

/// <summary>
/// Map matching subjects according to this transform destination
/// Subject transforms to apply to matching messages
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("subject_transform_dest")]
[System.Text.Json.Serialization.JsonPropertyName("subject_transforms")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public string SubjectTransformDest { get; set; } = default!;
public System.Collections.Generic.ICollection<SubjectTransform> SubjectTransforms { get; set; } = default!;

[System.Text.Json.Serialization.JsonPropertyName("external")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
Expand Down
4 changes: 3 additions & 1 deletion tests/NATS.Client.Core.Tests/RequestReplyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ public class RequestReplyTest
[Fact]
public async Task Simple_request_reply_test()
{
await using var server = NatsServer.Start();
// Trace to hunt flapper!
await using var server = NatsServer.StartWithTrace(_output);

await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeAsync<int>("foo");
Expand Down
21 changes: 18 additions & 3 deletions tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ await Retry.Until(
public async Task Consume_idle_heartbeat_test()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await using var server = NatsServer.StartJS();
await using var server = NatsServer.StartJSWithTrace(_output);

var (nats, proxy) = server.CreateProxiedClientConnection();

Expand Down Expand Up @@ -120,14 +120,29 @@ public async Task Consume_idle_heartbeat_test()

await Retry.Until(
"all pull requests are received",
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) == 2);
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) >= 2);

var msgNextRequests = proxy
.ClientFrames
.Where(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1"))
.ToList();

Assert.Equal(2, msgNextRequests.Count);
// In some cases we are receiving more than two requests which
// is possible if the tests are running in a slow container and taking
// more than the timeout? Looking at the test and the code I can't make
// sense of it, really, but I'm going to assume it's fine to receive 3 pull
// requests as well as 2 since test failure reported 3 and failed once.
if (msgNextRequests.Count > 2)
{
_output.WriteLine($"Pull request count more than expected: {msgNextRequests.Count}");
foreach (var frame in msgNextRequests)
{
_output.WriteLine($"PULL REQUEST: {frame}");
}
}

// Still fail and check traces if it happens again
Assert.True(msgNextRequests.Count is 2);

// Pull requests
foreach (var frame in msgNextRequests)
Expand Down
16 changes: 15 additions & 1 deletion tests/NATS.Client.TestUtilities/NatsServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ public int ConnectionPort

public static NatsServer StartJS() => StartJS(new NullOutputHelper(), TransportType.Tcp);

public static NatsServer StartJSWithTrace(ITestOutputHelper outputHelper) => Start(
outputHelper: outputHelper,
opts: new NatsServerOptsBuilder()
.UseTransport(TransportType.Tcp)
.Trace()
.UseJetStream()
.Build());

public static NatsServer StartJS(ITestOutputHelper outputHelper, TransportType transportType) => Start(
outputHelper: outputHelper,
opts: new NatsServerOptsBuilder()
Expand All @@ -151,7 +159,13 @@ public static NatsServer StartJS(ITestOutputHelper outputHelper, TransportType t

public static NatsServer Start() => Start(new NullOutputHelper(), TransportType.Tcp);

public static NatsServer Start(ITestOutputHelper outputHelper) => Start(outputHelper, TransportType.Tcp);
public static NatsServer StartWithTrace(ITestOutputHelper outputHelper)
=> Start(
outputHelper,
new NatsServerOptsBuilder()
.Trace()
.UseTransport(TransportType.Tcp)
.Build());

public static NatsServer Start(ITestOutputHelper outputHelper, TransportType transportType) =>
Start(outputHelper, new NatsServerOptsBuilder().UseTransport(transportType).Build());
Expand Down
Loading