Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support FileRegions for AWS Lambda HTTP and Azure Functions HTTP #9714

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.net.URLEncoder;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.jboss.logging.Logger;

import com.amazonaws.serverless.proxy.internal.LambdaContainerHandler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
Expand All @@ -28,10 +33,12 @@
import io.quarkus.amazon.lambda.http.model.AwsProxyResponse;
import io.quarkus.amazon.lambda.http.model.Headers;
import io.quarkus.netty.runtime.virtual.VirtualClientConnection;
import io.quarkus.netty.runtime.virtual.VirtualMessage;
import io.quarkus.vertx.http.runtime.VertxHttpRecorder;

@SuppressWarnings("unused")
public class LambdaHttpHandler implements RequestHandler<AwsProxyRequest, AwsProxyResponse> {
private static final Logger log = Logger.getLogger("quarkus.amazon.lambda.http");

private static Headers errorHeaders = new Headers();
static {
Expand All @@ -50,6 +57,7 @@ public AwsProxyResponse handleRequest(AwsProxyRequest request, Context context)
try {
return nettyDispatch(connection, request);
} catch (Exception e) {
log.error("Request Failure", e);
return new AwsProxyResponse(500, errorHeaders, "{ \"message\": \"Internal Server Error\" }");
} finally {
connection.close();
Expand All @@ -59,6 +67,7 @@ public AwsProxyResponse handleRequest(AwsProxyRequest request, Context context)

private AwsProxyResponse nettyDispatch(VirtualClientConnection connection, AwsProxyRequest request) throws Exception {
String path = request.getPath();
//log.info("---- Got lambda request: " + path);
if (request.getMultiValueQueryStringParameters() != null && !request.getMultiValueQueryStringParameters().isEmpty()) {
StringBuilder sb = new StringBuilder(path);
sb.append("?");
Expand Down Expand Up @@ -109,14 +118,14 @@ private AwsProxyResponse nettyDispatch(VirtualClientConnection connection, AwsPr
connection.sendMessage(requestContent);
AwsProxyResponse responseBuilder = new AwsProxyResponse();
ByteArrayOutputStream baos = null;
WritableByteChannel byteChannel = null;
try {
for (;;) {
// todo should we timeout? have a timeout config?
//log.info("waiting for message");
Object msg = connection.queue().poll(100, TimeUnit.MILLISECONDS);
VirtualMessage virtualMessage = connection.queue().poll(100, TimeUnit.MILLISECONDS);
if (virtualMessage == null) continue;
Object msg = virtualMessage.getMessage();
try {
if (msg == null)
continue;
//log.info("Got message: " + msg.getClass().getName());

if (msg instanceof HttpResponse) {
Expand All @@ -137,13 +146,22 @@ private AwsProxyResponse nettyDispatch(VirtualClientConnection connection, AwsPr
HttpContent content = (HttpContent) msg;
int readable = content.content().readableBytes();
if (baos == null && readable > 0) {
// todo what is right size?
baos = new ByteArrayOutputStream(500);
baos = createByteStream();
}
for (int i = 0; i < readable; i++) {
baos.write(content.content().readByte());
}
}
if (msg instanceof FileRegion) {
FileRegion file = (FileRegion) msg;
if (file.count() > 0) {
if (baos == null)
baos = createByteStream();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will truncate files if the length is larger than the 1000 byte buffer.

if (byteChannel == null)
byteChannel = Channels.newChannel(baos);
file.transferTo(byteChannel, 0);
}
}
if (msg instanceof LastHttpContent) {
if (baos != null) {
if (isBinary(responseBuilder.getMultiValueHeaders().getFirst("Content-Type"))) {
Expand All @@ -156,8 +174,10 @@ private AwsProxyResponse nettyDispatch(VirtualClientConnection connection, AwsPr
return responseBuilder;
}
} finally {
if (msg != null)
if (msg != null) {
virtualMessage.completed();
ReferenceCountUtil.release(msg);
}
}
}
} finally {
Expand All @@ -167,6 +187,12 @@ private AwsProxyResponse nettyDispatch(VirtualClientConnection connection, AwsPr
}
}

private ByteArrayOutputStream createByteStream() {
ByteArrayOutputStream baos;// todo what is right size?
baos = new ByteArrayOutputStream(1000);
return baos;
}

private boolean isBinary(String contentType) {
if (contentType != null) {
int index = contentType.indexOf(';');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand All @@ -15,6 +17,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
Expand All @@ -24,6 +27,7 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.quarkus.netty.runtime.virtual.VirtualClientConnection;
import io.quarkus.netty.runtime.virtual.VirtualMessage;
import io.quarkus.runtime.Application;
import io.quarkus.vertx.http.runtime.VertxHttpRecorder;

Expand Down Expand Up @@ -96,14 +100,15 @@ protected HttpResponseMessage nettyDispatch(VirtualClientConnection connection,
connection.sendMessage(requestContent);
HttpResponseMessage.Builder responseBuilder = null;
ByteArrayOutputStream baos = null;
WritableByteChannel byteChannel = null;
try {
for (;;) {
// todo should we timeout? have a timeout config?
//log.info("waiting for message");
Object msg = connection.queue().poll(100, TimeUnit.MILLISECONDS);
VirtualMessage virtualMessage = connection.queue().poll(100, TimeUnit.MILLISECONDS);
if (virtualMessage == null) continue;
Object msg = virtualMessage.getMessage();
try {
if (msg == null)
continue;
//log.info("Got message: " + msg.getClass().getName());

if (msg instanceof HttpResponse) {
Expand All @@ -117,20 +122,32 @@ protected HttpResponseMessage nettyDispatch(VirtualClientConnection connection,
HttpContent content = (HttpContent) msg;
if (baos == null) {
// todo what is right size?
baos = new ByteArrayOutputStream(500);
baos = createByteStream();
}
int readable = content.content().readableBytes();
for (int i = 0; i < readable; i++) {
baos.write(content.content().readByte());
}
}
if (msg instanceof FileRegion) {
FileRegion file = (FileRegion) msg;
if (file.count() > 0) {
if (baos == null)
baos = createByteStream();
if (byteChannel == null)
byteChannel = Channels.newChannel(baos);
file.transferTo(byteChannel, 0);
}
}
if (msg instanceof LastHttpContent) {
responseBuilder.body(baos.toByteArray());
return responseBuilder.build();
}
} finally {
if (msg != null)
if (msg != null) {
virtualMessage.completed();
ReferenceCountUtil.release(msg);
}
}
}
} finally {
Expand All @@ -139,4 +156,10 @@ protected HttpResponseMessage nettyDispatch(VirtualClientConnection connection,
}
}
}

private ByteArrayOutputStream createByteStream() {
ByteArrayOutputStream baos;
baos = new ByteArrayOutputStream(500);
return baos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,15 @@ protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// It is possible the peer could have closed while we are writing, and in this case we should
// simulate real socket behavior and ensure the sendMessage operation is failed.
if (peer.isConnected()) {
peer.queue().add(ReferenceCountUtil.retain(msg));
VirtualMessage virtualMessage = new VirtualMessage(msg);
ReferenceCountUtil.retain(msg);
peer.queue().add(virtualMessage);
// need to wait until client is finished with message
// Things like FileRegion get closed when they are removed from outbound buffer
// It sucks we have to synchronize the threads here with every message,
// but the buffer class isn't flexible enough to handle this scenario.
// Might not be a big deal. :)
virtualMessage.awaitComplete();
in.remove();
} else {
if (exception == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
public class VirtualClientConnection {
protected SocketAddress clientAddress;
protected BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
protected BlockingQueue<VirtualMessage> queue = new LinkedBlockingQueue<>();
protected boolean connected = true;
protected VirtualChannel peer;

Expand All @@ -33,7 +33,7 @@ public SocketAddress clientAddress() {
*
* @return
*/
public BlockingQueue<Object> queue() {
public BlockingQueue<VirtualMessage> queue() {
return queue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.quarkus.netty.runtime.virtual;

import java.util.concurrent.CompletableFuture;

public class VirtualMessage {
private Object message;
private CompletableFuture<Void> future = new CompletableFuture<>();

public VirtualMessage(Object message) {
this.message = message;
}

public Object getMessage() {
return message;
}

public void completed() {
future.complete(null);
}

public void awaitComplete() throws Exception {
future.get();
}
}
4 changes: 4 additions & 0 deletions integration-tests/amazon-lambda-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-undertow</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,40 @@ public void testGetText() throws Exception {
testGetText("/hello");
}

@Test
public void testSwaggerUi() throws Exception {
// this tests the FileRegion support in the handler
AwsProxyRequest request = request("/swagger-ui/");
AwsProxyResponse out = LambdaClient.invoke(AwsProxyResponse.class, request);
Assertions.assertEquals(out.getStatusCode(), 200);
Assertions.assertTrue(body(out).contains("Swagger UI"));

}

private String body(AwsProxyResponse response) {
if (!response.isBase64Encoded())
return response.getBody();
return new String(Base64.decodeBase64(response.getBody()));
}

private void testGetText(String path) {
AwsProxyRequest request = new AwsProxyRequest();
request.setHttpMethod("GET");
request.setPath(path);
AwsProxyRequest request = request(path);
AwsProxyResponse out = LambdaClient.invoke(AwsProxyResponse.class, request);
Assertions.assertEquals(out.getStatusCode(), 200);
Assertions.assertEquals(body(out), "hello");
Assertions.assertTrue(out.getMultiValueHeaders().getFirst("Content-Type").startsWith("text/plain"));
}

@Test
public void test404() throws Exception {
private AwsProxyRequest request(String path) {
AwsProxyRequest request = new AwsProxyRequest();
request.setHttpMethod("GET");
request.setPath("/nowhere");
request.setPath(path);
return request;
}

@Test
public void test404() throws Exception {
AwsProxyRequest request = request("/nowhere");
AwsProxyResponse out = LambdaClient.invoke(AwsProxyResponse.class, request);
Assertions.assertEquals(out.getStatusCode(), 404);
}
Expand Down
4 changes: 4 additions & 0 deletions integration-tests/virtual-http-resteasy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-azure-functions-http</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.azure.functions</groupId>
<artifactId>azure-functions-java-library</artifactId>
Expand Down
Loading