Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support json wire format #108

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
Expand Down Expand Up @@ -173,6 +178,11 @@
</dependency>

<!-- Testing -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-unit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
5 changes: 0 additions & 5 deletions vertx-grpc-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@
<artifactId>grpc-stub</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-unit</artifactId>
<scope>test</scope>
</dependency>
<!-- Self signed certificate generation -->
<dependency>
<groupId>org.bouncycastle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.vertx.core.Timer;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.streams.ReadStream;
import io.vertx.grpc.common.WireFormat;
import io.vertx.grpc.common.GrpcWriteStream;
import io.vertx.grpc.common.ServiceName;

Expand All @@ -45,6 +46,9 @@ public interface GrpcClientRequest<Req, Resp> extends GrpcWriteStream<Req> {
@Fluent
GrpcClientRequest<Req, Resp> encoding(String encoding);

@Override
GrpcClientRequest<Req, Resp> format(WireFormat format);

/**
* Set the full method name to call, it must follow the format {@code package-name + '.' + service-name + '/' + method-name}
* or an {@code IllegalArgumentException} is thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Timer;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;

import java.util.EnumMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.grpc.client.GrpcClientRequest;
Expand Down Expand Up @@ -48,8 +49,9 @@ public class GrpcClientRequestImpl<Req, Resp> extends GrpcWriteStreamBase<GrpcCl

public GrpcClientRequestImpl(HttpClientRequest httpRequest,
boolean scheduleDeadline,
GrpcMessageEncoder<Req> messageEncoder, GrpcMessageDecoder<Resp> messageDecoder) {
super( ((PromiseInternal<?>)httpRequest.response()).context(), httpRequest, messageEncoder);
GrpcMessageEncoder<Req> messageEncoder,
GrpcMessageDecoder<Resp> messageDecoder) {
super( ((PromiseInternal<?>)httpRequest.response()).context(), "application/grpc", httpRequest, messageEncoder);
this.httpRequest = httpRequest;
this.scheduleDeadline = scheduleDeadline;
this.timeout = 0L;
Expand All @@ -58,9 +60,28 @@ public GrpcClientRequestImpl(HttpClientRequest httpRequest,
this.response = httpRequest
.response()
.compose(httpResponse -> {
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, this, httpResponse, messageDecoder);
grpcResponse.init(this);
return Future.succeededFuture(grpcResponse);
String msg = null;
String statusHeader = httpResponse.getHeader("grpc-status");
GrpcStatus status = statusHeader != null ? GrpcStatus.valueOf(Integer.parseInt(statusHeader)) : null;
WireFormat format = null;
if (status == null) {
String contentType = httpResponse.getHeader(HttpHeaders.CONTENT_TYPE);
if (contentType != null) {
format = GrpcMediaType.parseContentType(contentType, "application/grpc");
}
if (contentType == null) {
msg = "HTTP response missing content-type header";
} else {
msg = "Invalid HTTP response content-type header";
}
}
if (format != null || status != null) {
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, this, format, status, httpResponse, messageDecoder);
grpcResponse.init(this);
return Future.succeededFuture(grpcResponse);
}
httpResponse.request().reset(GrpcError.CANCELLED.http2ResetCode);
return context().failedFuture(msg);
}, err -> {
if (err instanceof StreamResetException) {
err = GrpcErrorException.create((StreamResetException) err);
Expand Down Expand Up @@ -95,12 +116,6 @@ public GrpcClientRequest<Req, Resp> methodName(String methodName) {
return this;
}

@Override public GrpcClientRequest<Req, Resp> encoding(String encoding) {
Objects.requireNonNull(encoding);
this.encoding = encoding;
return this;
}

@Override
public GrpcClientRequest<Req, Resp> timeout(long timeout, TimeUnit unit) {
if (timeout < 0L) {
Expand Down Expand Up @@ -131,7 +146,7 @@ public GrpcClientRequest<Req, Resp> idleTimeout(long timeout) {
}

@Override
protected void sendHeaders(MultiMap headers, boolean end) {
protected void sendHeaders(String contentType, MultiMap headers, boolean end) {
ServiceName serviceName = this.serviceName;
String methodName = this.methodName;
if (serviceName == null) {
Expand All @@ -150,7 +165,7 @@ protected void sendHeaders(MultiMap headers, boolean end) {
httpRequest.putHeader("grpc-timeout", timeoutHeader);
}
String uri = serviceName.pathOf(methodName);
httpRequest.putHeader("content-type", "application/grpc");
httpRequest.putHeader(HttpHeaders.CONTENT_TYPE, contentType);
if (encoding != null) {
httpRequest.putHeader("grpc-encoding", encoding);
}
Expand All @@ -172,8 +187,8 @@ protected void sendTrailers(MultiMap trailers) {
}

@Override
protected Future<Void> sendMessage(GrpcMessage message) {
return httpRequest.write(GrpcMessageImpl.encode(message));
protected Future<Void> sendMessage(Buffer message, boolean compressed) {
return httpRequest.write(GrpcMessageImpl.encode(message, compressed, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package io.vertx.grpc.client.impl;

import io.netty.handler.codec.http.QueryStringDecoder;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
Expand All @@ -33,50 +32,36 @@ public class GrpcClientResponseImpl<Req, Resp> extends GrpcReadStreamBase<GrpcCl
private final HttpClientResponse httpResponse;
private GrpcStatus status;
private String statusMessage;
private String encoding;

public GrpcClientResponseImpl(ContextInternal context,
GrpcClientRequestImpl<Req, Resp> request,
WireFormat format,
GrpcStatus status,
HttpClientResponse httpResponse, GrpcMessageDecoder<Resp> messageDecoder) {
super(context, httpResponse, httpResponse.headers().get("grpc-encoding"), messageDecoder);
super(context, httpResponse, httpResponse.headers().get("grpc-encoding"), format, messageDecoder);
this.request = request;
this.encoding = httpResponse.headers().get("grpc-encoding");
this.httpResponse = httpResponse;

String responseStatus = httpResponse.getHeader("grpc-status");
if (responseStatus != null) {
status = GrpcStatus.valueOf(Integer.parseInt(responseStatus));
if (status != GrpcStatus.OK) {
String msg = httpResponse.getHeader("grpc-message");
if (msg != null) {
statusMessage = QueryStringDecoder.decodeComponent(msg, StandardCharsets.UTF_8);
}
}
}
this.status = status;
}

@Override
public MultiMap headers() {
return httpResponse.headers();
}

@Override
public String encoding() {
return encoding;
}

@Override
public MultiMap trailers() {
return httpResponse.trailers();
}

protected void handleEnd() {
request.cancelTimeout();
String responseStatus = httpResponse.getTrailer("grpc-status");
if (responseStatus != null) {
status = GrpcStatus.valueOf(Integer.parseInt(responseStatus));
if (status != GrpcStatus.OK) {
statusMessage = httpResponse.getTrailer("grpc-message");
if (status == null) {
String responseStatus = httpResponse.getTrailer("grpc-status");
if (responseStatus != null) {
status = GrpcStatus.valueOf(Integer.parseInt(responseStatus));
} else {
status = GrpcStatus.UNKNOWN;
}
}
super.handleEnd();
Expand All @@ -92,14 +77,20 @@ public GrpcStatus status() {

@Override
public String statusMessage() {
if (status != null && status != GrpcStatus.OK) {
String msg = httpResponse.getHeader("grpc-message");
if (msg != null) {
statusMessage = QueryStringDecoder.decodeComponent(msg, StandardCharsets.UTF_8);
}
}
return statusMessage;
}

@Override
public Future<Void> end() {
return super.end()
.compose(v -> {
if (status == GrpcStatus.OK) {
if (status() == GrpcStatus.OK) {
return Future.succeededFuture();
} else {
return Future.failedFuture("Invalid gRPC status " + status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.grpc.examples.helloworld.GreeterGrpc;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.StreamResetException;
Expand Down Expand Up @@ -167,6 +168,7 @@ private void testDecode(TestContext should, Async async, Buffer payload, Consume
req.endHandler(v -> {
HttpServerResponse resp = req.response();
resp.putHeader("grpc-encoding", "gzip");
resp.putHeader(HttpHeaders.CONTENT_TYPE, "application/grpc");
resp.putTrailer("grpc-status", "" + GrpcStatus.OK.code);
resp.write(Buffer.buffer()
.appendByte((byte)1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@
import io.grpc.examples.streaming.StreamingGrpc;
import io.grpc.stub.StreamObserver;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SelfSignedCertificate;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.grpc.client.GrpcClientOptions;
import io.vertx.grpc.common.GrpcError;
import io.vertx.grpc.common.GrpcErrorException;
import io.vertx.grpc.common.GrpcStatus;
import io.vertx.grpc.common.GrpcLocal;
import io.vertx.grpc.common.*;
import org.junit.Test;

import java.io.File;
Expand Down Expand Up @@ -629,4 +629,74 @@ public void onCompleted() {
}));
}));
}

@Test
public void testMissingResponseStatusIsUnknown(TestContext should) throws Exception {

Async done = should.async();
HttpServer server = vertx.createHttpServer();
server.requestHandler(req -> {
req.response()
.putHeader(HttpHeaders.CONTENT_TYPE, "application/grpc")
.end();
});
server.listen(port, "localhost")
.toCompletionStage()
.toCompletableFuture()
.get(20, TimeUnit.SECONDS);

client = GrpcClient.client(vertx);
client
.request(SocketAddress.inetSocketAddress(port, "localhost"), STREAMING_PIPE)
.onComplete(should.asyncAssertSuccess(callRequest -> {
callRequest.end();
callRequest
.response()
.onComplete(should.asyncAssertSuccess(resp -> {
resp.endHandler(v -> {
should.assertEquals(GrpcStatus.UNKNOWN, resp.status());
done.complete();
});
}));
}));

done.awaitSuccess();
}

@Test
public void testJsonMessageFormat(TestContext should) throws Exception {

super.testJsonMessageFormat(should, "application/grpc+json");

JsonObject helloReply = new JsonObject().put("message", "Hello Julien");
JsonObject helloRequest = new JsonObject().put("name", "Julien");

Async done = should.async();

ServiceMethod<JsonObject, JsonObject> serviceMethod = ServiceMethod.client(
ServiceName.create("helloworld", "Greeter"),
"SayHello",
GrpcMessageEncoder.JSON_OBJECT,
GrpcMessageDecoder.JSON_OBJECT);

client = GrpcClient.client(vertx);
client
.request(SocketAddress.inetSocketAddress(port, "localhost"), serviceMethod)
.onComplete(should.asyncAssertSuccess(callRequest -> {
callRequest.end(helloRequest);
callRequest
.response()
.onComplete(should.asyncAssertSuccess(resp -> {
should.assertEquals(WireFormat.JSON, resp.format());
resp.handler(msg -> {
should.assertEquals(helloReply, msg);
});
resp.endHandler(v -> {
done.complete();
});
}));
}));

done.awaitSuccess();
}
}
Loading
Loading