Skip to content

Commit

Permalink
Support FileRegions
Browse files Browse the repository at this point in the history
lambda formatting

virtual handler

poll loop error messages

git rid of poll loop

fix transferTo

revert back
  • Loading branch information
patriot1burke committed Jun 3, 2020
1 parent 0e2a78b commit d91e04c
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.net.URLEncoder;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;

import org.jboss.logging.Logger;

import com.amazonaws.serverless.proxy.internal.LambdaContainerHandler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
Expand All @@ -28,10 +33,12 @@
import io.quarkus.amazon.lambda.http.model.AwsProxyResponse;
import io.quarkus.amazon.lambda.http.model.Headers;
import io.quarkus.netty.runtime.virtual.VirtualClientConnection;
import io.quarkus.netty.runtime.virtual.VirtualResponseHandler;
import io.quarkus.vertx.http.runtime.VertxHttpRecorder;

@SuppressWarnings("unused")
public class LambdaHttpHandler implements RequestHandler<AwsProxyRequest, AwsProxyResponse> {
private static final Logger log = Logger.getLogger("quarkus.amazon.lambda.http");

private static Headers errorHeaders = new Headers();
static {
Expand All @@ -46,19 +53,99 @@ public AwsProxyResponse handleRequest(AwsProxyRequest request, Context context)
}
}

VirtualClientConnection connection = VirtualClientConnection.connect(VertxHttpRecorder.VIRTUAL_HTTP, clientAddress);
try {
return nettyDispatch(connection, request);
return nettyDispatch(clientAddress, request);
} catch (Exception e) {
log.error("Request Failure", e);
return new AwsProxyResponse(500, errorHeaders, "{ \"message\": \"Internal Server Error\" }");
} finally {
connection.close();
}

}

private AwsProxyResponse nettyDispatch(VirtualClientConnection connection, AwsProxyRequest request) throws Exception {
private class NettyResponseHandler implements VirtualResponseHandler {
AwsProxyResponse responseBuilder = new AwsProxyResponse();
ByteArrayOutputStream baos;
WritableByteChannel byteChannel;
final AwsProxyRequest request;
CompletableFuture<AwsProxyResponse> future = new CompletableFuture<>();

public NettyResponseHandler(AwsProxyRequest request) {
this.request = request;
}

public CompletableFuture<AwsProxyResponse> getFuture() {
return future;
}

@Override
public void handleMessage(Object msg) {
try {
//log.info("Got message: " + msg.getClass().getName());

if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse) msg;
responseBuilder.setStatusCode(res.status().code());

if (request.getRequestSource() == AwsProxyRequest.RequestSource.ALB) {
responseBuilder.setStatusDescription(res.status().reasonPhrase());
}
responseBuilder.setMultiValueHeaders(new Headers());
for (String name : res.headers().names()) {
for (String v : res.headers().getAll(name)) {
responseBuilder.getMultiValueHeaders().add(name, v);
}
}
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
int readable = content.content().readableBytes();
if (baos == null && readable > 0) {
baos = createByteStream();
}
for (int i = 0; i < readable; i++) {
baos.write(content.content().readByte());
}
}
if (msg instanceof FileRegion) {
FileRegion file = (FileRegion) msg;
if (file.count() > 0 && file.transferred() < file.count()) {
if (baos == null)
baos = createByteStream();
if (byteChannel == null)
byteChannel = Channels.newChannel(baos);
file.transferTo(byteChannel, file.transferred());
}
}
if (msg instanceof LastHttpContent) {
if (baos != null) {
if (isBinary(responseBuilder.getMultiValueHeaders().getFirst("Content-Type"))) {
responseBuilder.setBase64Encoded(true);
responseBuilder.setBody(Base64.getMimeEncoder().encodeToString(baos.toByteArray()));
} else {
responseBuilder.setBody(new String(baos.toByteArray(), "UTF-8"));
}
}
future.complete(responseBuilder);
}
} catch (Throwable ex) {
future.completeExceptionally(ex);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}

@Override
public void close() {
if (!future.isDone())
future.completeExceptionally(new RuntimeException("Connection closed"));
}
}

