Skip to content

Commit

Permalink
Merge pull request #24039 from patriot1burke/022222
Browse files Browse the repository at this point in the history
Fix multipart and Expect: 100-continue
  • Loading branch information
gsmet authored Mar 2, 2022
2 parents 0e36974 + 1b4cd63 commit ee50eed
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public void handleHttpRequests(RoutingContext ctx) {
event.setRawPath(ctx.request().path());
event.setRawQueryString(ctx.request().query());
for (String header : ctx.request().headers().names()) {
if (header.equalsIgnoreCase("Expect")) {
String expect = ctx.request().getHeader("Expect");
if (expect != null && expect.equalsIgnoreCase(CONTINUE)) {
continue;
}
}
if (event.getHeaders() == null)
event.setHeaders(new HashMap<>());
List<String> values = ctx.request().headers().getAll(header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.vertx.ext.web.RoutingContext;

public class MockRestEventServer extends MockEventServer {
public static final String CONTINUE = "100-continue";

private final ObjectWriter eventWriter;
private final ObjectReader responseReader;
Expand Down Expand Up @@ -84,6 +85,12 @@ public void handleHttpRequests(RoutingContext ctx) {
if (ctx.request().headers() != null) {
event.setMultiValueHeaders(new Headers());
for (String header : ctx.request().headers().names()) {
if (header.equalsIgnoreCase("Expect")) {
String expect = ctx.request().getHeader("Expect");
if (expect != null && expect.equalsIgnoreCase(CONTINUE)) {
continue;
}
}
List<String> values = ctx.request().headers().getAll(header);
for (String val : values)
event.getMultiValueHeaders().add(header, val);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package io.quarkus.amazon.lambda.runtime;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.handler.codec.DecoderException;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.impl.RoutingContextInternal;

/**
* Copy of Vertx BodyHandlerImpl. Had to do this because I want to get raw bytes of everything
* and if it was a form or multipart it would not set the body buffer.
*/
public class MockBodyHandler implements BodyHandler {

private static final Logger LOG = LoggerFactory.getLogger(io.vertx.ext.web.handler.impl.BodyHandlerImpl.class);

private long bodyLimit = DEFAULT_BODY_LIMIT;
private String uploadsDir;
private boolean mergeFormAttributes = DEFAULT_MERGE_FORM_ATTRIBUTES;
private boolean isPreallocateBodyBuffer = DEFAULT_PREALLOCATE_BODY_BUFFER;
private static final int DEFAULT_INITIAL_BODY_BUFFER_SIZE = 1024; //bytes

public MockBodyHandler() {
}

@Override
public void handle(RoutingContext context) {
HttpServerRequest request = context.request();
if (request.headers().contains(HttpHeaders.UPGRADE, HttpHeaders.WEBSOCKET, true)) {
context.next();
return;
}
// we need to keep state since we can be called again on reroute
if (!((RoutingContextInternal) context).seenHandler(RoutingContextInternal.BODY_HANDLER)) {
long contentLength = isPreallocateBodyBuffer ? parseContentLengthHeader(request) : -1;
BHandler handler = new BHandler(context, contentLength);
request.handler(handler);
request.endHandler(v -> handler.end());
((RoutingContextInternal) context).visitHandler(RoutingContextInternal.BODY_HANDLER);
} else {
// on reroute we need to re-merge the form params if that was desired
if (mergeFormAttributes && request.isExpectMultipart()) {
request.params().addAll(request.formAttributes());
}

context.next();
}
}

@Override
public BodyHandler setHandleFileUploads(boolean handleFileUploads) {
throw new IllegalStateException("Not Allowed");
}

@Override
public BodyHandler setBodyLimit(long bodyLimit) {
this.bodyLimit = bodyLimit;
return this;
}

@Override
public BodyHandler setUploadsDirectory(String uploadsDirectory) {
this.uploadsDir = uploadsDirectory;
return this;
}

@Override
public BodyHandler setMergeFormAttributes(boolean mergeFormAttributes) {
this.mergeFormAttributes = mergeFormAttributes;
return this;
}

@Override
public BodyHandler setDeleteUploadedFilesOnEnd(boolean deleteUploadedFilesOnEnd) {
return this;
}

@Override
public BodyHandler setPreallocateBodyBuffer(boolean isPreallocateBodyBuffer) {
this.isPreallocateBodyBuffer = isPreallocateBodyBuffer;
return this;
}

private long parseContentLengthHeader(HttpServerRequest request) {
String contentLength = request.getHeader(HttpHeaders.CONTENT_LENGTH);
if (contentLength == null || contentLength.isEmpty()) {
return -1;
}
try {
long parsedContentLength = Long.parseLong(contentLength);
return parsedContentLength < 0 ? -1 : parsedContentLength;
} catch (NumberFormatException ex) {
return -1;
}
}

private class BHandler implements Handler<Buffer> {
private static final int MAX_PREALLOCATED_BODY_BUFFER_BYTES = 65535;

final RoutingContext context;
final long contentLength;
Buffer body;
boolean failed;
AtomicInteger uploadCount = new AtomicInteger();
AtomicBoolean cleanup = new AtomicBoolean(false);
boolean ended;
long uploadSize = 0L;

public BHandler(RoutingContext context, long contentLength) {
this.context = context;
this.contentLength = contentLength;
// the request clearly states that there should
// be a body, so we respect the client and ensure
// that the body will not be null
if (contentLength != -1) {
initBodyBuffer();
}

context.request().exceptionHandler(t -> {
if (t instanceof DecoderException) {
// bad request
context.fail(400, t.getCause());
} else {
context.fail(t);
}
});
}

private void initBodyBuffer() {
int initialBodyBufferSize;
if (contentLength < 0) {
initialBodyBufferSize = DEFAULT_INITIAL_BODY_BUFFER_SIZE;
} else if (contentLength > MAX_PREALLOCATED_BODY_BUFFER_BYTES) {
initialBodyBufferSize = MAX_PREALLOCATED_BODY_BUFFER_BYTES;
} else {
initialBodyBufferSize = (int) contentLength;
}

if (bodyLimit != -1) {
initialBodyBufferSize = (int) Math.min(initialBodyBufferSize, bodyLimit);
}

this.body = Buffer.buffer(initialBodyBufferSize);
}

@Override
public void handle(Buffer buff) {
if (failed) {
return;
}
uploadSize += buff.length();
if (bodyLimit != -1 && uploadSize > bodyLimit) {
failed = true;
context.fail(413);
} else {
if (body == null) {
initBodyBuffer();
}
body.appendBuffer(buff);
}
}

void end() {
// this marks the end of body parsing, calling doEnd should
// only be possible from this moment onwards
ended = true;

// only if parsing is done and count is 0 then all files have been processed
if (uploadCount.get() == 0) {
doEnd();
}
}

void doEnd() {

if (failed) {
return;
}

HttpServerRequest req = context.request();
if (mergeFormAttributes && req.isExpectMultipart()) {
req.params().addAll(req.formAttributes());
}
context.setBody(body);
// release body as it may take lots of memory
body = null;

context.next();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import org.jboss.logging.Logger;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand All @@ -23,7 +24,6 @@
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;

public class MockEventServer implements Closeable {
protected static final Logger log = Logger.getLogger(MockEventServer.class);
Expand All @@ -39,6 +39,7 @@ public class MockEventServer implements Closeable {
public static final String INVOCATION = BASE_PATH + AmazonLambdaApi.API_PATH_INVOCATION;
public static final String NEXT_INVOCATION = BASE_PATH + AmazonLambdaApi.API_PATH_INVOCATION_NEXT;
public static final String POST_EVENT = BASE_PATH;
public static final String CONTINUE = "100-continue";
final AtomicBoolean closed = new AtomicBoolean();

public MockEventServer() {
Expand Down Expand Up @@ -69,7 +70,17 @@ public int getPort() {
}

public void setupRoutes() {
router.route().handler(BodyHandler.create());
router.route().handler((context) -> {
if (context.get("continue-sent") == null) {
String expect = context.request().getHeader(HttpHeaderNames.EXPECT);
if (expect != null && expect.equalsIgnoreCase(CONTINUE)) {
context.put("continue-sent", true);
context.response().writeContinue();
}
}
context.next();
});
router.route().handler(new MockBodyHandler());
router.post(POST_EVENT).handler(this::postEvent);
router.route(NEXT_INVOCATION).blockingHandler(this::nextEvent);
router.route(INVOCATION + ":requestId" + AmazonLambdaApi.API_PATH_REQUEUE).handler(this::handleRequeue);
Expand Down

0 comments on commit ee50eed

Please sign in to comment.