From 8e5f229cca5742beb5a656b51a52cc6f8d9b2880 Mon Sep 17 00:00:00 2001 From: Ales Justin Date: Thu, 9 Mar 2023 10:34:16 +0100 Subject: [PATCH] Add issue #31422 reproducer. --- .../example/streaming/StreamingService.java | 43 +++++++++++++ .../src/main/proto/streaming.proto | 12 +++- .../src/main/resources/application.properties | 9 +++ .../example/streaming/LongStreamTest.java | 7 +++ .../example/streaming/LongStreamTestBase.java | 62 +++++++++++++++++++ .../example/streaming/N2OLongStreamTest.java | 10 +++ .../example/streaming/O2NLongStreamTest.java | 10 +++ .../streaming/VertxLongStreamTest.java | 10 +++ 8 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTest.java create mode 100644 integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTestBase.java create mode 100644 integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/N2OLongStreamTest.java create mode 100644 integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/O2NLongStreamTest.java create mode 100644 integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/VertxLongStreamTest.java diff --git a/integration-tests/grpc-streaming/src/main/java/io/quarkus/grpc/example/streaming/StreamingService.java b/integration-tests/grpc-streaming/src/main/java/io/quarkus/grpc/example/streaming/StreamingService.java index 35612cc5d9e4d1..db3d9226ff96dc 100644 --- a/integration-tests/grpc-streaming/src/main/java/io/quarkus/grpc/example/streaming/StreamingService.java +++ b/integration-tests/grpc-streaming/src/main/java/io/quarkus/grpc/example/streaming/StreamingService.java @@ -1,10 +1,13 @@ package io.quarkus.grpc.example.streaming; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; import io.grpc.examples.streaming.Empty; import io.grpc.examples.streaming.Item; import io.grpc.examples.streaming.MutinyStreamingGrpc; +import io.grpc.examples.streaming.StringReply; +import io.grpc.examples.streaming.StringRequest; import io.quarkus.grpc.GrpcService; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -36,4 +39,44 @@ public Multi pipe(Multi request) { .onItem().scan(() -> 0L, Long::sum) .onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build()); } + + @Override + public Uni quickStringStream(Multi request) { + return request + .call(() -> { + throw new RuntimeException("Any error"); + }) + .map(x -> { + return StringReply.newBuilder() + .setMessage(x.toString()) + .build(); + }) + .collect().asList() + .replaceWith(StringReply.newBuilder() + .setMessage("DONE") + .build()) + .onFailure() + .invoke(th -> System.err.println("Quick: " + th.getMessage())); + } + + @Override + public Uni midStringStream(Multi request) { + AtomicInteger atomicInteger = new AtomicInteger(0); + return request + // .call(() -> Uni.createFrom().failure(new RuntimeException("Any error"))) + .map(x -> { + if (atomicInteger.getAndIncrement() == 5) { + throw new RuntimeException("We reached 5, error here"); + } + return StringReply.newBuilder() + .setMessage(x.toString()) + .build(); + }) + .collect().asList() + .replaceWith(StringReply.newBuilder() + .setMessage("DONE") + .build()) + .onFailure() + .invoke(th -> System.err.println("Mid: " + th.getMessage())); + } } diff --git a/integration-tests/grpc-streaming/src/main/proto/streaming.proto b/integration-tests/grpc-streaming/src/main/proto/streaming.proto index 43b857c8dac4a6..413b5fc9f53ff2 100644 --- a/integration-tests/grpc-streaming/src/main/proto/streaming.proto +++ b/integration-tests/grpc-streaming/src/main/proto/streaming.proto @@ -12,6 +12,8 @@ service Streaming { rpc Source(Empty) returns (stream Item) {} rpc Sink(stream Item) returns (Empty) {} rpc Pipe(stream Item) returns (stream Item) {} + rpc QuickStringStream (stream StringRequest) returns (StringReply) {} + rpc MidStringStream (stream StringRequest) returns (StringReply) {} } message Item { @@ -19,4 +21,12 @@ message Item { } message Empty { -} \ No newline at end of file +} + +message StringRequest { + string anyValue = 1; +} + +message StringReply { + string message = 1; +} diff --git a/integration-tests/grpc-streaming/src/main/resources/application.properties b/integration-tests/grpc-streaming/src/main/resources/application.properties index 8405096c74a298..011cd24e1144da 100644 --- a/integration-tests/grpc-streaming/src/main/resources/application.properties +++ b/integration-tests/grpc-streaming/src/main/resources/application.properties @@ -4,3 +4,12 @@ quarkus.grpc.clients.streaming.port=9001 %vertx.quarkus.grpc.clients.streaming.port=8081 %vertx.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true %vertx.quarkus.grpc.server.use-separate-server=false + +%n2o.quarkus.grpc.server.use-separate-server=true +%o2n.quarkus.grpc.server.use-separate-server=false + +%n2o.quarkus.grpc.clients.streaming.port=9001 +%n2o.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true + +%o2n.quarkus.grpc.clients.streaming.port=8081 +%o2n.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=false diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTest.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTest.java new file mode 100644 index 00000000000000..ba401722b2bb42 --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTest.java @@ -0,0 +1,7 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.test.junit.QuarkusTest; + +@QuarkusTest +public class LongStreamTest extends LongStreamTestBase { +} diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTestBase.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTestBase.java new file mode 100644 index 00000000000000..69ccbfb4595eca --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/LongStreamTestBase.java @@ -0,0 +1,62 @@ +package io.quarkus.grpc.example.streaming; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.StatusRuntimeException; +import io.grpc.examples.streaming.Streaming; +import io.grpc.examples.streaming.StringReply; +import io.grpc.examples.streaming.StringRequest; +import io.quarkus.grpc.GrpcClient; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; + +@SuppressWarnings("NewClassNamingConvention") +public class LongStreamTestBase { + private final Logger log = LoggerFactory.getLogger(getClass()); + + @GrpcClient("streaming") + Streaming streamSvc; + + @Test + public void testQuickFailure() { + Multi multi = Multi.createFrom().range(1, 10) + // delaying stream to make it a bit longer + .call(() -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.of(1000, ChronoUnit.NANOS))) + .map(x -> StringRequest.newBuilder() + .setAnyValue(x.toString()) + .build()); + // .invoke(x -> log.info("Stream piece number is: " + x.getAnyValue())); + + UniAssertSubscriber subscriber = streamSvc.quickStringStream(multi) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + subscriber + .awaitFailure() + .assertFailedWith(StatusRuntimeException.class); + } + + @Test + public void testMidFailure() { + Multi multi = Multi.createFrom().range(1, 10) + // delaying stream to make it a bit longer + .call(() -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.of(1000, ChronoUnit.NANOS))) + .map(x -> StringRequest.newBuilder() + .setAnyValue(x.toString()) + .build()); + // .invoke(x -> log.info("Stream piece number is: " + x.getAnyValue())); + + UniAssertSubscriber subscriber = streamSvc.midStringStream(multi) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + subscriber + .awaitFailure() + .assertFailedWith(StatusRuntimeException.class); + } + +} diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/N2OLongStreamTest.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/N2OLongStreamTest.java new file mode 100644 index 00000000000000..f7598d1727953f --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/N2OLongStreamTest.java @@ -0,0 +1,10 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.grpc.test.utils.N2OGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@QuarkusTest +@TestProfile(N2OGRPCTestProfile.class) +public class N2OLongStreamTest extends LongStreamTestBase { +} diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/O2NLongStreamTest.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/O2NLongStreamTest.java new file mode 100644 index 00000000000000..9278fa5cfad07a --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/O2NLongStreamTest.java @@ -0,0 +1,10 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.grpc.test.utils.O2NGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@QuarkusTest +@TestProfile(O2NGRPCTestProfile.class) +public class O2NLongStreamTest extends LongStreamTestBase { +} diff --git a/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/VertxLongStreamTest.java b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/VertxLongStreamTest.java new file mode 100644 index 00000000000000..c056a783c9a964 --- /dev/null +++ b/integration-tests/grpc-streaming/src/test/java/io/quarkus/grpc/example/streaming/VertxLongStreamTest.java @@ -0,0 +1,10 @@ +package io.quarkus.grpc.example.streaming; + +import io.quarkus.grpc.test.utils.VertxGRPCTestProfile; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; + +@QuarkusTest +@TestProfile(VertxGRPCTestProfile.class) +public class VertxLongStreamTest extends LongStreamTestBase { +}