From 1b4cd63a88fe81d60141d04d38b34885ce207938 Mon Sep 17 00:00:00 2001 From: Bill Burke Date: Tue, 1 Mar 2022 11:54:28 -0500 Subject: [PATCH] Fix multipart and Expect: 100-continue fix multi --- .../lambda/runtime/MockHttpEventServer.java | 6 + .../lambda/runtime/MockRestEventServer.java | 7 + .../lambda/runtime/MockBodyHandler.java | 200 ++++++++++++++++++ .../lambda/runtime/MockEventServer.java | 15 +- 4 files changed, 226 insertions(+), 2 deletions(-) create mode 100644 extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockBodyHandler.java diff --git a/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java b/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java index 0c2ff7300baa4..25b9106e7f757 100644 --- a/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java +++ b/extensions/amazon-lambda-http/http-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockHttpEventServer.java @@ -59,6 +59,12 @@ public void handleHttpRequests(RoutingContext ctx) { event.setRawPath(ctx.request().path()); event.setRawQueryString(ctx.request().query()); for (String header : ctx.request().headers().names()) { + if (header.equalsIgnoreCase("Expect")) { + String expect = ctx.request().getHeader("Expect"); + if (expect != null && expect.equalsIgnoreCase(CONTINUE)) { + continue; + } + } if (event.getHeaders() == null) event.setHeaders(new HashMap<>()); List values = ctx.request().headers().getAll(header); diff --git a/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java b/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java index 4648109b8ff7e..b19ed5d533b9a 100644 --- a/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java +++ b/extensions/amazon-lambda-rest/rest-event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockRestEventServer.java @@ -26,6 +26,7 @@ import io.vertx.ext.web.RoutingContext; public class MockRestEventServer extends MockEventServer { + public static final String CONTINUE = "100-continue"; private final ObjectWriter eventWriter; private final ObjectReader responseReader; @@ -84,6 +85,12 @@ public void handleHttpRequests(RoutingContext ctx) { if (ctx.request().headers() != null) { event.setMultiValueHeaders(new Headers()); for (String header : ctx.request().headers().names()) { + if (header.equalsIgnoreCase("Expect")) { + String expect = ctx.request().getHeader("Expect"); + if (expect != null && expect.equalsIgnoreCase(CONTINUE)) { + continue; + } + } List values = ctx.request().headers().getAll(header); for (String val : values) event.getMultiValueHeaders().add(header, val); diff --git a/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockBodyHandler.java b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockBodyHandler.java new file mode 100644 index 0000000000000..09f521f189da4 --- /dev/null +++ b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockBodyHandler.java @@ -0,0 +1,200 @@ +package io.quarkus.amazon.lambda.runtime; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.handler.codec.DecoderException; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.BodyHandler; +import io.vertx.ext.web.impl.RoutingContextInternal; + +/** + * Copy of Vertx BodyHandlerImpl. Had to do this because I want to get raw bytes of everything + * and if it was a form or multipart it would not set the body buffer. + */ +public class MockBodyHandler implements BodyHandler { + + private static final Logger LOG = LoggerFactory.getLogger(io.vertx.ext.web.handler.impl.BodyHandlerImpl.class); + + private long bodyLimit = DEFAULT_BODY_LIMIT; + private String uploadsDir; + private boolean mergeFormAttributes = DEFAULT_MERGE_FORM_ATTRIBUTES; + private boolean isPreallocateBodyBuffer = DEFAULT_PREALLOCATE_BODY_BUFFER; + private static final int DEFAULT_INITIAL_BODY_BUFFER_SIZE = 1024; //bytes + + public MockBodyHandler() { + } + + @Override + public void handle(RoutingContext context) { + HttpServerRequest request = context.request(); + if (request.headers().contains(HttpHeaders.UPGRADE, HttpHeaders.WEBSOCKET, true)) { + context.next(); + return; + } + // we need to keep state since we can be called again on reroute + if (!((RoutingContextInternal) context).seenHandler(RoutingContextInternal.BODY_HANDLER)) { + long contentLength = isPreallocateBodyBuffer ? parseContentLengthHeader(request) : -1; + BHandler handler = new BHandler(context, contentLength); + request.handler(handler); + request.endHandler(v -> handler.end()); + ((RoutingContextInternal) context).visitHandler(RoutingContextInternal.BODY_HANDLER); + } else { + // on reroute we need to re-merge the form params if that was desired + if (mergeFormAttributes && request.isExpectMultipart()) { + request.params().addAll(request.formAttributes()); + } + + context.next(); + } + } + + @Override + public BodyHandler setHandleFileUploads(boolean handleFileUploads) { + throw new IllegalStateException("Not Allowed"); + } + + @Override + public BodyHandler setBodyLimit(long bodyLimit) { + this.bodyLimit = bodyLimit; + return this; + } + + @Override + public BodyHandler setUploadsDirectory(String uploadsDirectory) { + this.uploadsDir = uploadsDirectory; + return this; + } + + @Override + public BodyHandler setMergeFormAttributes(boolean mergeFormAttributes) { + this.mergeFormAttributes = mergeFormAttributes; + return this; + } + + @Override + public BodyHandler setDeleteUploadedFilesOnEnd(boolean deleteUploadedFilesOnEnd) { + return this; + } + + @Override + public BodyHandler setPreallocateBodyBuffer(boolean isPreallocateBodyBuffer) { + this.isPreallocateBodyBuffer = isPreallocateBodyBuffer; + return this; + } + + private long parseContentLengthHeader(HttpServerRequest request) { + String contentLength = request.getHeader(HttpHeaders.CONTENT_LENGTH); + if (contentLength == null || contentLength.isEmpty()) { + return -1; + } + try { + long parsedContentLength = Long.parseLong(contentLength); + return parsedContentLength < 0 ? -1 : parsedContentLength; + } catch (NumberFormatException ex) { + return -1; + } + } + + private class BHandler implements Handler { + private static final int MAX_PREALLOCATED_BODY_BUFFER_BYTES = 65535; + + final RoutingContext context; + final long contentLength; + Buffer body; + boolean failed; + AtomicInteger uploadCount = new AtomicInteger(); + AtomicBoolean cleanup = new AtomicBoolean(false); + boolean ended; + long uploadSize = 0L; + + public BHandler(RoutingContext context, long contentLength) { + this.context = context; + this.contentLength = contentLength; + // the request clearly states that there should + // be a body, so we respect the client and ensure + // that the body will not be null + if (contentLength != -1) { + initBodyBuffer(); + } + + context.request().exceptionHandler(t -> { + if (t instanceof DecoderException) { + // bad request + context.fail(400, t.getCause()); + } else { + context.fail(t); + } + }); + } + + private void initBodyBuffer() { + int initialBodyBufferSize; + if (contentLength < 0) { + initialBodyBufferSize = DEFAULT_INITIAL_BODY_BUFFER_SIZE; + } else if (contentLength > MAX_PREALLOCATED_BODY_BUFFER_BYTES) { + initialBodyBufferSize = MAX_PREALLOCATED_BODY_BUFFER_BYTES; + } else { + initialBodyBufferSize = (int) contentLength; + } + + if (bodyLimit != -1) { + initialBodyBufferSize = (int) Math.min(initialBodyBufferSize, bodyLimit); + } + + this.body = Buffer.buffer(initialBodyBufferSize); + } + + @Override + public void handle(Buffer buff) { + if (failed) { + return; + } + uploadSize += buff.length(); + if (bodyLimit != -1 && uploadSize > bodyLimit) { + failed = true; + context.fail(413); + } else { + if (body == null) { + initBodyBuffer(); + } + body.appendBuffer(buff); + } + } + + void end() { + // this marks the end of body parsing, calling doEnd should + // only be possible from this moment onwards + ended = true; + + // only if parsing is done and count is 0 then all files have been processed + if (uploadCount.get() == 0) { + doEnd(); + } + } + + void doEnd() { + + if (failed) { + return; + } + + HttpServerRequest req = context.request(); + if (mergeFormAttributes && req.isExpectMultipart()) { + req.params().addAll(req.formAttributes()); + } + context.setBody(body); + // release body as it may take lots of memory + body = null; + + context.next(); + } + } + +} diff --git a/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java index a6e86470d8f75..5429ea1d805e8 100644 --- a/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java +++ b/extensions/amazon-lambda/event-server/src/main/java/io/quarkus/amazon/lambda/runtime/MockEventServer.java @@ -14,6 +14,7 @@ import org.jboss.logging.Logger; +import io.netty.handler.codec.http.HttpHeaderNames; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.Vertx; @@ -23,7 +24,6 @@ import io.vertx.core.http.HttpServerOptions; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; -import io.vertx.ext.web.handler.BodyHandler; public class MockEventServer implements Closeable { protected static final Logger log = Logger.getLogger(MockEventServer.class); @@ -39,6 +39,7 @@ public class MockEventServer implements Closeable { public static final String INVOCATION = BASE_PATH + AmazonLambdaApi.API_PATH_INVOCATION; public static final String NEXT_INVOCATION = BASE_PATH + AmazonLambdaApi.API_PATH_INVOCATION_NEXT; public static final String POST_EVENT = BASE_PATH; + public static final String CONTINUE = "100-continue"; final AtomicBoolean closed = new AtomicBoolean(); public MockEventServer() { @@ -69,7 +70,17 @@ public int getPort() { } public void setupRoutes() { - router.route().handler(BodyHandler.create()); + router.route().handler((context) -> { + if (context.get("continue-sent") == null) { + String expect = context.request().getHeader(HttpHeaderNames.EXPECT); + if (expect != null && expect.equalsIgnoreCase(CONTINUE)) { + context.put("continue-sent", true); + context.response().writeContinue(); + } + } + context.next(); + }); + router.route().handler(new MockBodyHandler()); router.post(POST_EVENT).handler(this::postEvent); router.route(NEXT_INVOCATION).blockingHandler(this::nextEvent); router.route(INVOCATION + ":requestId" + AmazonLambdaApi.API_PATH_REQUEUE).handler(this::handleRequeue);