Skip to content

Commit

Permalink
Add issue quarkusio#31422 reproducer.
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Mar 10, 2023
1 parent cf2acb1 commit 13ce8e3
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package io.quarkus.grpc.example.streaming;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.MutinyStreamingGrpc;
import io.grpc.examples.streaming.StringReply;
import io.grpc.examples.streaming.StringRequest;
import io.quarkus.grpc.GrpcService;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -36,4 +39,43 @@ public Multi<Item> pipe(Multi<Item> request) {
.onItem().scan(() -> 0L, Long::sum)
.onItem().transform(l -> Item.newBuilder().setValue(Long.toString(l)).build());
}

@Override
public Uni<StringReply> quickStringStream(Multi<StringRequest> request) {
return request
.call(() -> {
throw new RuntimeException("Any error");
})
.map(x -> {
return StringReply.newBuilder()
.setMessage(x.toString())
.build();
})
.collect().asList()
.replaceWith(StringReply.newBuilder()
.setMessage("DONE")
.build())
.onFailure()
.invoke(th -> System.err.println("Quick: " + th.getMessage()));
}

@Override
public Uni<StringReply> midStringStream(Multi<StringRequest> request) {
AtomicInteger atomicInteger = new AtomicInteger(0);
return request
.map(x -> {
if (atomicInteger.getAndIncrement() == 5) {
throw new RuntimeException("We reached 5, error here");
}
return StringReply.newBuilder()
.setMessage(x.toString())
.build();
})
.collect().asList()
.replaceWith(StringReply.newBuilder()
.setMessage("DONE")
.build())
.onFailure()
.invoke(th -> System.err.println("Mid: " + th.getMessage()));
}
}
12 changes: 11 additions & 1 deletion integration-tests/grpc-streaming/src/main/proto/streaming.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@ service Streaming {
rpc Source(Empty) returns (stream Item) {}
rpc Sink(stream Item) returns (Empty) {}
rpc Pipe(stream Item) returns (stream Item) {}
rpc QuickStringStream (stream StringRequest) returns (StringReply) {}
rpc MidStringStream (stream StringRequest) returns (StringReply) {}
}

message Item {
string value = 1;
}

message Empty {
}
}

message StringRequest {
string anyValue = 1;
}

message StringReply {
string message = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,12 @@ quarkus.grpc.clients.streaming.port=9001
%vertx.quarkus.grpc.clients.streaming.port=8081
%vertx.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true
%vertx.quarkus.grpc.server.use-separate-server=false

%n2o.quarkus.grpc.server.use-separate-server=true
%o2n.quarkus.grpc.server.use-separate-server=false

%n2o.quarkus.grpc.clients.streaming.port=9001
%n2o.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=true

%o2n.quarkus.grpc.clients.streaming.port=8081
%o2n.quarkus.grpc.clients.streaming.use-quarkus-grpc-client=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkus.grpc.example.streaming;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class LongStreamTest extends LongStreamTestBase {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.quarkus.grpc.example.streaming;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.StatusRuntimeException;
import io.grpc.examples.streaming.Streaming;
import io.grpc.examples.streaming.StringReply;
import io.grpc.examples.streaming.StringRequest;
import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;

@SuppressWarnings("NewClassNamingConvention")
public class LongStreamTestBase {
private final Logger log = LoggerFactory.getLogger(getClass());

@GrpcClient("streaming")
Streaming streamSvc;

@Test
public void testQuickFailure() {
Multi<StringRequest> multi = Multi.createFrom().range(1, 10)
// delaying stream to make it a bit longer
.call(() -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.of(1000, ChronoUnit.NANOS)))
.map(x -> StringRequest.newBuilder()
.setAnyValue(x.toString())
.build());
// .invoke(x -> log.info("Stream piece number is: " + x.getAnyValue()));

UniAssertSubscriber<StringReply> subscriber = streamSvc.quickStringStream(multi)
.subscribe().withSubscriber(UniAssertSubscriber.create());

subscriber
.awaitFailure()
.assertFailedWith(StatusRuntimeException.class);
}

@Test
public void testMidFailure() {
Multi<StringRequest> multi = Multi.createFrom().range(1, 10)
// delaying stream to make it a bit longer
.call(() -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.of(1000, ChronoUnit.NANOS)))
.map(x -> StringRequest.newBuilder()
.setAnyValue(x.toString())
.build());
// .invoke(x -> log.info("Stream piece number is: " + x.getAnyValue()));

UniAssertSubscriber<StringReply> subscriber = streamSvc.midStringStream(multi)
.subscribe().withSubscriber(UniAssertSubscriber.create());

subscriber
.awaitFailure()
.assertFailedWith(StatusRuntimeException.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.quarkus.grpc.example.streaming;

import io.quarkus.grpc.test.utils.N2OGRPCTestProfile;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@QuarkusTest
@TestProfile(N2OGRPCTestProfile.class)
public class N2OLongStreamTest extends LongStreamTestBase {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.quarkus.grpc.example.streaming;

import io.quarkus.grpc.test.utils.O2NGRPCTestProfile;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@QuarkusTest
@TestProfile(O2NGRPCTestProfile.class)
public class O2NLongStreamTest extends LongStreamTestBase {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.quarkus.grpc.example.streaming;

import io.quarkus.grpc.test.utils.VertxGRPCTestProfile;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@QuarkusTest
@TestProfile(VertxGRPCTestProfile.class)
public class VertxLongStreamTest extends LongStreamTestBase {
}

0 comments on commit 13ce8e3

Please sign in to comment.