diff --git a/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js b/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js
index 26caaee30abe7..1675b06f73056 100644
--- a/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js
+++ b/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js
@@ -233,20 +233,22 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) {
_renderCommandButtons(service, method){
if(this._streamsMap.size >=0){
- if(method.type == 'UNARY'){
- return html` this._clear(service.name, method)} @mouseup=${() => this._default(service.name, method)}>Reset
+ if(method.type == 'UNARY' || method.type == 'SERVER_STREAMING'){
+ return html` this._default(service.name, method)}>Reset
this._test(service, method)}>Send`;
}else if(this._isRunning(service.name, method)){
- return html`
this._test(service, method)}>Cancel stream
-
`;
+ return html` this._default(service.name, method)}>Reset
+ this._test(service, method)}>Send
+ this._disconnect(service, method)}>Disconnect
+ `;
}else {
- return html` this._test(service, method)}>Start stream`;
+ return html` this._test(service, method)}>Send`;
}
}
}
_keypress(e, service, method){
- if(method.type == 'UNARY' || !this._isRunning(service.name, method)){
+ if(method.type == 'UNARY' || method.type == 'SERVER_STREAMING' || !this._isRunning(service.name, method)){
if ((e.keyCode == 10 || e.keyCode == 13) && e.ctrlKey){ // ctlr-enter
this._test(service, method);
}
@@ -268,46 +270,75 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) {
}
_default(serviceName, method){
+ let requestTextArea = this._requestTextArea(serviceName, method);
+ requestTextArea.content = '';
let pv = JSON.parse(method.prototype);
- this._requestTextArea(serviceName, method).populatePrettyJson(JSON.stringify(pv));
+ let prettyJson = JSON.stringify(pv, null, 2);
+ requestTextArea.populatePrettyJson(prettyJson);
}
_test(service, method){
- let textArea = this._requestTextArea(service.name, method);
- let content = textArea.getAttribute('value');
+ let requestTextArea = this._requestTextArea(service.name, method);
+ let content = requestTextArea.getAttribute('value');
+ let id = this._id(service.name, method);
+ let responseTextArea = this._responseTextArea(service.name, method);
if(method.type == 'UNARY'){
this.jsonRpc.testService({
+ id: id,
serviceName: service.name,
methodName: method.bareMethodName,
methodType: method.type,
content: content
}).then(jsonRpcResponse => {
- const jsonObject = JSON.parse(jsonRpcResponse.result);
- const prettyJson = JSON.stringify(jsonObject, null, 2);
- this._responseTextArea(service.name, method).populatePrettyJson(prettyJson);
+ this._responseTextArea(service.name, method).populatePrettyJson(this._prettyJson(jsonRpcResponse.result));
});
}else{
- let id = this._id(service.name, method);
if(this._isRunning(service.name, method)){
- this._streamsMap.get(id).cancel();
- this._streamsMap.delete(id);
- this._clear(service.name, method);
- this._default(service.name, method);
+ this.jsonRpc.streamService({
+ id: id,
+ serviceName: service.name,
+ methodName: method.bareMethodName,
+ isRunning: true,
+ content: content
+ });
+ // this._streamsMap.get(id).cancel();
+ // this._streamsMap.delete(id);
+ // this._clear(service.name, method);
+ // this._default(service.name, method);
}else{
+ // starting a new stream, clear the response area
+ responseTextArea.content = null;
let cancelable = this.jsonRpc.streamService({
+ id: id,
serviceName: service.name,
methodName: method.bareMethodName,
- methodType: method.type,
+ isRunning: false,
content: content
}).onNext(jsonRpcResponse => {
- this._responseTextArea(service.name, method).populatePrettyJson(jsonRpcResponse.result);
+ if (responseTextArea.content == null) {
+ responseTextArea.populatePrettyJson(this._prettyJson(jsonRpcResponse.result));
+ } else {
+ responseTextArea.populatePrettyJson(responseTextArea.content + '\n' + this._prettyJson(jsonRpcResponse.result));
+ }
});
- this._streamsMap.set(id, cancelable);
+ if (method.type == 'BIDI_STREAMING' || method.type == 'CLIENT_STREAMING') {
+ this._streamsMap.set(id, cancelable);
+ }
}
this._testerButtons = this._renderCommandButtons(service, method);
this._forceUpdate();
}
}
+
+ _disconnect(service, method){
+ let id = this._id(service.name, method);
+ this.jsonRpc.disconnectService({
+ id: id,
+ });
+ this._streamsMap.delete(id);
+ this._testerButtons = this._renderCommandButtons(service, method);
+ this._forceUpdate();
+ }
_forceUpdate(){
if(this._detailsOpenedItem.length > 0){
@@ -332,5 +363,9 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) {
_responseId(serviceName, method){
return serviceName + '/' + method.bareMethodName + '_response';
}
+
+ _prettyJson(content){
+ return JSON.stringify(JSON.parse(content), null, 2);
+ }
}
customElements.define('qwc-grpc-services', QwcGrpcServices);
\ No newline at end of file
diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devui/GrpcJsonRPCService.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devui/GrpcJsonRPCService.java
index 78835d4fd51fb..7d367e3d43710 100644
--- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devui/GrpcJsonRPCService.java
+++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/devui/GrpcJsonRPCService.java
@@ -18,6 +18,7 @@
import com.google.protobuf.util.JsonFormat;
import io.grpc.Channel;
+import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.ServiceDescriptor;
import io.grpc.netty.NettyChannelBuilder;
@@ -45,6 +46,7 @@ public class GrpcJsonRPCService {
private static final Logger LOG = Logger.getLogger(GrpcJsonRPCService.class);
private Map grpcServiceClassInfos;
+ private Map> callsInProgress;
@Inject
HttpConfiguration httpConfiguration;
@@ -72,6 +74,7 @@ public void init() {
this.ssl = isTLSConfigured(httpConfiguration.ssl.certificate);
}
this.grpcServiceClassInfos = getGrpcServiceClassInfos();
+ this.callsInProgress = new HashMap<>();
}
private boolean isTLSConfigured(CertificateConfig certificate) {
@@ -107,25 +110,27 @@ public JsonArray getServices() {
return services;
}
- public Uni testService(String serviceName, String methodName, String methodType, String content) {
+ public Uni testService(String id, String serviceName, String methodName, String content) {
try {
- return streamService(serviceName, methodName, methodType, content).toUni();
+ return streamService(id, serviceName, methodName, false, content).toUni();
} catch (Throwable t) {
return Uni.createFrom().item(error(t.getMessage()).encodePrettily());
}
}
- public Multi streamService(String serviceName, String methodName, String methodType, String content)
+ public Multi streamService(String id, String serviceName, String methodName, boolean isRunning,
+ String content)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InvalidProtocolBufferException {
if (content == null) {
- return Multi.createFrom().item(error("Invalid messsge").encodePrettily());
+ return Multi.createFrom().item(error("Invalid message").encodePrettily());
}
BroadcastProcessor streamEvent = BroadcastProcessor.create();
GrpcServiceClassInfo info = this.grpcServiceClassInfos.get(serviceName);
- Object grpcStub = createStub(info.grpcServiceClass, host, port);
+ ManagedChannel channel = getChannel(host, port);
+ Object grpcStub = createStub(info.grpcServiceClass, channel);
ServiceDescriptor serviceDescriptor = info.serviceDescriptor;
@@ -134,20 +139,50 @@ public Multi streamService(String serviceName, String methodName, String
MethodDescriptor.PrototypeMarshaller> protoMarshaller = (MethodDescriptor.PrototypeMarshaller>) requestMarshaller;
Class> requestType = protoMarshaller.getMessagePrototype().getClass();
+ Message message = createMessage(content, requestType);
+
+ if (isRunning) {
+ // we are already connected with this gRPC endpoint, just send the message
+ callsInProgress.get(id).onNext(message);
+ } else {
+ // Invoke the stub method and format the response as JSON
+ StreamObserver> responseObserver = new TestObserver<>(streamEvent);
+ StreamObserver incomingStream;
+
+ final Method stubMethod = getStubMethod(grpcStub, methodDescriptor.getBareMethodName());
+
+ if (stubMethod.getParameterCount() == 1 && stubMethod.getReturnType() == StreamObserver.class) {
+ // returned StreamObserver consumes incoming messages
+ //noinspection unchecked
+ incomingStream = (StreamObserver) stubMethod.invoke(grpcStub, responseObserver);
+ callsInProgress.put(id, incomingStream);
+ // will be streamed continuously
+ incomingStream.onNext(message);
+ } else {
+ // incoming message should be passed as the first parameter of the invocation
+ stubMethod.invoke(grpcStub, message, responseObserver);
+ }
+ }
+
+ channel.shutdown();
+ return streamEvent;
+ }
+
+ private static Message createMessage(String content, Class> requestType)
+ throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InvalidProtocolBufferException {
// Create a new builder for the request message, e.g. HelloRequest.newBuilder()
Method newBuilderMethod = requestType.getDeclaredMethod("newBuilder");
Message.Builder builder = (Message.Builder) newBuilderMethod.invoke(null);
// Use the test data to build the request object
JsonFormat.parser().merge(content, builder);
- Message message = builder.build();
-
- StreamObserver> responseObserver = new TestObserver