Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Sep 9, 2024
1 parent 33dc6ed commit 11bc11c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public NatsJSConsume(

if (self.Connection.ConnectionState == NatsConnectionState.Open)
{
self.CheckPending("heartbeat-timeout");
self.Pull("heartbeat-timeout", self._maxMsgs, self._maxBytes);
self.ResetPending();
if (self._debug)
{
self._logger.LogDebug(
Expand Down
17 changes: 7 additions & 10 deletions tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ public async Task Consume_msgs_test()
var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10 };
var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token);
var count = 0;
await using var cc = await consumer.ConsumeInternalAsync<TestData>(serializer: TestDataJsonSerializer<TestData>.Default, consumerOpts, cancellationToken: cts.Token);
await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token))
await foreach (var msg in consumer.ConsumeAsync(serializer: TestDataJsonSerializer<TestData>.Default, consumerOpts, cancellationToken: cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Assert.Equal(count, msg.Data!.Test);
Expand All @@ -92,7 +91,7 @@ public async Task Consume_msgs_test()

await Retry.Until(
reason: "received enough pulls",
condition: () => PullCount() > 5,
condition: () => PullCount() >= 4,
action: () =>
{
_output.WriteLine($"### PullCount:{PullCount()}");
Expand Down Expand Up @@ -215,12 +214,10 @@ public async Task Consume_reconnect_test()
// Not interested in management messages sent upto this point
await proxy.FlushFramesAsync(nats);

var cc = await consumer.ConsumeInternalAsync<TestData>(serializer: TestDataJsonSerializer<TestData>.Default, consumerOpts, cancellationToken: cts.Token);

var readerTask = Task.Run(async () =>
{
var count = 0;
await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token))
await foreach (var msg in consumer.ConsumeAsync<TestData>(serializer: TestDataJsonSerializer<TestData>.Default, consumerOpts, cancellationToken: cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
Assert.Equal(count, msg.Data!.Test);
Expand All @@ -230,6 +227,8 @@ public async Task Consume_reconnect_test()
if (count == 2)
break;
}

return count;
});

// Send a message before reconnect
Expand Down Expand Up @@ -258,11 +257,9 @@ await Retry.Until(
ack.EnsureSuccess();
}

await Retry.Until(
"acked",
() => proxy.ClientFrames.Any(f => f.Message.Contains("CONSUMER.MSG.NEXT")));
var count = await readerTask;
Assert.Equal(2, count);

await readerTask;
await nats.DisposeAsync();
}

Expand Down

0 comments on commit 11bc11c

Please sign in to comment.