Skip to content

Commit

Permalink
Support json wire format.
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Aug 6, 2024
1 parent 3d62ec8 commit 2945fe4
Show file tree
Hide file tree
Showing 53 changed files with 1,409 additions and 346 deletions.
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
Expand Down Expand Up @@ -173,6 +178,11 @@
</dependency>

<!-- Testing -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-unit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
5 changes: 0 additions & 5 deletions vertx-grpc-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@
<artifactId>grpc-stub</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-unit</artifactId>
<scope>test</scope>
</dependency>
<!-- Self signed certificate generation -->
<dependency>
<groupId>org.bouncycastle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.vertx.core.Timer;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.streams.ReadStream;
import io.vertx.grpc.common.WireFormat;
import io.vertx.grpc.common.GrpcWriteStream;
import io.vertx.grpc.common.ServiceName;

Expand All @@ -45,6 +46,9 @@ public interface GrpcClientRequest<Req, Resp> extends GrpcWriteStream<Req> {
@Fluent
GrpcClientRequest<Req, Resp> encoding(String encoding);

@Override
GrpcClientRequest<Req, Resp> format(WireFormat format);

/**
* Set the full method name to call, it must follow the format {@code package-name + '.' + service-name + '/' + method-name}
* or an {@code IllegalArgumentException} is thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Timer;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;

import java.util.EnumMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.grpc.client.GrpcClientRequest;
Expand Down Expand Up @@ -48,8 +49,9 @@ public class GrpcClientRequestImpl<Req, Resp> extends GrpcWriteStreamBase<GrpcCl

public GrpcClientRequestImpl(HttpClientRequest httpRequest,
boolean scheduleDeadline,
GrpcMessageEncoder<Req> messageEncoder, GrpcMessageDecoder<Resp> messageDecoder) {
super( ((PromiseInternal<?>)httpRequest.response()).context(), httpRequest, messageEncoder);
GrpcMessageEncoder<Req> messageEncoder,
GrpcMessageDecoder<Resp> messageDecoder) {
super( ((PromiseInternal<?>)httpRequest.response()).context(), "application/grpc", httpRequest, messageEncoder);
this.httpRequest = httpRequest;
this.scheduleDeadline = scheduleDeadline;
this.timeout = 0L;
Expand All @@ -58,9 +60,28 @@ public GrpcClientRequestImpl(HttpClientRequest httpRequest,
this.response = httpRequest
.response()
.compose(httpResponse -> {
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, this, httpResponse, messageDecoder);
grpcResponse.init(this);
return Future.succeededFuture(grpcResponse);
String msg = null;
String statusHeader = httpResponse.getHeader("grpc-status");
GrpcStatus status = statusHeader != null ? GrpcStatus.valueOf(Integer.parseInt(statusHeader)) : null;
WireFormat format = null;
if (status == null) {
String contentType = httpResponse.getHeader(HttpHeaders.CONTENT_TYPE);
if (contentType != null) {
format = GrpcMediaType.parseContentType(contentType, "application/grpc");
}
if (contentType == null) {
msg = "HTTP response missing content-type header";
} else {
msg = "Invalid HTTP response content-type header";
}
}
if (format != null || status != null) {
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, this, format, status, httpResponse, messageDecoder);
grpcResponse.init(this);
return Future.succeededFuture(grpcResponse);
}
httpResponse.request().reset(GrpcError.CANCELLED.http2ResetCode);
return context().failedFuture(msg);
}, err -> {
if (err instanceof StreamResetException) {
err = GrpcErrorException.create((StreamResetException) err);
Expand Down Expand Up @@ -95,12 +116,6 @@ public GrpcClientRequest<Req, Resp> methodName(String methodName) {
return this;
}

@Override public GrpcClientRequest<Req, Resp> encoding(String encoding) {
Objects.requireNonNull(encoding);
this.encoding = encoding;
return this;
}

@Override
public GrpcClientRequest<Req, Resp> timeout(long timeout, TimeUnit unit) {
if (timeout < 0L) {
Expand Down Expand Up @@ -131,7 +146,7 @@ public GrpcClientRequest<Req, Resp> idleTimeout(long timeout) {
}

@Override
protected void sendHeaders(MultiMap headers, boolean end) {
protected void sendHeaders(String contentType, MultiMap headers, boolean end) {
ServiceName serviceName = this.serviceName;
String methodName = this.methodName;
if (serviceName == null) {
Expand All @@ -150,7 +165,7 @@ protected void sendHeaders(MultiMap headers, boolean end) {
httpRequest.putHeader("grpc-timeout", timeoutHeader);
}
String uri = serviceName.pathOf(methodName);
httpRequest.putHeader("content-type", "application/grpc");
httpRequest.putHeader(HttpHeaders.CONTENT_TYPE, contentType);
if (encoding != null) {
httpRequest.putHeader("grpc-encoding", encoding);
}
Expand All @@ -172,8 +187,8 @@ protected void sendTrailers(MultiMap trailers) {
}

@Override
protected Future<Void> sendMessage(GrpcMessage message) {
return httpRequest.write(GrpcMessageImpl.encode(message));
protected Future<Void> sendMessage(Buffer message, boolean compressed) {
return httpRequest.write(GrpcMessageImpl.encode(message, compressed, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package io.vertx.grpc.client.impl;

import io.netty.handler.codec.http.QueryStringDecoder;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
Expand All @@ -33,50 +32,36 @@ public class GrpcClientResponseImpl<Req, Resp> extends GrpcReadStreamBase<GrpcCl
private final HttpClientResponse httpResponse;
private GrpcStatus status;
private String statusMessage;
private String encoding;

public GrpcClientResponseImpl(ContextInternal context,
GrpcClientRequestImpl<Req, Resp> request,
WireFormat format,
GrpcStatus status,
HttpClientResponse httpResponse, GrpcMessageDecoder<Resp> messageDecoder) {
super(context, httpResponse, httpResponse.headers().get("grpc-encoding"), messageDecoder);
super(context, httpResponse, httpResponse.headers().get("grpc-encoding"), format, messageDecoder);
this.request = request;
this.encoding = httpResponse.headers().get("grpc-encoding");
this.httpResponse = httpResponse;

String responseStatus = httpResponse.getHeader("grpc-status");
if (responseStatus != null) {
status = GrpcStatus.valueOf(Integer.parseInt(responseStatus));
if (status != GrpcStatus.OK) {
String msg = httpResponse.getHeader("grpc-message");
if (msg != null) {
statusMessage = QueryStringDecoder.decodeComponent(msg, StandardCharsets.UTF_8);
}
}
}
this.status = status;
}

@Override
public MultiMap headers() {
return httpResponse.headers();
}

@Override
public String encoding() {
return encoding;
}

@Override
public MultiMap trailers() {
return httpResponse.trailers();
}

protected void handleEnd() {
request.cancelTimeout();
String responseStatus = httpResponse.getTrailer("grpc-status");
if (responseStatus != null) {
status = GrpcStatus.valueOf(Integer.parseInt(responseStatus));
if (status != GrpcStatus.OK) {
statusMessage = httpResponse.getTrailer("grpc-message");
if (status == null) {
String responseStatus = httpResponse.getTrailer("grpc-status");
if (responseStatus != null) {
status = GrpcStatus.valueOf(Integer.parseInt(responseStatus));
} else {
status = GrpcStatus.UNKNOWN;
}
}
super.handleEnd();
Expand All @@ -92,14 +77,20 @@ public GrpcStatus status() {

@Override
public String statusMessage() {
if (status != null && status != GrpcStatus.OK) {
String msg = httpResponse.getHeader("grpc-message");
if (msg != null) {
statusMessage = QueryStringDecoder.decodeComponent(msg, StandardCharsets.UTF_8);
}
}
return statusMessage;
}

@Override
public Future<Void> end() {
return super.end()
.compose(v -> {
if (status == GrpcStatus.OK) {
if (status() == GrpcStatus.OK) {
return Future.succeededFuture();
} else {
return Future.failedFuture("Invalid gRPC status " + status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.grpc.examples.helloworld.GreeterGrpc;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.StreamResetException;
Expand Down Expand Up @@ -167,6 +168,7 @@ private void testDecode(TestContext should, Async async, Buffer payload, Consume
req.endHandler(v -> {
HttpServerResponse resp = req.response();
resp.putHeader("grpc-encoding", "gzip");
resp.putHeader(HttpHeaders.CONTENT_TYPE, "application/grpc");
resp.putTrailer("grpc-status", "" + GrpcStatus.OK.code);
resp.write(Buffer.buffer()
.appendByte((byte)1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@
import io.grpc.examples.streaming.StreamingGrpc;
import io.grpc.stub.StreamObserver;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SelfSignedCertificate;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.grpc.client.GrpcClientOptions;
import io.vertx.grpc.common.GrpcError;
import io.vertx.grpc.common.GrpcErrorException;
import io.vertx.grpc.common.GrpcStatus;
import io.vertx.grpc.common.GrpcLocal;
import io.vertx.grpc.common.*;
import org.junit.Test;

import java.io.File;
Expand Down Expand Up @@ -629,4 +629,74 @@ public void onCompleted() {
}));
}));
}

@Test
public void testMissingResponseStatusIsUnknown(TestContext should) throws Exception {

Async done = should.async();
HttpServer server = vertx.createHttpServer();
server.requestHandler(req -> {
req.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "application/grpc")
.end();
});
server.listen(port, "localhost")
.toCompletionStage()
.toCompletableFuture()
.get(20, TimeUnit.SECONDS);

client = GrpcClient.client(vertx);
client
.request(SocketAddress.inetSocketAddress(port, "localhost"), STREAMING_PIPE)
.onComplete(should.asyncAssertSuccess(callRequest -> {
callRequest.end();
callRequest
.response()
.onComplete(should.asyncAssertSuccess(resp -> {
resp.endHandler(v -> {
should.assertEquals(GrpcStatus.UNKNOWN, resp.status());
done.complete();
});
}));
}));

done.awaitSuccess();
}

@Test
public void testJsonMessageFormat(TestContext should) throws Exception {

super.testJsonMessageFormat(should, "application/grpc+json");

JsonObject helloReply = new JsonObject().put("message", "Hello Julien");
JsonObject helloRequest = new JsonObject().put("name", "Julien");

Async done = should.async();

ServiceMethod<JsonObject, JsonObject> serviceMethod = ServiceMethod.client(
ServiceName.create("helloworld", "Greeter"),
"SayHello",
GrpcMessageEncoder.JSON_OBJECT,
GrpcMessageDecoder.JSON_OBJECT);

client = GrpcClient.client(vertx);
client
.request(SocketAddress.inetSocketAddress(port, "localhost"), serviceMethod)
.onComplete(should.asyncAssertSuccess(callRequest -> {
callRequest.end(helloRequest);
callRequest
.response()
.onComplete(should.asyncAssertSuccess(resp -> {
should.assertEquals(WireFormat.JSON, resp.format());
resp.handler(msg -> {
should.assertEquals(helloReply, msg);
});
resp.endHandler(v -> {
done.complete();
});
}));
}));

done.awaitSuccess();
}
}
Loading

0 comments on commit 2945fe4

Please sign in to comment.