From 50f22214daaf16c3f6ef34967507326293f143c2 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 27 Sep 2023 22:49:07 +0100 Subject: [PATCH] Added subject transforms to stream source config (#134) * Added subject transforms to stream source config * Fixed nats-cli download for tests * binaries.nats.dev isn't working for main * binaries.nats.dev should be working for main * Add release/v2.9.23 to test matrix * Add latest to test matrix * Fixed potential test flapper * Chasing test flapper * Chasing test flapper --- .github/workflows/perf.yml | 7 +++++-- .github/workflows/test.yml | 6 ++++-- .../Models/StreamSource.cs | 6 +++--- .../RequestReplyTest.cs | 4 +++- .../ConsumerConsumeTest.cs | 21 ++++++++++++++++--- tests/NATS.Client.TestUtilities/NatsServer.cs | 16 +++++++++++++- 6 files changed, 48 insertions(+), 12 deletions(-) diff --git a/.github/workflows/perf.yml b/.github/workflows/perf.yml index e4c4c7413..79ac4aaf8 100644 --- a/.github/workflows/perf.yml +++ b/.github/workflows/perf.yml @@ -13,7 +13,8 @@ jobs: fail-fast: false matrix: config: - - branch: dev + - branch: release/v2.9.23 + - branch: latest - branch: main runs-on: ubuntu-latest env: @@ -23,7 +24,9 @@ jobs: steps: - name: Install nats run: | - rel=$(curl -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//) + # latest 0.1.1 doesn't have binaries + # rel=$(curl -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//) + rel=0.1.0 wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip unzip nats-$rel-linux-amd64.zip sudo mv nats-$rel-linux-amd64/nats /usr/local/bin diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cee691711..eea223ad3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,7 +13,8 @@ jobs: fail-fast: false matrix: config: - - branch: dev + - branch: release/v2.9.23 + - branch: latest - branch: main runs-on: ubuntu-latest env: @@ -55,7 +56,8 @@ jobs: fail-fast: false matrix: config: - - branch: dev + - branch: release/v2.9.23 + - branch: latest - branch: main runs-on: windows-latest env: diff --git a/src/NATS.Client.JetStream/Models/StreamSource.cs b/src/NATS.Client.JetStream/Models/StreamSource.cs index 5c84a38da..5c19e7f4f 100644 --- a/src/NATS.Client.JetStream/Models/StreamSource.cs +++ b/src/NATS.Client.JetStream/Models/StreamSource.cs @@ -39,11 +39,11 @@ public record StreamSource public string FilterSubject { get; set; } = default!; /// - /// Map matching subjects according to this transform destination + /// Subject transforms to apply to matching messages /// - [System.Text.Json.Serialization.JsonPropertyName("subject_transform_dest")] + [System.Text.Json.Serialization.JsonPropertyName("subject_transforms")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] - public string SubjectTransformDest { get; set; } = default!; + public System.Collections.Generic.ICollection SubjectTransforms { get; set; } = default!; [System.Text.Json.Serialization.JsonPropertyName("external")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs index 8d0b081ae..699c7efc1 100644 --- a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs +++ b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs @@ -12,7 +12,9 @@ public class RequestReplyTest [Fact] public async Task Simple_request_reply_test() { - await using var server = NatsServer.Start(); + // Trace to hunt flapper! + await using var server = NatsServer.StartWithTrace(_output); + await using var nats = server.CreateClientConnection(); var sub = await nats.SubscribeAsync("foo"); diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index 0dec6922d..97fcf9106 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -79,7 +79,7 @@ await Retry.Until( public async Task Consume_idle_heartbeat_test() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await using var server = NatsServer.StartJS(); + await using var server = NatsServer.StartJSWithTrace(_output); var (nats, proxy) = server.CreateProxiedClientConnection(); @@ -120,14 +120,29 @@ public async Task Consume_idle_heartbeat_test() await Retry.Until( "all pull requests are received", - () => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) == 2); + () => proxy.ClientFrames.Count(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) >= 2); var msgNextRequests = proxy .ClientFrames .Where(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) .ToList(); - Assert.Equal(2, msgNextRequests.Count); + // In some cases we are receiving more than two requests which + // is possible if the tests are running in a slow container and taking + // more than the timeout? Looking at the test and the code I can't make + // sense of it, really, but I'm going to assume it's fine to receive 3 pull + // requests as well as 2 since test failure reported 3 and failed once. + if (msgNextRequests.Count > 2) + { + _output.WriteLine($"Pull request count more than expected: {msgNextRequests.Count}"); + foreach (var frame in msgNextRequests) + { + _output.WriteLine($"PULL REQUEST: {frame}"); + } + } + + // Still fail and check traces if it happens again + Assert.True(msgNextRequests.Count is 2); // Pull requests foreach (var frame in msgNextRequests) diff --git a/tests/NATS.Client.TestUtilities/NatsServer.cs b/tests/NATS.Client.TestUtilities/NatsServer.cs index 40c43f96c..1805bd67b 100644 --- a/tests/NATS.Client.TestUtilities/NatsServer.cs +++ b/tests/NATS.Client.TestUtilities/NatsServer.cs @@ -142,6 +142,14 @@ public int ConnectionPort public static NatsServer StartJS() => StartJS(new NullOutputHelper(), TransportType.Tcp); + public static NatsServer StartJSWithTrace(ITestOutputHelper outputHelper) => Start( + outputHelper: outputHelper, + opts: new NatsServerOptsBuilder() + .UseTransport(TransportType.Tcp) + .Trace() + .UseJetStream() + .Build()); + public static NatsServer StartJS(ITestOutputHelper outputHelper, TransportType transportType) => Start( outputHelper: outputHelper, opts: new NatsServerOptsBuilder() @@ -151,7 +159,13 @@ public static NatsServer StartJS(ITestOutputHelper outputHelper, TransportType t public static NatsServer Start() => Start(new NullOutputHelper(), TransportType.Tcp); - public static NatsServer Start(ITestOutputHelper outputHelper) => Start(outputHelper, TransportType.Tcp); + public static NatsServer StartWithTrace(ITestOutputHelper outputHelper) + => Start( + outputHelper, + new NatsServerOptsBuilder() + .Trace() + .UseTransport(TransportType.Tcp) + .Build()); public static NatsServer Start(ITestOutputHelper outputHelper, TransportType transportType) => Start(outputHelper, new NatsServerOptsBuilder().UseTransport(transportType).Build());