private AwsProxyResponse nettyDispatch(InetSocketAddress clientAddress, AwsProxyRequest request) throws Exception {
String path = request.getPath();
//log.info("---- Got lambda request: " + path);
if (request.getMultiValueQueryStringParameters() != null && !request.getMultiValueQueryStringParameters().isEmpty()) {
StringBuilder sb = new StringBuilder(path);
sb.append("?");
Expand Down Expand Up @@ -104,69 +191,25 @@ private AwsProxyResponse nettyDispatch(VirtualClientConnection connection, AwsPr
requestContent = new DefaultLastHttpContent(body);
}
}
NettyResponseHandler handler = new NettyResponseHandler(request);
VirtualClientConnection connection = VirtualClientConnection.connect(handler, VertxHttpRecorder.VIRTUAL_HTTP,
clientAddress);

connection.sendMessage(nettyRequest);
connection.sendMessage(requestContent);
AwsProxyResponse responseBuilder = new AwsProxyResponse();
ByteArrayOutputStream baos = null;
try {
for (;;) {
// todo should we timeout? have a timeout config?
//log.info("waiting for message");
Object msg = connection.queue().poll(100, TimeUnit.MILLISECONDS);
try {
if (msg == null)
continue;
//log.info("Got message: " + msg.getClass().getName());

if (msg instanceof HttpResponse) {
HttpResponse res = (HttpResponse) msg;
responseBuilder.setStatusCode(res.status().code());

if (request.getRequestSource() == AwsProxyRequest.RequestSource.ALB) {
responseBuilder.setStatusDescription(res.status().reasonPhrase());
}
responseBuilder.setMultiValueHeaders(new Headers());
for (String name : res.headers().names()) {
for (String v : res.headers().getAll(name)) {
responseBuilder.getMultiValueHeaders().add(name, v);
}
}
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
int readable = content.content().readableBytes();
if (baos == null && readable > 0) {
// todo what is right size?
baos = new ByteArrayOutputStream(500);
}
for (int i = 0; i < readable; i++) {
baos.write(content.content().readByte());
}
}
if (msg instanceof LastHttpContent) {
if (baos != null) {
if (isBinary(responseBuilder.getMultiValueHeaders().getFirst("Content-Type"))) {
responseBuilder.setBase64Encoded(true);
responseBuilder.setBody(Base64.getMimeEncoder().encodeToString(baos.toByteArray()));
} else {
responseBuilder.setBody(new String(baos.toByteArray(), "UTF-8"));
}
}
return responseBuilder;
}
} finally {
if (msg != null)
ReferenceCountUtil.release(msg);
}
}
return handler.getFuture().get();
} finally {
if (baos != null) {
baos.close();
}
connection.close();
}
}

private ByteArrayOutputStream createByteStream() {
ByteArrayOutputStream baos;// todo what is right size?
baos = new ByteArrayOutputStream(1000);
return baos;
}

private boolean isBinary(String contentType) {
if (contentType != null) {
int index = contentType.indexOf(';');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -45,7 +47,15 @@ public void run() {
URL requestUrl = AmazonLambdaApi.invocationNext();
while (running.get()) {

HttpURLConnection requestConnection = (HttpURLConnection) requestUrl.openConnection();
HttpURLConnection requestConnection = null;
try {
requestConnection = (HttpURLConnection) requestUrl.openConnection();
} catch (IOException e) {
if (abortGracefully(e)) {
return;
}
throw e;
}
try {
String requestId = requestConnection.getHeaderField(AmazonLambdaApi.LAMBDA_RUNTIME_AWS_REQUEST_ID);
try {
Expand All @@ -70,6 +80,9 @@ public void run() {
}
}
} catch (Exception e) {
if (abortGracefully(e)) {
return;
}
log.error("Failed to run lambda", e);

postError(AmazonLambdaApi.invocationError(requestId),
Expand All @@ -78,7 +91,8 @@ public void run() {
}

} catch (Exception e) {
log.error("Error running lambda", e);
if (!abortGracefully(e))
log.error("Error running lambda", e);
Application app = Application.currentApplication();
if (app != null) {
app.stop();
Expand Down Expand Up @@ -174,4 +188,14 @@ protected HttpURLConnection responseStream(URL url) throws IOException {
return responseConnection;
}

boolean abortGracefully(Exception ex) {
// if we are running in test mode, then don't output stack trace for socket errors

boolean graceful = (ex instanceof SocketException || ex instanceof ConnectException)
&& System.getProperty(AmazonLambdaApi.QUARKUS_INTERNAL_AWS_LAMBDA_TEST_API) != null;
if (graceful)
log.warn("Aborting lambda poll loop");
return graceful;
}

}
Loading

0 comments on commit d91e04c

Please sign in to comment.