Skip to content

Commit

Permalink
Support FileRegions
Browse files Browse the repository at this point in the history
lambda formatting

virtual handler
  • Loading branch information
patriot1burke committed Jun 1, 2020
1 parent 0634430 commit 4b013c8
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.net.URLEncoder;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;

import org.jboss.logging.Logger;

import com.amazonaws.serverless.proxy.internal.LambdaContainerHandler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
Expand All @@ -28,10 +33,12 @@
import io.quarkus.amazon.lambda.http.model.AwsProxyResponse;
import io.quarkus.amazon.lambda.http.model.Headers;
import io.quarkus.netty.runtime.virtual.VirtualClientConnection;
import io.quarkus.netty.runtime.virtual.VirtualResponseHandler;
import io.quarkus.vertx.http.runtime.VertxHttpRecorder;

@SuppressWarnings("unused")
public class LambdaHttpHandler implements RequestHandler<AwsProxyRequest, AwsProxyResponse> {
private static final Logger log = Logger.getLogger("quarkus.amazon.lambda.http");

private static Headers errorHeaders = new Headers();
static {
Expand All @@ -46,19 +53,102 @@ public AwsProxyResponse handleRequest(AwsProxyRequest request, Context context)
}
}

VirtualClientConnection connection = VirtualClientConnection.connect(VertxHttpRecorder.VIRTUAL_HTTP, clientAddress);
try {
return nettyDispatch(connection, request);
return nettyDispatch(clientAddress, request);
} catch (Exception e) {
log.error("Request Failure", e);
return new AwsProxyResponse(500, errorHeaders, "{ \"message\": \"Internal Server Error\" }");
} finally {
connection.close();
}

}

private AwsProxyResponse nettyDispatch(VirtualClientConnection connection, AwsProxyRequest request) throws Exception {
private class NettyResponseHandler implements VirtualResponseHandler {
AwsProxyResponse responseBuilder = new AwsProxyResponse();
ByteArrayOutputStream baos;
WritableByteChannel byteChannel;
final AwsProxyRequest request;
CompletableFuture<AwsProxyResponse> future = new CompletableFuture<>();

public NettyResponseHandler(AwsProxyRequest request) {
this.request = request;
}

public CompletableFuture<AwsProxyResponse> getFuture() {
return future;
}

@Override
public void handleMessage(Object msg) {
try {
//log.info("Got message: " + msg.getClass().getName());

if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse) msg;
responseBuilder.setStatusCode(res.status().code());

if (request.getRequestSource() == AwsProxyRequest.RequestSource.ALB) {
responseBuilder.setStatusDescription(res.status().reasonPhrase());
}
responseBuilder.setMultiValueHeaders(new Headers());
for (String name : res.headers().names()) {
for (String v : res.headers().getAll(name)) {
responseBuilder.getMultiValueHeaders().add(name, v);
}
}
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
int readable = content.content().readableBytes();
if (baos == null && readable > 0) {
baos = createByteStream();
}
for (int i = 0; i < readable; i++) {
baos.write(content.content().readByte());
}
}
if (msg instanceof FileRegion) {
FileRegion file = (FileRegion) msg;
if (file.count() > 0 && file.transferred() < file.count()) {
if (baos == null)
baos = createByteStream();
if (byteChannel == null)
byteChannel = Channels.newChannel(baos);
// AbstractNioByteChannel also seems to do a spin, albeit with a count.
while (file.transferred() < file.count()) {
file.transferTo(byteChannel, file.transferred());
}
}
}
if (msg instanceof LastHttpContent) {
if (baos != null) {
if (isBinary(responseBuilder.getMultiValueHeaders().getFirst("Content-Type"))) {
responseBuilder.setBase64Encoded(true);
responseBuilder.setBody(Base64.getMimeEncoder().encodeToString(baos.toByteArray()));
} else {
responseBuilder.setBody(new String(baos.toByteArray(), "UTF-8"));
}
}
future.complete(responseBuilder);
}
} catch (Throwable ex) {
future.completeExceptionally(ex);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}

@Override
public void close() {
if (!future.isDone())
future.completeExceptionally(new RuntimeException("Connection closed"));
}
}

private AwsProxyResponse nettyDispatch(InetSocketAddress clientAddress, AwsProxyRequest request) throws Exception {
String path = request.getPath();
//log.info("---- Got lambda request: " + path);
if (request.getMultiValueQueryStringParameters() != null && !request.getMultiValueQueryStringParameters().isEmpty()) {
StringBuilder sb = new StringBuilder(path);
sb.append("?");
Expand Down Expand Up @@ -104,69 +194,25 @@ private AwsProxyResponse nettyDispatch(VirtualClientConnection connection, AwsPr
requestContent = new DefaultLastHttpContent(body);
}
}
NettyResponseHandler handler = new NettyResponseHandler(request);
VirtualClientConnection connection = VirtualClientConnection.connect(handler, VertxHttpRecorder.VIRTUAL_HTTP,
clientAddress);

connection.sendMessage(nettyRequest);
connection.sendMessage(requestContent);
AwsProxyResponse responseBuilder = new AwsProxyResponse();
ByteArrayOutputStream baos = null;
try {
for (;;) {
// todo should we timeout? have a timeout config?
//log.info("waiting for message");
Object msg = connection.queue().poll(100, TimeUnit.MILLISECONDS);
try {
if (msg == null)
continue;
//log.info("Got message: " + msg.getClass().getName());

if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse) msg;
responseBuilder.setStatusCode(res.status().code());

if (request.getRequestSource() == AwsProxyRequest.RequestSource.ALB) {
responseBuilder.setStatusDescription(res.status().reasonPhrase());
}
responseBuilder.setMultiValueHeaders(new Headers());
for (String name : res.headers().names()) {
for (String v : res.headers().getAll(name)) {
responseBuilder.getMultiValueHeaders().add(name, v);
}
}
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
int readable = content.content().readableBytes();
if (baos == null && readable > 0) {
// todo what is right size?
baos = new ByteArrayOutputStream(500);
}
for (int i = 0; i < readable; i++) {
baos.write(content.content().readByte());
}
}
if (msg instanceof LastHttpContent) {
if (baos != null) {
if (isBinary(responseBuilder.getMultiValueHeaders().getFirst("Content-Type"))) {
responseBuilder.setBase64Encoded(true);
responseBuilder.setBody(Base64.getMimeEncoder().encodeToString(baos.toByteArray()));
} else {
responseBuilder.setBody(new String(baos.toByteArray(), "UTF-8"));
}
}
return responseBuilder;
}
} finally {
if (msg != null)
ReferenceCountUtil.release(msg);
}
}
return handler.getFuture().get();
} finally {
if (baos != null) {
baos.close();
}
connection.close();
}
}

private ByteArrayOutputStream createByteStream() {
ByteArrayOutputStream baos;// todo what is right size?
baos = new ByteArrayOutputStream(1000);
return baos;
}

private boolean isBinary(String contentType) {
if (contentType != null) {
int index = contentType.indexOf(';');
Expand Down
Loading

0 comments on commit 4b013c8

Please sign in to comment.